scripbox / flume

A blazing fast job processing system backed by GenStage & Redis.

Elixir 99.44% Lua 0.56%
elixir-lang genstage background-jobs concurrent redis batch-processing scheduled-jobs rate-limiting

flume's Introduction

Flume

Flume is a job processing system backed by GenStage & Redis.

Features

  • Durability - Jobs are backed up before processing. In case of crashes, these jobs are restored.
  • Back-pressure - Uses GenStage to support back-pressure.
  • Scheduled Jobs - Jobs can be scheduled to run at any point in the future.
  • Rate Limiting - Uses Redis to enforce rate limits on pipelines.
  • Batch Processing - Jobs are grouped into batches of a configured size.
  • Logging - Provides a behaviour, Flume.Logger, to define your own logger module.
  • Pipeline Control - Pipelines can be paused/resumed at runtime.
  • Instrumentation - Metrics such as worker duration and the latency of fetching jobs from Redis are emitted via telemetry.
  • Exponential Back-off - On failure, jobs are retried with exponential back-off. The initial and maximum back-off can be set via configuration.
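Flume's exact retry formula is not spelled out in this README. As a hypothetical sketch only, a common capped exponential back-off doubles the delay on each retry, starting from backoff_initial and never exceeding backoff_max (documented defaults: 500 ms and 10_000 ms). The Backoff module and delay_ms/3 are illustrative names, not Flume APIs, and any jitter Flume may add is not modelled.

```elixir
defmodule Backoff do
  @moduledoc false
  # Hypothetical sketch of a capped exponential back-off; not Flume's own code.
  # The defaults mirror the documented backoff_initial / backoff_max defaults.

  def delay_ms(retry_count, initial_ms \\ 500, max_ms \\ 10_000)
      when is_integer(retry_count) and retry_count >= 0 do
    # initial_ms * 2^retry_count, computed with an integer left shift,
    # then capped at max_ms.
    min(initial_ms * :erlang.bsl(1, retry_count), max_ms)
  end
end
```

With the sample configuration (backoff_initial: 30_000, backoff_max: 3_600_000), this sketch yields delays of 30 s, 60 s, 120 s, and so on, hitting the one-hour cap at retry 7 (counting from 0).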

Requirements

  • Elixir 1.6.6+
  • Erlang/OTP 21.1+
  • Redis 4.0+

Installation

Add Flume to your list of dependencies in mix.exs:

def deps do
  [
    {:flume, github: "scripbox/flume"}
  ]
end

Then run mix deps.get to install Flume and its dependencies.

Usage

Add the Flume supervisor to your application's supervision tree:

defmodule MyApplication.Application do
  use Application

  import Supervisor.Spec

  def start(_type, _args) do
    children = [
      # Start Flume supervisor
      supervisor(Flume, [])
    ]

    opts = [strategy: :one_for_one, name: MyApplication.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Add config/flume.exs:

# On Elixir 1.9+, `import Config` is preferred over `use Mix.Config`
use Mix.Config

config :flume,
  name: Flume,
  # Redis host
  host: "127.0.0.1",
  # Redis port
  port: "6379",
  # Redis keys namespace
  namespace: "my-app",
  # Redis database
  database: 0,
  # Redis pool size
  redis_pool_size: 10,
  # Redis connection timeout in ms (Default 5000 ms)
  redis_timeout: 10_000,
  # Retry backoff initial in ms (Default 500 ms)
  backoff_initial: 30_000,
  # Retry backoff maximum in ms (Default 10_000 ms)
  backoff_max: 3_600_000,
  # Maximum number of retries (Default 5)
  max_retries: 15,
  # Scheduled jobs poll interval in ms (Default 10_000 ms)
  scheduler_poll_interval: 10_000,
  # Time to move jobs from processing queue to retry queue in seconds (Default 600 sec)
  visibility_timeout: 600,
  # TTL of the lock acquired to fetch jobs for bulk pipelines in ms (Default 30_000 ms)
  dequeue_lock_ttl: 30_000,
  # Process timeout to fetch jobs for bulk pipelines in ms (Default 10_000 ms)
  dequeue_process_timeout: 10_000,
  # Interval after which to poll the queue again if it was locked by another process in ms (Default 500 ms)
  dequeue_lock_poll_interval: 500

Import the Flume config in config/config.exs as shown below:

...
import_config "#{Mix.env()}.exs"
+import_config "flume.exs"

Pipelines

Each pipeline is a GenStage pipeline with the following parameters:

  • name - Name of the pipeline
  • queue - Name of the Redis queue to pull jobs from
  • max_demand - Maximum number of jobs to pull from the queue

Configuration

config :flume,
  pipelines: [
    %{name: "default_pipeline", queue: "default", max_demand: 1000},
  ]

The Flume supervisor starts the following processes:

                  [Flume.Supervisor]   <- (Supervisor)
                         |
                         |
                         |
              [default_pipeline_producer]   <- (Producer)
                         |
                         |
                         |
          [default_pipeline_producer_consumer]   <- (ProducerConsumer)
                         |
                         |
                         |
         [default_pipeline_consumer_supervisor]   <- (ConsumerSupervisor)
                        / \
                       /   \
                      /     \
             [worker_1]     [worker_2]   <- (Worker Processes)

Enqueuing Jobs

Enqueuing a job into Flume requires:

  • A queue name (e.g. :priority)
  • The worker module (e.g. MyApp.FancyWorker)
  • The worker module's function name (default :perform)
  • The arguments, matching the arity of the worker module's function

With default function

Flume.enqueue(:queue_name, MyApp.FancyWorker, [arg_1, arg_2])

With custom function

Flume.enqueue(:queue_name, MyApp.FancyWorker, :myfunc, [arg_1, arg_2])

Creating Workers

Worker modules are responsible for processing a job. A worker module must define a function with the exact name and arity used when the job was enqueued.

defmodule MyApp.FancyWorker do
  def perform(arg_1, arg_2) do
    # your job processing logic
  end
end
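Conceptually, running a job comes down to calling the named function with the stored argument list, which is why the arity must match. A minimal sketch, assuming a job is represented as a map with the worker, function_name and args keys seen in the mock assertions under "Writing Tests"; JobRunner is a hypothetical module, not Flume's executor.

```elixir
defmodule MyApp.FancyWorker do
  # Example worker from above; returns its arguments so the call is observable.
  def perform(arg_1, arg_2), do: {:processed, arg_1, arg_2}
end

defmodule JobRunner do
  @moduledoc false
  # Hypothetical sketch: a job map holds the worker, the function name and the args.
  def run(%{worker: worker, args: args} = job) when is_list(args) do
    # Fall back to :perform, the documented default function name.
    function_name = Map.get(job, :function_name, :perform)
    # apply/3 calls worker.function_name with length(args) arguments;
    # a mismatched arity raises UndefinedFunctionError.
    apply(worker, function_name, args)
  end
end
```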

Scheduled Jobs

With default function

# 10 seconds
schedule_time = 10_000

Flume.enqueue_in(:queue_name, schedule_time, MyApp.FancyWorker, [arg_1, arg_2])

With custom function

# 10 seconds
schedule_time = 10_000

Flume.enqueue_in(:queue_name, schedule_time, MyApp.FancyWorker, :myfunc, [arg_1, arg_2])

Rate Limiting

Flume supports rate-limiting for each configured pipeline.

Rate limiting has two required parameters and one optional parameter:

  • rate_limit_scale - Time scale in milliseconds for the pipeline
  • rate_limit_count - Total number of jobs to be processed within the time scale
  • rate_limit_key (optional) - Pipelines configured with the same key share a single rate limit.

       Note: When this option is not set, the rate limit is maintained per pipeline.

rate_limit_count = 1000
rate_limit_scale = 6 * 1000

config :flume,
  pipelines: [
    # These pipelines share the "email" rate-limit key, so together
    # they will process at most 1000 jobs every 6 seconds
    %{
      name: "promotional_email_pipeline",
      queue: "promotional_email",
      rate_limit_count: rate_limit_count,
      rate_limit_scale: rate_limit_scale,
      rate_limit_key: "email"
    },
    %{
      name: "transactional_email_pipeline",
      queue: "transactional_email",
      rate_limit_count: rate_limit_count,
      rate_limit_scale: rate_limit_scale,
      rate_limit_key: "email"
    }
  ]

OR

config :flume,
  pipelines: [
    %{
      name: "webhooks_pipeline",
      queue: "webhooks",
      rate_limit_count: 1000,
      rate_limit_scale: 5000
    }
  ]

Because the rate-limit state is kept in Redis, Flume processes at most the configured number of jobs (rate_limit_count) per time scale for each rate-limited pipeline (or shared rate_limit_key), even when multiple instances of the application are running.
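The windowing arithmetic can be sketched in memory. The following is a hypothetical fixed-window limiter: FixedWindowLimiter and take/6 are illustrative names, and Flume keeps its counters in Redis, possibly with a different windowing scheme. It grants at most rate_limit_count jobs per rate_limit_scale milliseconds for each rate-limit key.

```elixir
defmodule FixedWindowLimiter do
  @moduledoc false
  # Hypothetical in-memory sketch, not Flume's Redis-backed implementation.
  # state maps each rate-limit key to {window_start_ms, jobs_granted_in_window}.

  def take(state, key, wanted, now_ms, rate_limit_count, rate_limit_scale) do
    # Align the current time to the start of its fixed window.
    window_start = now_ms - rem(now_ms, rate_limit_scale)

    # Reuse the stored count only if it belongs to the current window.
    used =
      case Map.get(state, key) do
        {^window_start, count} -> count
        _ -> 0
      end

    # Grant whatever is left of the budget, never a negative amount.
    granted = max(min(wanted, rate_limit_count - used), 0)
    {granted, Map.put(state, key, {window_start, used + granted})}
  end
end
```

In this sketch, promotional_email_pipeline and transactional_email_pipeline would both call take/6 with the key "email", so together they cannot exceed 1000 jobs per 6-second window.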

Batch Processing

Flume supports batch processing for each configured pipeline. It groups individual jobs into batches of the configured batch_size, and each worker process receives one batch of jobs.

config :flume,
  pipelines: [
    # This pipeline will pull (100 * 10) jobs from the queue
    # and group them in batches of 10.
    %{
      name: "batch_pipeline",
      queue: "batch-queue",
      max_demand: 100,
      batch_size: 10
    }
  ]

defmodule MyApp.BatchWorker do
  def perform(args) do
    # args will be a list of arguments
    # E.g - [[job_1_args], [job_2_args], ...]
    # your job processing logic
  end
end
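Assuming the grouping behaves like Enum.chunk_every/2 (an assumption; for example, how Flume handles a final partial batch is not stated here), each call to MyApp.BatchWorker.perform/1 would receive one of the chunks produced below. Batching is a hypothetical module used only for illustration.

```elixir
defmodule Batching do
  @moduledoc false
  # Hypothetical sketch: group the args of dequeued jobs into batches of
  # batch_size; the last batch may be smaller. Not Flume's internal code.
  def batch(job_args, batch_size) when is_integer(batch_size) and batch_size > 0 do
    Enum.chunk_every(job_args, batch_size)
  end
end
```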

Pipeline Control

Flume supports pausing and resuming each pipeline. Once a pipeline is paused, its producer process stops pulling jobs from the queue; jobs that have already been pulled are still processed.

Refer to the "Options" section below for supported options and default values.

Pause all pipelines

# Pause all pipelines permanently (in Redis) and asynchronously
Flume.pause_all(temporary: false, async: true)

Pause a pipeline

# Pause a pipeline temporarily (in current node) and asynchronously
Flume.pause(:default_pipeline, temporary: true, async: true)

Resume all pipelines

# Resume all pipelines temporarily (in current node) and synchronously with infinite timeout
Flume.resume_all(temporary: true, async: false, timeout: :infinity)

Resume a pipeline

# Resume a pipeline permanently (in Redis) and synchronously with a 10000 ms timeout
Flume.resume(:default_pipeline, temporary: false, async: false, timeout: 10000)

Options

The following options can be used to pause/resume a pipeline:

  • :async - (boolean) Defaults to false.
    • true - The caller will not wait for the operation to complete.
    • false - The caller will wait for the operation to complete; the call can time out if the operation takes too long. See https://hexdocs.pm/elixir/GenServer.html#call/3 for more details.
  • :temporary - (boolean) Defaults to true.
    • true - The pause/resume operation will be applied only on the current node.
    • false - Will update the value in persistent-store (Redis) and will apply the operation on all nodes.
  • :timeout - (timeout) Defaults to 5000. Timeout (in milliseconds) for synchronous pause/resume calls. See https://hexdocs.pm/elixir/GenServer.html#call/3-timeouts for more details.
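The difference between async: true and async: false can be illustrated with a hypothetical stand-in GenServer. PausableProducer is not a Flume module, and it is only an assumption that Flume's producer works this way. Here the asynchronous path uses GenServer.cast/2, which returns :ok immediately, and the synchronous path uses GenServer.call/3 with the :timeout option. The :temporary option is not modelled.

```elixir
defmodule PausableProducer do
  # Hypothetical stand-in for a pipeline producer; not Flume's module.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :running, opts)

  # async: true  -> fire-and-forget cast, the caller does not wait
  # async: false -> blocking call that exits the caller after :timeout ms
  def pause(server, opts \\ []) do
    if Keyword.get(opts, :async, false) do
      GenServer.cast(server, :pause)
    else
      GenServer.call(server, :pause, Keyword.get(opts, :timeout, 5000))
    end
  end

  def paused?(server), do: GenServer.call(server, :paused?)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:pause, _from, _state), do: {:reply, :ok, :paused}
  def handle_call(:paused?, _from, state), do: {:reply, state == :paused, state}

  @impl true
  def handle_cast(:pause, _state), do: {:noreply, :paused}
end
```

Because messages between two processes are delivered in order, a paused?/1 call made right after an asynchronous pause observes the paused state in this sketch.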

Instrumentation

Flume uses telemetry to emit metrics. The following metrics are emitted:

  • duration of a job/worker
  • count, latency and payload_size of dequeued jobs

Writing Tests

To enable mocking in the test environment, add the following to config/test.exs:

config :flume, mock: true

To mock an individual test:

import Flume.Mock
...
describe "enqueue/4" do
  test "mock works" do
    with_flume_mock do
      Flume.enqueue(:test, List, :last, [[1]])

      assert_receive %{
        queue: :test,
        worker: List,
        function_name: :last,
        args: [[1]]
      }
    end
  end
end

To enable mocking for all tests in a module:

defmodule ListTest do
  use ExUnit.Case, async: true
  use Flume.Mock

  describe "enqueue/4" do
    test "mock works" do
      Flume.enqueue(:test, List, :last, [[1]])

      assert_receive %{
        queue: :test,
        worker: List,
        function_name: :last,
        args: [[1]]
      }
    end
  end
end

Roadmap

  • Support multiple queue backends (right now only Redis is supported)

References

Contributing

  • Check formatting (mix format --check-formatted)
  • Run all tests (mix test)

Contributors

ananthakumaran, harshbalyan, nitinstp23, rahuljayaraman, vasuadari

flume's Issues

Add telemetry for producer, producer_consumer & consumer

Producer

  • When the producer starts (flume.producer.init = system_time)
  • Number of events fetched (flume.producer.events.fetch = 10)
  • Time taken to fetch events (flume.producer.events.fetch.stop)
  • Metadata for all events
    • :pipeline_name - name of the pipeline for which the producer is running
    • :queue_name - name of the queue in Redis
    • :rate_limit_count - number of events that can be processed by the consumer
    • :rate_limit_scale - duration within which rate_limit_count events can be processed
    • :rate_limit_key - unique key to which the rate limit is applied
    • :max_demand - maximum number of jobs to pull from the queue

ProducerConsumer

  • When the producer consumer starts (flume.producer_consumer.init = system_time)
  • Number of events received (flume.producer_consumer.events.received = 10)

Consumer

  • When the consumer starts (flume.consumer.init = system_time)

cc @nitinstp23

Fix async pipeline control function defs

The asynchronous calls for pause/resume fail with:

iex(1)> Flume.pause(:default_pipeline, async: true)
** (FunctionClauseError) no function clause matching in Flume.Pipeline.Event.Producer.pause/3

    The following arguments were given to Flume.Pipeline.Event.Producer.pause/3:

        # 1
        "default_pipeline"

        # 2
        true

        # 3
        5000

    Attempted function clauses (showing 1 out of 1):

        def pause(pipeline_name, false = _async, timeout)

    (flume) lib/flume/pipeline/event/producer.ex:29: Flume.Pipeline.Event.Producer.pause/3

The function definitions at

def pause(pipeline_name, true = _async) do

and
def resume(pipeline_name, true = _async) do

need to be fixed.

Support asynchronous pipeline pause/resume

Currently, Flume.pause and Flume.resume issue synchronous calls to the Producer GenStage to pause/resume a pipeline.
The package should also support asynchronous calls, for cases where the GenStage call times out and the operation fails.

Return error for an invalid job

Currently, an invalid job is accepted. It fails and is then retried the configured number of times.
An invalid job is one that has one or more of the following set incorrectly:

  • queue name
  • worker module
  • worker function

Fix

A job in any of the cases above is bound to fail and eventually move to the dead queue.
Therefore, we can return an error before enqueuing such a job by checking the following:

  • Check the queue name against the configured pipelines
  • Check existence of the worker module
  • Check existence of worker function(name + arity)
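These checks could be sketched with Code.ensure_loaded?/1 and function_exported?/3 from the Elixir standard library. JobValidator and validate/5 are hypothetical names, and the configured queue names (strings, as in the pipeline config) are passed in explicitly rather than read from Flume's configuration.

```elixir
defmodule JobValidator do
  @moduledoc false
  # Hypothetical pre-enqueue checks proposed in this issue; not part of Flume.
  # configured_queues: queue names from the pipeline config, e.g. ["default"].
  def validate(queue, worker, function_name, args, configured_queues)
      when is_list(args) do
    cond do
      # Enqueue calls use atoms (:default) while the config uses strings ("default").
      to_string(queue) not in configured_queues -> {:error, :unknown_queue}
      # Loads the module if needed and reports whether it exists.
      not Code.ensure_loaded?(worker) -> {:error, :unknown_worker}
      # Checks both the function name and the arity implied by args.
      not function_exported?(worker, function_name, length(args)) -> {:error, :unknown_function}
      true -> :ok
    end
  end
end
```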

Serialize/deserialize job args in mocked APIs

When a Flume enqueue is mocked in the test environment, the args are not serialized. This is inconsistent with how the worker module receives the args when the job is dequeued from the queue.

Example

Operation:

Flume.enqueue(:test, WorkerModule, :do_it, [%{a: 1, b: 2}])

Test env with mocks:

with_flume_mock do
  Flume.enqueue(:test, WorkerModule, :do_it, [%{a: 1, b: 2}])

  assert_receive %{
    queue: :test,
    worker: WorkerModule,
    function_name: :do_it,
    args: [%{a: 1, b: 2}]
  }
end

However, in an environment without mocks, the job is serialized (with Jason.encode!/1) before it is enqueued.
As a result, the worker receives deserialized JSON arguments (maps with string keys) rather than the original terms.
The worker module executes the equivalent of the following operation:

Flume.enqueue(:test, WorkerModule, :do_it, [%{"a" => 1, "b" => 2}])

So, the assertion should actually be against a serialized-deserialized version of the arguments.

with_flume_mock do
  Flume.enqueue(:test, WorkerModule, :do_it, [%{a: 1, b: 2}])

  assert_receive %{
    queue: :test,
    worker: WorkerModule,
    function_name: :do_it,
    args: [%{"a" => 1, "b" => 2}]
  }
end

Solution

Update the Mocked API to serialize and deserialize the arguments.

args = Jason.encode!(args) |> Jason.decode!()
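To see what the mock should hand back without pulling in Jason, the round trip's effect on common argument shapes can be approximated in plain Elixir. JsonRoundTrip is a hypothetical test helper, not part of Flume, and Jason.encode!/1 followed by Jason.decode!/1 remains the real reference behaviour.

```elixir
defmodule JsonRoundTrip do
  @moduledoc false
  # Hypothetical, dependency-free approximation of
  # args |> Jason.encode!() |> Jason.decode!() for plain maps, lists,
  # atoms, numbers and strings (structs and tuples are not handled).
  def approximate(map) when is_map(map) do
    Map.new(map, fn {key, value} -> {to_key(key), approximate(value)} end)
  end

  def approximate(list) when is_list(list), do: Enum.map(list, &approximate/1)

  # true, false and nil survive a JSON round trip unchanged...
  def approximate(value) when value in [true, false, nil], do: value
  # ...while any other atom comes back as a string.
  def approximate(atom) when is_atom(atom), do: Atom.to_string(atom)
  def approximate(other), do: other

  # JSON object keys are always strings.
  defp to_key(key) when is_binary(key), do: key
  defp to_key(key) when is_atom(key), do: Atom.to_string(key)
  defp to_key(key) when is_number(key), do: to_string(key)
end
```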

Move config options to Flume.Supervisor

The current approach requires us to define config options in config.exs (or env-specific files).

Moving config options to Flume.Supervisor will have these benefits -

  • Run multiple instances of Flume in an app with different settings
  • Flexibility in Flume tests, each test can run Flume with different settings like,
    running a test with only one pipeline and another with multiple.

Incorrect documentation for scheduling

Readme -> Scheduling

# 10 seconds
schedule_time = 10_000

Flume.enqueue_in(:queue_name, schedule_time, MyApp.FancyWorker, [arg_1, arg_2])

Here the schedule_time is shown to be in milliseconds. It should actually be unix time in seconds.

Also, Flume.enqueue_in/4, Flume.enqueue_in/5 and Flume.enqueue_in/6 accept argument time_in_seconds. It is confusing. A better name for this argument would be unix_time_in_seconds.
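If the argument is indeed a Unix timestamp in seconds, as this issue states, a delay of 10 seconds has to be converted into an absolute time first. A minimal sketch using System.os_time(:second); ScheduleTime and unix_seconds_from_now/2 are hypothetical names, not Flume APIs.

```elixir
defmodule ScheduleTime do
  @moduledoc false
  # Hypothetical helper: converts a relative delay into a Unix timestamp in seconds.
  # `now` is injectable so the arithmetic can be checked deterministically.
  def unix_seconds_from_now(delay_seconds, now \\ System.os_time(:second))
      when is_integer(delay_seconds) do
    now + delay_seconds
  end
end
```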

Add a test helper to mock flume APIs

One advantage is that we can avoid using Redis in the test environment.

Usage will be similar to this:

  import Flume.Mock

  describe "enqueue/4" do
    test "mock works" do
      with_flume_mock do
        Flume.enqueue(:test, List, :last, [[1]])

        assert_receive %{
          queue: :test,
          worker: List,
          function_name: :last,
          args: [[1]]
        }
      end
    end
  end

Support Redis Sentinels

The Flume configuration works with a single Redis server.
However, it would not work in a setup with Redis Sentinels.

Possible Solution

Extend the configuration to accept Sentinel options.
The underlying Redis library supports this and manages switching to the master server after a failover.

Inconsistent pipeline state on permanent pipeline pause/resume

On a Flume.pause("pipeline-name", _temporary = false) call, the key in Redis store is set even in the case when Producer GenStage pause call fails.

On a Flume.resume("pipeline-name", _temporary = false) call, the key in Redis store is deleted even in the case when Producer GenStage resume call fails.

This leads to an inconsistent pipeline state.

We should set/delete the Redis key on success of Producer GenStage pause/resume.
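The proposed ordering can be sketched with the two side effects passed in as functions, so the logic stays independent of Flume's internals. PermanentPause and pause/2 are hypothetical names; pause_producer stands for the Producer GenStage pause call and persist_state for setting the Redis key.

```elixir
defmodule PermanentPause do
  @moduledoc false
  # Hypothetical sketch of the ordering proposed above; not Flume's code.
  # pause_producer: zero-arity fun that pauses the Producer GenStage.
  # persist_state: zero-arity fun that sets the Redis key.
  def pause(pause_producer, persist_state) do
    result =
      try do
        pause_producer.()
      catch
        # A timed-out GenServer.call exits the caller; treat it as a failure.
        :exit, reason -> {:error, {:exit, reason}}
      end

    case result do
      # Only touch the persistent store once the producer has paused.
      :ok -> persist_state.()
      {:error, _reason} = error -> error
    end
  end
end
```

Resume would mirror this with the key deletion. A failure of the Redis write after a successful producer pause is not handled in this sketch.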
