jafarlihi / cppq

Simple, reliable & efficient distributed task queue for C++17

License: MIT License

cppq's Introduction

Overview

cppq is a simple, reliable & efficient distributed task queue for C++17.

cppq is a C++ library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable and easy to get started with.

High-level overview of how cppq works:

  • Client puts tasks on a queue
  • Server pulls tasks off queues and starts a thread for each task
  • Tasks are processed concurrently by multiple workers

Task queues are a mechanism for distributing work across multiple machines. A system can consist of multiple worker servers and brokers, which allows for high availability and horizontal scaling.
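
The three steps listed above can be sketched without Redis. The following is a minimal in-memory illustration, not cppq's implementation: `SketchTask`, `SketchQueue`, `SketchHandler` and `processAll` are hypothetical names, and a mutex-guarded `std::deque` stands in for the Redis-backed pending queue.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration; these are not cppq types.
struct SketchTask {
  std::string type;
  std::string payload;
  std::string result;
};

using SketchHandler = std::function<void(SketchTask&)>;

// In-memory stand-in for the Redis-backed pending queue.
class SketchQueue {
 public:
  // Client side: put a task on the queue.
  void push(SketchTask task) {
    std::lock_guard<std::mutex> lock(mutex_);
    pending_.push_back(std::move(task));
  }

  // Server side: remove and return every pending task.
  std::vector<SketchTask> drain() {
    std::lock_guard<std::mutex> lock(mutex_);
    std::vector<SketchTask> out(pending_.begin(), pending_.end());
    pending_.clear();
    return out;
  }

 private:
  std::mutex mutex_;
  std::deque<SketchTask> pending_;
};

// Pulls all pending tasks, runs each one on its own thread,
// and waits for every thread to finish before returning the tasks.
std::vector<SketchTask> processAll(
    SketchQueue& queue, const std::map<std::string, SketchHandler>& handlers) {
  std::vector<SketchTask> tasks = queue.drain();
  std::vector<std::thread> workers;
  workers.reserve(tasks.size());
  for (SketchTask& task : tasks) {
    const auto it = handlers.find(task.type);
    assert(it != handlers.end() && "no handler registered for task type");
    workers.emplace_back(it->second, std::ref(task));
  }
  for (std::thread& worker : workers) worker.join();
  return tasks;
}
```

Each handler writes only to its own task, so the worker threads need no extra synchronization here. On GCC or Clang the sketch may need `-pthread` when compiling.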

Features

  • Guaranteed at-least-once execution of a task
  • Retries of failed tasks
  • Automatic recovery of tasks in the event of a worker crash
  • Low latency to add a task since writes are fast in Redis
  • Queue priorities
  • Scheduling of tasks
  • Periodic tasks
  • Ability to pause a queue to stop processing its tasks
  • Web UI to inspect and control queues and tasks
  • CLI to inspect and control queues and tasks
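
As an illustration of the retry feature, here is a minimal sketch of retrying a handler that throws. It is not cppq's code: `RetryTask` and `runWithRetries` are hypothetical, and the sketch assumes `maxRetry` counts the retries allowed after the first attempt.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical task record for illustration; not the cppq::Task layout.
struct RetryTask {
  std::string payload;
  int maxRetry = 0;     // retries allowed after the first attempt (assumed meaning)
  int retried = 0;      // retries used so far
  bool failed = false;  // set once the retry budget is used up
};

// Runs the handler and retries on any std::exception until it succeeds
// or the retry budget is used up. Returns true on success.
bool runWithRetries(RetryTask& task,
                    const std::function<void(RetryTask&)>& handler) {
  assert(task.maxRetry >= 0);
  while (true) {
    try {
      handler(task);
      task.failed = false;
      return true;
    } catch (const std::exception&) {
      if (task.retried >= task.maxRetry) {
        task.failed = true;
        return false;
      }
      ++task.retried;
    }
  }
}
```

Because a task may run more than once under at-least-once execution, handlers are safest when they are idempotent.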

Quickstart

cppq is a header-only library with 2 dependencies: libuuid and hiredis.

Just include the header with #include "cppq.hpp" and add these flags to your build: -luuid -lhiredis

libuuid and hiredis can be installed using your distro's package manager.

On Arch Linux, for example: sudo pacman -S hiredis util-linux-libs

Example

#include "cppq.hpp"

#include <chrono>
#include <iostream>
#include <string>

#include <nlohmann/json.hpp>

// Specify task type name
const std::string TypeEmailDelivery = "email:deliver";

// Define a payload type for your task
struct EmailDeliveryPayload {
  int UserID;
  std::string TemplateID;
};

// Provide conversion to JSON (optional, you can use any kind of payload)
void to_json(nlohmann::json& j, const EmailDeliveryPayload& p) {
  j = nlohmann::json{{"UserID", p.UserID}, {"TemplateID", p.TemplateID}};
}

// Helper function to create a new task with the given payload
cppq::Task NewEmailDeliveryTask(EmailDeliveryPayload payload) {
  nlohmann::json j = payload;
  // "10" is maxRetry -- the number of times the task will be retried on exception
  return cppq::Task{TypeEmailDelivery, j.dump(), 10};
}

// The actual task code
void HandleEmailDeliveryTask(cppq::Task& task) {
  // Fetch the parameters
  nlohmann::json parsedPayload = nlohmann::json::parse(task.payload);
  int userID = parsedPayload["UserID"];
  std::string templateID = parsedPayload["TemplateID"];

  // Send the email...

  // Return a result
  nlohmann::json r;
  r["Sent"] = true;
  task.result = r.dump();
}

int main(int argc, char *argv[]) {
  // Register task types and handlers
  cppq::registerHandler(TypeEmailDelivery, &HandleEmailDeliveryTask);

  // Create a Redis connection for enqueuing; you can reuse it for subsequent enqueues
  redisOptions redisOpts = {0};
  REDIS_OPTIONS_SET_TCP(&redisOpts, "127.0.0.1", 6379);
  redisContext *c = redisConnectWithOptions(&redisOpts);
  if (c == nullptr || c->err) {
    std::cerr << "Failed to connect to Redis" << std::endl;
    return 1;
  }

  // Create tasks (positional aggregate initialization, since designated initializers require C++20)
  cppq::Task task = NewEmailDeliveryTask(EmailDeliveryPayload{666, "AH"});
  cppq::Task task2 = NewEmailDeliveryTask(EmailDeliveryPayload{606, "BH"});
  cppq::Task task3 = NewEmailDeliveryTask(EmailDeliveryPayload{666, "CH"});

  // Enqueue a task on default queue
  cppq::enqueue(c, task, "default");
  // Enqueue a task on high priority queue
  cppq::enqueue(c, task2, "high");
  // Enqueue a task on default queue, scheduled to run 1 minute from now
  cppq::enqueue(
    c,
    task3,
    "default",
    cppq::scheduleOptions(std::chrono::system_clock::now() + std::chrono::minutes(1))
  );

  // Pause queue to stop processing tasks from it
  cppq::pause(c, "default");
  // Unpause queue to continue processing tasks from it
  cppq::unpause(c, "default");

  // This call loops forever, checking the pending queue and processing tasks in the thread pool.
  // The second argument defines the queues and their priorities.
  // The third argument is the time in seconds that a task can stay in the active queue
  // before being pushed back to the pending queue (e.g. when a worker dies mid-execution).
  cppq::runServer(redisOpts, {{"low", 5}, {"default", 10}, {"high", 20}}, 1000);
}
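
The third argument to runServer describes a recovery timeout. The sketch below shows one way such a sweep could work on in-memory data. It is an assumption-laden illustration, not cppq's implementation: `ActiveEntry`, `SketchClock` and `recoverStale` are hypothetical names, and plain containers stand in for the Redis-backed active and pending queues.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

using SketchClock = std::chrono::system_clock;

// Hypothetical record of a task that a worker has picked up.
struct ActiveEntry {
  std::string uuid;
  SketchClock::time_point dequeuedAt;
};

// Moves every entry that has been active for longer than `timeout`
// back to the pending queue and returns how many were moved.
std::size_t recoverStale(std::vector<ActiveEntry>& active,
                         std::deque<std::string>& pending,
                         std::chrono::seconds timeout,
                         SketchClock::time_point now) {
  assert(timeout.count() >= 0);
  std::size_t recovered = 0;
  for (auto it = active.begin(); it != active.end();) {
    if (now - it->dequeuedAt > timeout) {
      pending.push_back(it->uuid);
      it = active.erase(it);
      ++recovered;
    } else {
      ++it;
    }
  }
  return recovered;
}
```

With a timeout of 1000 seconds, as in the runServer call above, a task that has been active for 1500 seconds would be moved back to pending, while one active for 10 seconds would be left alone.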

Web UI

If you are on Linux, the web UI can be started by running: cd web && ./start.sh

The web UI is made with React/TypeScript and Flask/Python. It is still a work in progress.

CLI

The CLI can be run with: cd cli && pip3 install -r requirements && python3 main.py

The CLI is made with Python. It is still a work in progress.

usage: main.py [-h] [--redis_uri REDIS_URI] [--queues] [--stats QUEUE] [--list QUEUE STATE] [--task QUEUE UUID] [--pause QUEUE] [--unpause QUEUE]

cppq CLI

options:
  -h, --help            show this help message and exit
  --redis_uri REDIS_URI
  --queues              print queues, priorities, and pause status
  --stats QUEUE         print queue statistics
  --list QUEUE STATE    list task UUIDs in queue
  --task QUEUE UUID     get task details
  --pause QUEUE         pause a queue
  --unpause QUEUE       unpause a queue

License

cppq is MIT-licensed.

Thread pooling functionality is retrofitted from https://github.com/bshoshany/thread-pool

cppq's Issues

Request: Separation of JSON library

Your library looks neat. One downside I see is that it directly uses nlohmann::json. It would be nice if nlohmann::json was an optional dependency and if the main library just worked on std::string. This would allow various JSON libraries to be used or even other formats for messages.

You could require a cppq::transform struct, which you could define in another header. That would allow nlohmann::json to be entirely optional and another implementation could be added instead.

Something like:

struct transform
{
   template <class T>
   std::string operator()(T& payload) {
      nlohmann::json j = payload;
      return j.dump();
   }
};

You could then still have the exact same code:

cppq::Task NewEmailDeliveryTask(EmailDeliveryPayload payload) {
  nlohmann::json j = payload;
  return cppq::Task{TypeEmailDelivery, j, 10};
}

cppq::Task here would just call transform on the second argument.

transform could be a function, but a struct is probably the better long-term approach for more flexibility.
You can just forward-declare struct transform; in cppq.hpp so that the order in which headers are included doesn't matter.
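
To make the idea concrete, here is a rough sketch of a task type that serializes its payload through a transform object. It is a variation on the proposal rather than cppq's API: the transform is passed as a constructor argument instead of being a single forward-declared struct, and everything in the `sketch` namespace, along with `EmailPayload` and `KeyValueTransform`, is hypothetical.

```cpp
#include <cassert>
#include <string>
#include <utility>

namespace sketch {

// Default transform: payloads that are already strings pass through unchanged.
struct transform {
  std::string operator()(const std::string& payload) const { return payload; }
};

// Hypothetical task type; not the real cppq::Task.
struct Task {
  std::string type;
  std::string payload;
  int maxRetry;

  // Serializes any payload through the supplied transform object.
  template <class Payload, class Transform = transform>
  Task(std::string type_, const Payload& payload_, int maxRetry_,
       Transform t = Transform{})
      : type(std::move(type_)), payload(t(payload_)), maxRetry(maxRetry_) {
    assert(maxRetry >= 0);
  }
};

}  // namespace sketch

// A user-defined payload with a matching transform that needs no JSON library.
struct EmailPayload {
  int userID;
  std::string templateID;
};

struct KeyValueTransform {
  std::string operator()(const EmailPayload& p) const {
    return "UserID=" + std::to_string(p.userID) + ";TemplateID=" + p.templateID;
  }
};
```

With this shape, `sketch::Task("email:deliver", EmailPayload{666, "AH"}, 10, KeyValueTransform{})` stores the payload as a plain string, and a JSON-based transform could be swapped in without touching the task type.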

You could automate receiving the message, but I think it would be simpler, cleaner, and more flexible to have the user parse the std::string returned to the client. It's not hard to call nlohmann::json::parse(task.payload);
This also gives the user control over how the message is parsed and when it is parsed.

Thanks for your work on this!
