Giter Site home page Giter Site logo

earl's Introduction

Earl

Service objects for Crystal, aka Agents.

Crystal provides primitives for achieving concurrent applications, but doesn't have advanced layers for structuring applications. Earl tries to fill that gap with a simple object-based API that's easy to grasp and understand.

Is Earl for me?

  • Your application has different, interconnected, objects that should always be alive, until they decide or are asked to stop.
  • These different objects must communicate together.
  • You feel that you spawn and loop and must rescue exceptions and restart objects too often.
  • You need a pool of workers to dispatch work to.
  • ...

If so, then Earl is for you.

Status

Earl is still in its infancy, but is fairly useable already.

If you believe Earl could help structure your application(s) please try it, and report any shortcomings and successes you had!

Usage

Add the earl shard to your dependencies then run shards install:

dependencies:
  earl:
    github: ysbaddaden/earl

For a formal depiction of the Earl library, you can read <SPEC.md>. For an informal introduction filled with examples, keep reading. For usage examples see the directory.

Getting Started

Agents

A simple agent is a class that includes Earl::Agent and implements a #call method. For example:

require "earl"

class Foo
  include Earl::Agent

  @count = 0

  def call
    while running?
      @count += 1
      sleep 1
    end
  end
end

Earl monitors the agent's state, and provides facilities to start and stop agents, to trap an agent crash or normal stop, as well as recycling them.

Communication (Earl::Mailbox) and broadcasting (Earl::Registry) are opt-in extensions, and introduced below.

Start Agents

You can start this agent in the current fiber with #start. This will block until the agent is stopped:

foo = Foo.new
foo.start

Alternatively you can call #spawn to start the agent in its own fiber, and return immediately:

foo = Foo.new
foo.spawn

do_something_else_concurrently

Depending on the context, it can be useful to block the current fiber. A library, for example, already spawned a dedicated fiber (e.g. HTTP::Server connections). Sometimes we need to start services in the background instead, and continue on.

Stop Agents

We can ask an agent to stop gracefully with #stop. Each agent must return quickly from the #call method when the agent's state changes. Hence the running? call in the Foo agent above to break out of the loop, for example.

foo.stop

When an agent is stopped its #terminate method hook is called, allowing the agent to act upon termination. For example notify other services, closing connections, or cleaning up.

Link & Trap Agents

When starting or spawning an agent A we can link another agent B to be notified when the agent A stopped or crashed (raised an unhandled exception). The linked agent B must implement the #trap(Agent, Exception?) method. If agent A crashed, then the unhandled exception is passed, otherwise it's nil. In all cases, the stopped/crashed agent is passed.

For example:

require "earl"

class A
  include Earl::Agent

  def call
    # ...
  end
end

class B
  include Earl::Agent

  def call
    # ...
  end

  def trap(agent, exception = nil)
    log.error("crashed with #{exception.message}") if exception
  end
end

a = A.new
b = B.new

a.start
b.start(link: a)

The Earl::Supervisor and Earl::Pool agents use links and traps to keep services alive for instance.

Recycle Agents

A stopped or crashed agent can be recycled to be restarted. Agents meant to be recycled must implement the #reset method, and return the agent's internal state to its pristine condition. A recycled agent must be indistinguishable from a created agent.

A recycled agent will return to the initial starting state, allowing it to restart. Earl::Supervisor, for example, expects the agents it monitors to properly reset themselves.

Agent Extensions

Mailbox

The Earl::Mailbox(M) module extends an agent with a Channel(M) along with methods to #send(M) a message to an agent and to receive them (concurrency safe).

The module merely wraps a Channel(M) but proposes a standard structure for agents to have an incoming mailbox of messages. All agents thus behave the same, and we can assume that an agent that expects to receive messages has a #send(M) method.

An agent's mailbox will be closed when the agent is asked to stop. An agent can simply loop over #receive? until it returns nil, without having to check for the agent's state.

See the Registry section below for an example.

Registry

The Earl::Registry(A, M) module will extend an agent to #register and #unregister agents of type A that can receive messages of type M. The agents to register must be capable to receive messages of type M โ€”i.e. include Earl::Mailbox(M) or Earl::Artist(M)). When running, the agent can broadcast a message to all registered agents. It can also ask registered agents to stop.

For example, we can declare a Consumer agent that receives a count and prints it, until it's asked to stop:

class Consumer
  include Earl::Agent
  include Earl::Mailbox(Int32)

  def call
    while count = message.receive?
      p count
    end
  end
end

Now we can declare a producer that will broadcast numbers to registered consumers:

class Producer
  include Earl::Agent
  include Earl::Registry(Consumer, Int32)

  @count = 0

  def call
    while running?
      registry.send(@count += 1)
    end
  end

  def terminate
    registry.stop
  end
end

Now, we can create our producer and consumer agents, and register the consumers to the producer. We spawn the consumers that will start in their dedicated fiber. Last, we start the producer in the current fiber, that will block until we hit Ctrl+C to interrupt the program:

producer = Producer.new

a = Consumer.new
producer.register(a)
a.spawn

b = Consumer.new
producer.register(b)
b.spawn

Signal::INT.trap { producer.stop }
producer.start

The example registers consumers before starting the produce, but the registry is concurrency-safe. Consumers can be added and removed at any time.

Specific Agents

Supervisor

The Earl::Supervisor agent monitors other agents (including other supervisors). Monitored agents are spawned in their own fiber when the supervisor starts. If a monitored agent crashes it's recycled then restarted in its own fiber.

A supervisor can keep indefinitely running concurrent agents. It can also prevent the main thread from exiting.

For example, let's supervise the Producer example from the Registry section:

supervisor = Supervisor.new

producer = Producer.new
supervisor.monitor(producer)

a = Consumer.new
producer.register(a)
a.spawn

b = Consumer.new
producer.register(b)
b.spawn

Signal::INT.trap { supervisor.stop }
supervisor.start

Now if the producer crashes, it will be restarted. You can test this by adding a random raise "chaos monkey" into the Producer#call loop. The error will be logged, the producer restarted and the application continue running.

Pool

The Earl::Pool(A, M) agent spawns a fixed size list of agents of type A, to which we can dispatch messages (of type M). Messages are delivered to a single worker of the pool in an exactly-once manner. This is different from Earl::Registry that broadcasts a message to all registered agents.

Whenever a worker agent crashes, the pool will recycle and restart it. A worker can stop normally, but it should only do so when asked to stop.

Worker agents (of type A) must be capable to receive messages of type M. I.e. they include Earl::Mailbox(M) or Earl::Artist(M). They must also override their #reset method to properly reset an agent.

Note that Earl::Pool will replace the workers' mailbox. All workers then share a single Channel(M) for an exactly-once delivery of messages.

For example:

class Worker
  include Earl::Agent
  include Earl::Mailbox(String)

  def call
    while message = receive?
      p message
    end
  end
end

pool = Earl::Pool(Worker, String).new(capacity: 10)

spawn do
  5.times do |i|
    pool.send("message #{i}")
  end
  pool.stop
end

pool.start
# => message 1
# => message 2
# => message 3
# => message 4
# => message 5

Pools are regular agents, so we can have pools of pools, but we discourage such usage. It'll only increase the complexity of your application for little or no real benefit.

You can supervise pools with Earl::Supervisor. It can feel redundant because pools already monitor other agents, but it can be useful to only have a few supervisors to start (and stop).

Credits

  • Author: Julien Portalier (@ysbaddaden)

Somewhat inspired by my very limited knowledge of Erlang OTP & Elixir.

License

Distributed under the Apache Software License 2.0. See LICENSE for details.

earl's People

Contributors

ysbaddaden avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

earl's Issues

Agent state should be an Atomic

An agent's state may be changed by whatever fiber is calling an action on the agent (e.g. call #stop). The state machine only allows state to transition from one state to another, but doesn't do it atomically.

This isn't a problem with a single thread (only one fiber is running at any given time) but with MT multiple fibers may affect the state, and there is a race condition between checking the current state and setting the new state.

Using an atomic with cmpxchg might be able to resolve this issue. This may also help with #7.

However, this may bring some other questions to light, related to the finite state machine, like silencing some state change errors (e.g. stopping a stopped agent), or resolving some concurrent situations.

Improve Scheduler design

The Scheduler agent was quickly put together and only supports CRON scheduling. We should review it's API design, and maybe introduce alternatives ways to schedule something, leveraging Time::Span or skipping Earl::Scheduler for a simpler interface.

For example:

Earl.schedule(agent, cron: "5 4 * * 6") # every Saturday at 04:05 am
Earl.schedule(agent, every: 5.minutes)

EDIT

It might also be nice to schedule one time jobs, they wouldn't survive a restart, but still interesting nonetheless:

Earl.schedule(agent, at: Time.parse("2023-05-31T15:12:00Z"))
Earl.schedule(agent, in: 5.minutes)
  • Earl::Schedulable
  • Earl::Every
  • Earl.schedule(agent, cron:)
  • Earl.schedule(agent, every:)
  • Inherit from DynamicSupervisor (blocked by #7)
  • Tests
  • README
  • SPEC

Related #10

Repurpose Registry to be a named registry of agents

Registry should be re-purposed to become a named registry of agents.

Agents would be able to register themselves with an unique name across the application. Supervisors might be able do it for them (especially useful for Dynamic Supervisors, see #7).

Agents would then be capable to communicate with an Agent by name, without holding an actual reference. Ideally, agents could even be (re)started on demand.

This would overcome the current limit that we must hold a reference to an agent, and that all agents must always be running all the time.

Related: #8 and #7

Dynamic Supervisor

Earl misses a dynamic supervisor that would start actors on demand, and maybe stop them at some point.

No SPEC or design yet. I need to understand them better in Erlang/Elixir more before I start drafting something.

Document Scheduler

Earl::Scheduler should be properly documented. A brief example in the README.md as well as an extended documentation in SPEC.md.

Related to #11

Earl::Aplication on-exit behavior

On exit, Earl::Application calls terminate on all agents in the supervision tree, but does not wait for them to transition from stopping to stopped or crashed state. This leaves no time for agents to gracefully stop, should Earl::Application wait for itself to transition to stopped? Perhaps with a short timeout to terminate anyway.

Erlang gen_server like behavior

The Erlang gen_server behavior allows to abstract the communication with an actor, by providing a public API that sends messages to the actor (from the outside actors) and have distinct internal handlers to execute in the actor's context, hence free from any concurrency/parallelism issues.

The behavior could avoid lots of boilerplate in Earl, and completely avoid the need to define the M type of Mailbox(M), could help to transparently support synchronous and asynchronous communication, and even have defaults to handle agent hooks such as terminate and trap as standardized info messages.

I doubted that it was possible, but playing with the concept showed that some subset of gen_server can be applied to Crystal, with the help of macros (of course): https://gist.github.com/ysbaddaden/fb41d89663e40dbd1e4d19cc2438931e

I doubt we can fully implement the behavior:

  1. there is no pattern matching in Crystal for example, so the proof-of-concept above expects that the first argument is a Symbol while the rest are arguments;
  2. I don't see how to support {:reply, *args), maybe a very simplified version, like passing whatever the handler returns? But we'd need a Future(T) and thus need to know or infer T... while we can't call typeof in macros... but maybe we can access a method's return type in macros? ๐Ÿค”

Yet, it's already nice to avoid to repeated boilerplate (see the example Bus in the above Gist).

Rename Registry as Broadcast, PubSub or ...

The current Registry actor is poorly named.

It's purpose is to broadcast the messages it receives to all subscribed listeners. There is no selection of what listeners will receive, so it's closer to a broadcast (notify everyone) than PubSub (notify me of messages with these topics).

It should thus either be named Broadcast or PubSub if we intent to have topic subscription, where broadcasting would be the wildcard (*). I'm all ears for a better naming, especially if the concept already exists in Erlang OTP.

Registry would be re-purposed to become a named registry of actors. See #9.

Unhandled exception in spawn: can't transition agent state from Starting to Stopping

Hello,

I have tried pool.spawn and the process crashed with this error:

[00] Unhandled exception in spawn: can't transition agent state from Starting to Stopping (Earl::TransitionError)
[00]   from Earl::Agent::State#transition<Earl::Pool(App::Worker, String), Earl::Agent::Status>:Nil
[00]   from Earl::Pool(App::Worker, String)
[00]   from Earl::Pool(App::Worker, String)
[00]   from ~procProc(Nil)
[00]   from Fiber#run:(IO::FileDescriptor | Nil)
[00]   from ~proc2Proc(Fiber, (IO::FileDescriptor | Nil))
[00]   from ???
[00] "message 0"
[00] "message 1"
[00] "message 2"
[00] "message 3"
[00] "message 4"
[00] "message 5"
[00] "message 6"
[00] "message 7"
[00] "message 8"
[00] "message 9"
[00] Unhandled exception in spawn: can't transition agent state from Running to Recycling (Earl::TransitionError)
[00]   from Earl::Agent::State#transition<App::Worker, Earl::Agent::Status>:Nil
[00]   from App::Worker
[00]   from Earl::Pool(App::Worker, String)
[00]   from App::Worker
[00]   from ~procProc(Nil)
[00]   from Fiber#run:(IO::FileDescriptor | Nil)
[00]   from ~proc2Proc(Fiber, (IO::FileDescriptor | Nil))
[00]   from ???
[00] Unhandled exception in spawn: can't transition agent state from Running to Recycling (Earl::TransitionError)
[00]   from Earl::Agent::State#transition<App::Worker, Earl::Agent::Status>:Nil
[00]   from App::Worker
[00]   from Earl::Pool(App::Worker, String)
[00]   from App::Worker
[00]   from ~procProc(Nil)
[00]   from Fiber#run:(IO::FileDescriptor | Nil)
[00]   from ~proc2Proc(Fiber, (IO::FileDescriptor | Nil))
[00]   from ???
[00] Unhandled exception in spawn: can't transition agent state from Running to Recycling (Earl::TransitionError)
[00]   from Earl::Agent::State#transition<App::Worker, Earl::Agent::Status>:Nil
[00]   from App::Worker
[00]   from Earl::Pool(App::Worker, String)
[00]   from App::Worker
[00]   from ~procProc(Nil)
[00]   from Fiber#run:(IO::FileDescriptor | Nil)
[00]   from ~proc2Proc(Fiber, (IO::FileDescriptor | Nil))
[00]   from ???
[00] Unhandled exception in spawn: can't transition agent state from Running to Recycling (Earl::TransitionError)
[00]   from Earl::Agent::State#transition<App::Worker, Earl::Agent::Status>:Nil
[00]   from App::Worker
[00]   from Earl::Pool(App::Worker, String)
[00]   from App::Worker
[00]   from ~procProc(Nil)
[00]   from Fiber#run:(IO::FileDescriptor | Nil)
[00]   from ~proc2Proc(Fiber, (IO::FileDescriptor | Nil))
[00]   from ???
[00] Unhandled exception in spawn: can't transition agent state from Running to Recycling (Earl::TransitionError)
[00]   from Earl::Agent::State#transition<App::Worker, Earl::Agent::Status>:Nil
[00]   from App::Worker
[00]   from Earl::Pool(App::Worker, String)
[00]   from App::Worker
[00]   from ~procProc(Nil)
[00]   from Fiber#run:(IO::FileDescriptor | Nil)
[00]   from ~proc2Proc(Fiber, (IO::FileDescriptor | Nil))
[00]   from ???

The code:

class Worker
    include Earl::Agent
    include Earl::Mailbox(String)

    def call
      while message = receive?
        p message
      end
    end

    def terminate
      puts "terminate"
    end
  end

  pool = Earl::Pool(Worker, String).new(capacity: 5)

  spawn do
    10.times do |i|
      pool.send("message #{i}")
    end
    pool.stop
    puts "stop"
  end

  pool.spawn
  sleep 4

An idea please?

`Earl::Artist(AbstractType)` raises `#call(AbstractType) must be defined`

This example fails to compile with Error: method Bar#call(AbstractMessage) must be defined:

require "earl/artist"

abstract struct AbstractMessage
end

record FooMessage < AbstractMessage

module Foo
  macro included
    include Earl::Artist(AbstractMessage)
  end

  def call(event : FooMessage)
    p "event: #{event}"
  end
end

class Bar
  include Foo
end

bar = Bar.new
bar.spawn
bar.send FooMessage.new

sleep

But there is call(event : FooMessage) implementation in module Foo so it should not raise the error.

CPU 100% with the pool

Hello,

First of all, thank you for this lib, it's awesome. I've been testing it for five hours, top.
I encountered a bug or a mistake on my part maybe. If the capacity of a pool is above 5, the CPU rises to 100% until I kill the process.

 class Worker
    include Earl::Agent
    include Earl::Mailbox(String)

    def call
      while message = receive?
        p message
      end
    end

    def terminate
      puts "terminate"
    end
  end

  pool = Earl::Pool(Worker, String).new(capacity: 10)

  spawn do
    8.times do |i|
      pool.send("message #{i}")
    end
    pool.stop
    puts "stop"
  end

  pool.start

Is it related to the number of CPUs in my machine?

Bidirectional communication (replies)

Right now Mailbox(M) merely delegates to Channel(M) which only permits unidirectional communication from an actor A to an actor B, but doesn't permit B to reply to A after it processed its message, because there is no way to know about A. In order to achieve this the developer must manually create and use a type to wrap the message (e.g. {Actor, M}) then manually which is bothersome.

Earl must introduce a solution.

Rework Earl::Logger to use Log (stdlib)

The current Earl::Logger never made the jump to Log from stdlib. Worst, it implements its custom Logger. Upgrading to Log is long overdue.

Ideally, Earl::Logger would supervise any fiber that Log spawns (using Earl.application), because Earl doesn't accept fibers to be started without supervision.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.