tochemey / goakt

[Go] Fast and Distributed Actor framework using protocol buffers as message for Golang

License: MIT License

goakt's Introduction

Go-Akt

Go-Akt is a distributed Go actor framework for building reactive and distributed systems in Golang, using protocol buffers as actor messages.

GoAkt is highly scalable and available when running in cluster mode. It comes with the features required to build a distributed actor-based system without sacrificing performance and reliability. With GoAkt, you can quickly create a fast, scalable, distributed system across a cluster of computers.

If you are not familiar with the actor model, the blog post from Brian Storti is an excellent and short introduction. Also, check the reference section at the end of that post for more material on the actor model.

Design Principles

This framework has been designed:

  • to be very simple - it caters for the core components of an actor framework as stated by the father of the actor framework.
  • to be very easy to use.
  • to have a clear and defined contract for messages - no need to implement/hide any sort of serialization.
  • to make use of existing battle-tested libraries in the Go ecosystem - no need to reinvent solved problems.
  • to be very fast.
  • to expose interfaces for custom integrations rather than making it convoluted with unnecessary features.

Features

Actors

The fundamental building blocks of Go-Akt are actors.

  • They are independent, isolated units of computation with their own state.
  • They can be long-lived or be passivated after a period of time that is configured during their creation. Use this feature with care when dealing with persistent actors (actors that require their state to be persisted).
  • They are automatically thread-safe without having to use locks or any other shared-memory synchronization mechanisms.
  • They can be stateful or stateless depending upon the system to build.
  • Every actor in Go-Akt:
    • has a process id PID. Via the process id any allowable action can be executed by the actor.
    • has a lifecycle via the following methods: PreStart, PostStop. It means it can live and die like any other process.
    • handles and responds to messages via the method Receive. While handling messages it can:
      • create other (child) actors via the PID SpawnChild method
      • send messages to other actors locally or remotely via the PID Ask and RemoteAsk (request/response fashion) and Tell and RemoteTell (fire-and-forget fashion) methods
      • stop (child) actors via their process id PID
      • watch/unwatch (child) actors via the PID Watch and UnWatch methods
      • supervise the failure behavior of (child) actors. The supervisory strategy to adopt is set during its creation. The Restart and Stop directives are supported at the moment.
      • remotely look up an actor on another node via the PID RemoteLookup method. This allows it to send messages remotely via the RemoteAsk or RemoteTell methods
      • stash/unstash messages. See Stashing
    • can adopt various forms using the Behavior feature
    • can be restarted (respawned)
    • can be gracefully stopped (killed). Every message in the mailbox prior to stoppage will be processed within a configurable time period.
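
To illustrate why an actor needs no locks, here is a minimal, self-contained sketch in plain Go. It is not Go-Akt's implementation: the counter type and the increment and stop messages are hypothetical names chosen for this example. The actor's state lives inside a single goroutine that drains a mailbox, so concurrent senders never touch it directly.

```go
package main

import (
	"fmt"
	"sync"
)

// increment and stop are hypothetical message types for this sketch.
type increment struct{ by int }
type stop struct{}

// counter is a hypothetical actor. Its state lives inside the goroutine
// started by newCounter, so only that goroutine ever touches it.
type counter struct {
	mailbox chan any
	result  chan int
}

func newCounter() *counter {
	c := &counter{mailbox: make(chan any, 16), result: make(chan int, 1)}
	go func() {
		count := 0 // private state, never shared
		for msg := range c.mailbox {
			switch m := msg.(type) {
			case increment:
				count += m.by
			case stop:
				c.result <- count
				return
			}
		}
	}()
	return c
}

// runCounter has many goroutines send one increment each, then stops the
// actor and returns the final count.
func runCounter(senders int) int {
	c := newCounter()
	var wg sync.WaitGroup
	for i := 0; i < senders; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.mailbox <- increment{by: 1}
		}()
	}
	wg.Wait() // every increment is in the mailbox before stop is sent
	c.mailbox <- stop{}
	return <-c.result
}

func main() {
	fmt.Println(runCounter(100)) // prints 100
}
```

Messages are processed one at a time in mailbox order, which is what makes the unsynchronized count safe.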

Passivation

Actors can be passivated when they have been idle for some period of time. Passivated actors are removed from the actor system to free up resources. When cluster mode is enabled, passivated actors are removed from the entire cluster. To bring such actors back to life, one needs to Spawn them again. By default, all actors are passivated and the passivation time is two minutes.

  • To enable passivation use the actor system option WithExpireActorAfter(duration time.Duration) when creating the actor system. See actor system options.
  • To disable passivation use the actor system option WithPassivationDisabled when creating the actor system. See actor system options.
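
The idea behind passivation can be sketched with an idle countdown that restarts on every message. This is only an illustration in plain Go under assumed names (spawn, waitPassivated); it does not use or mirror Go-Akt's internals.

```go
package main

import (
	"fmt"
	"time"
)

// spawn starts a hypothetical actor that passivates (stops) once it has
// received no message for idleFor. The second return value is closed when
// the actor has been passivated.
func spawn(idleFor time.Duration) (chan<- string, <-chan struct{}) {
	mailbox := make(chan string, 8)
	passivated := make(chan struct{})
	go func() {
		defer close(passivated)
		for {
			select {
			case <-mailbox:
				// Message handled; looping restarts the idle countdown.
			case <-time.After(idleFor):
				return // idle for too long: passivate
			}
		}
	}()
	return mailbox, passivated
}

// waitPassivated reports whether the actor passivated before the timeout.
func waitPassivated(passivated <-chan struct{}, timeout time.Duration) bool {
	select {
	case <-passivated:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	mailbox, passivated := spawn(50 * time.Millisecond)
	mailbox <- "hello"
	fmt.Println(waitPassivated(passivated, 2*time.Second)) // prints true
}
```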

Actor System

Without an actor system, it is not possible to create actors in Go-Akt. It is recommended to create only a single actor system per application. At the moment Go-Akt does not enforce the single instance; this is left to the discretion of the developer. To create an actor system one just needs to use the NewActorSystem method with the various Options. The Go-Akt ActorSystem has the following characteristics:

  • Actors lifecycle management (Spawn, Kill, ReSpawn)
  • Concurrency and Parallelism - Multiple actors can be managed and execute their tasks independently and concurrently. This helps utilize multicore processors efficiently.
  • Location Transparency - The physical location of actors is abstracted. Remote actors can be accessed via their address once remoting is enabled.
  • Fault Tolerance and Supervision - Set during the creation of the actor system.
  • Actor Addressing - Every actor in the ActorSystem has an address.

Behaviors

Actors in Go-Akt have the power to switch their behaviors at any point in time. When you change the actor behavior, the new behavior will take effect for all subsequent messages until the behavior is changed again. The current message will continue processing with the existing behavior. You can use Stashing to reprocess the current message with the new behavior.

To change the behavior, call the following methods on the ReceiveContext interface when handling a message:

  • Become - switches the current behavior of the actor to a new behavior.
  • UnBecome - resets the actor behavior to the default one which is the Actor.Receive method.
  • BecomeStacked - sets a new behavior to the actor to the top of the behavior stack, while maintaining the previous ones.
  • UnBecomeStacked() - sets the actor behavior to the previous behavior before BecomeStacked() was called. This only works with BecomeStacked().
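
The four operations above can be pictured as a small behavior stack. The sketch below is self-contained plain Go with hypothetical names (behavior, actor, become and so on). It assumes that Become discards any stacked behaviors, which is an assumption of this example rather than something stated above.

```go
package main

import "fmt"

// behavior is a hypothetical message handler. It returns a label so the
// example can show which handler ran.
type behavior func(msg string) string

// actor keeps a default behavior plus a stack of switched-in behaviors.
type actor struct {
	defaultBehavior behavior
	stack           []behavior
}

func newActor(def behavior) *actor { return &actor{defaultBehavior: def} }

// current returns the behavior that will handle the next message.
func (a *actor) current() behavior {
	if len(a.stack) == 0 {
		return a.defaultBehavior
	}
	return a.stack[len(a.stack)-1]
}

// become replaces whatever is on the stack with a single new behavior.
func (a *actor) become(b behavior) { a.stack = []behavior{b} }

// unBecome drops every switched-in behavior and returns to the default one.
func (a *actor) unBecome() { a.stack = nil }

// becomeStacked pushes a behavior while keeping the previous ones.
func (a *actor) becomeStacked(b behavior) { a.stack = append(a.stack, b) }

// unBecomeStacked pops the most recently stacked behavior, if any.
func (a *actor) unBecomeStacked() {
	if len(a.stack) > 0 {
		a.stack = a.stack[:len(a.stack)-1]
	}
}

// receive handles a message with the current behavior.
func (a *actor) receive(msg string) string { return a.current()(msg) }

func main() {
	a := newActor(func(string) string { return "default" })
	a.becomeStacked(func(string) string { return "authenticated" })
	fmt.Println(a.receive("hello")) // prints authenticated
	a.unBecomeStacked()
	fmt.Println(a.receive("hello")) // prints default
}
```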

Mailbox

One can implement a custom mailbox. See Mailbox. The default mailbox makes use of buffered channels.

Events Stream

To receive some system events and act on them for some particular business cases, you just need to call the actor system Subscribe. Make sure to Unsubscribe whenever the subscription is no longer needed to free allocated resources. The subscription methods can be found on the ActorSystem interface.

Supported events

  • ActorStarted: emitted when an actor has started
  • ActorStopped: emitted when an actor has stopped
  • ActorPassivated: emitted when an actor is passivated
  • ActorChildCreated: emitted when a child actor is created
  • ActorRestarted: emitted when an actor has restarted
  • NodeJoined: cluster event emitted when a node joins the cluster. This only happens when cluster mode is enabled
  • NodeLeft: cluster event emitted when a node leaves the cluster. This only happens when cluster mode is enabled
  • Deadletter: emitted when a message cannot be delivered or was not handled by a given actor. Dead letters are automatically emitted when a message cannot be delivered to an actor's mailbox or when an Ask times out. One can also emit dead letters from the receiving actor by using the ctx.Unhandled() method. This is useful instead of panicking when the receiving actor does not know how to handle a particular message. Dead letters are not propagated over the network; they are tied to the local actor system.
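
The subscribe/unsubscribe pattern can be sketched as a small in-process publish/subscribe stream. The code below is plain Go with hypothetical names (stream, event, subscribe, unsubscribe, publish). It is not the ActorSystem API, only an illustration of why unsubscribing releases resources.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for system events such as ActorStarted.
type event struct{ name string }

// stream fans events out to every active subscriber.
type stream struct {
	mu   sync.Mutex
	subs map[int]chan event
	next int
}

func newStream() *stream { return &stream{subs: make(map[int]chan event)} }

// subscribe registers a buffered channel and returns its id and the channel.
func (s *stream) subscribe() (int, <-chan event) {
	s.mu.Lock()
	defer s.mu.Unlock()
	id := s.next
	s.next++
	ch := make(chan event, 8)
	s.subs[id] = ch
	return id, ch
}

// unsubscribe removes the subscriber and closes its channel.
func (s *stream) unsubscribe(id int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if ch, ok := s.subs[id]; ok {
		delete(s.subs, id)
		close(ch)
	}
}

// publish delivers the event to each subscriber and drops it for any
// subscriber whose buffer is full, so a slow consumer never blocks it.
func (s *stream) publish(e event) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, ch := range s.subs {
		select {
		case ch <- e:
		default:
		}
	}
}

func main() {
	s := newStream()
	id, events := s.subscribe()
	s.publish(event{name: "ActorStarted"})
	s.publish(event{name: "ActorStopped"})
	s.unsubscribe(id)
	for e := range events { // drains the buffered events, then ends
		fmt.Println(e.name)
	}
}
```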

Messaging

Communication between actors is achieved exclusively through message passing. In Go-Akt Google Protocol Buffers is used to define messages. The choice of protobuf is due to easy serialization over wire and strong schema definition. As stated previously the following messaging patterns are supported:

  • Tell/RemoteTell - send a message to an actor and forget it
  • Ask/RemoteAsk - send a message to an actor and expect a reply within a time period
  • Forward - pass a message from one actor to another actor while preserving the initial sender of the message. At the moment you can only forward messages from the ReceiveContext when handling a message within an actor, and only to a local actor.
  • BatchTell - send a batch of messages to an actor in a fire-and-forget manner. Messages are processed one after the other in the order they were sent.
  • BatchAsk - send a batch of messages to an actor and expect a response for each message within a time period. Messages are processed one after the other in the order they were sent, which helps return the responses in the same order. This method hinders performance drastically when the number of messages to send is high. Kindly use this method with caution.
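
The difference between the Tell and Ask patterns can be sketched with channels. This is a self-contained illustration in plain Go; envelope, startEcho, tell and ask are hypothetical names and do not reflect the Go-Akt signatures.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// envelope carries a message and, for ask, a channel for the reply.
type envelope struct {
	msg   string
	reply chan string // nil for tell
}

var errTimeout = errors.New("ask timed out")

// startEcho starts a hypothetical actor that replies "pong" to "ping"
// and silently ignores everything else.
func startEcho() chan envelope {
	mailbox := make(chan envelope, 8)
	go func() {
		for env := range mailbox {
			if env.reply != nil && env.msg == "ping" {
				env.reply <- "pong"
			}
		}
	}()
	return mailbox
}

// tell enqueues the message and returns without waiting for a reply.
func tell(mailbox chan envelope, msg string) {
	mailbox <- envelope{msg: msg}
}

// ask enqueues the message and waits for a reply until the timeout expires.
func ask(mailbox chan envelope, msg string, timeout time.Duration) (string, error) {
	reply := make(chan string, 1) // buffered so a late reply never blocks the actor
	mailbox <- envelope{msg: msg, reply: reply}
	select {
	case r := <-reply:
		return r, nil
	case <-time.After(timeout):
		return "", errTimeout
	}
}

func main() {
	mailbox := startEcho()
	tell(mailbox, "ping") // fire-and-forget
	reply, err := ask(mailbox, "ping", time.Second)
	fmt.Println(reply, err) // prints: pong <nil>
	_, err = ask(mailbox, "unknown", 50*time.Millisecond)
	fmt.Println(err) // prints: ask timed out
}
```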

Scheduler

You can schedule messages to be sent to actors at a later time. To achieve that, use the following methods on the Actor System:

  • ScheduleOnce - will send the given message to a local actor after a given interval
  • RemoteScheduleOnce - will send the given message to a remote actor after a given interval. This requires remoting to be enabled on the actor system.
  • ScheduleWithCron - will send the given message to a local actor using a cron expression.
  • RemoteScheduleWithCron - will send the given message to a remote actor using a cron expression. This requires remoting to be enabled on the actor system.

Cron Expression Format

Field          Required   Allowed Values      Allowed Special Characters
Seconds        yes        0-59                , - * /
Minutes        yes        0-59                , - * /
Hours          yes        0-23                , - * /
Day of month   yes        1-31                , - * ? /
Month          yes        1-12 or JAN-DEC     , - * /
Day of week    yes        1-7 or SUN-SAT      , - * ? /
Year           no         empty, 1970-        , - * /
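
As an illustration of the field order, the sketch below splits an expression into named fields. It only checks the number of fields (six, or seven with the optional year) and does not validate values. The helper splitCron is a hypothetical name for this example, not part of Go-Akt. The sample expression is assumed to mean "at 10:15:00 on every weekday" under the format above.

```go
package main

import (
	"fmt"
	"strings"
)

// fieldNames follows the order of the cron format; Year is optional.
var fieldNames = []string{
	"Seconds", "Minutes", "Hours", "Day of month", "Month", "Day of week", "Year",
}

// splitCron maps each field of a cron expression to its name.
// It only checks the number of fields, not the allowed values.
func splitCron(expr string) (map[string]string, error) {
	fields := strings.Fields(expr)
	if len(fields) < 6 || len(fields) > 7 {
		return nil, fmt.Errorf("expected 6 or 7 fields, got %d", len(fields))
	}
	out := make(map[string]string, len(fields))
	for i, f := range fields {
		out[fieldNames[i]] = f
	}
	return out, nil
}

func main() {
	parts, err := splitCron("0 15 10 ? * MON-FRI")
	fmt.Println(parts["Hours"], parts["Day of week"], err) // prints: 10 MON-FRI <nil>
}
```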

Note

When running the actor system in a cluster only one instance of a given scheduled message will be running across the entire cluster.

Stashing

Stashing is a mechanism you can enable in your actors so that they can temporarily set aside messages they cannot or should not handle at the moment. Another way to see it is that stashing allows you to keep processing messages you can handle while saving for later the messages you can't. Stashing is handled by Go-Akt outside of the actor instance, just like the mailbox, so if the actor dies while processing a message, all messages in the stash are processed. This feature is usually used together with Become/UnBecome, as they fit together very well, but this is not a requirement.

It’s recommended to avoid stashing too many messages, to limit memory usage. If you try to stash more messages than the stash capacity allows, the actor will panic. To use the stashing feature, call the following methods on the ReceiveContext interface when handling a message:

  • Stash() - adds the current message to the stash buffer.
  • Unstash() - unstashes the oldest message in the stash and prepends it to the mailbox.
  • UnstashAll() - unstashes all messages from the stash buffer and prepends them to the mailbox. Messages will be processed in the same order they arrived. The stash buffer will be empty after processing all messages, unless an exception is thrown or messages are stashed while unstashing.
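
The ordering guarantees of the stash operations can be illustrated with two FIFO slices. This is a plain Go sketch with hypothetical names (actor, stashMsg, unstash, unstashAll). It assumes unstashed messages go to the front of the mailbox and leaves out the capacity check.

```go
package main

import "fmt"

// actor holds a mailbox and a stash, both as FIFO slices (hypothetical layout).
type actor struct {
	mailbox []string
	stash   []string
}

// stashMsg appends the current message to the stash buffer.
func (a *actor) stashMsg(msg string) { a.stash = append(a.stash, msg) }

// unstash moves the oldest stashed message to the front of the mailbox.
func (a *actor) unstash() {
	if len(a.stash) == 0 {
		return
	}
	a.mailbox = append([]string{a.stash[0]}, a.mailbox...)
	a.stash = a.stash[1:]
}

// unstashAll moves every stashed message to the front of the mailbox,
// keeping the order in which the messages arrived.
func (a *actor) unstashAll() {
	a.mailbox = append(append([]string{}, a.stash...), a.mailbox...)
	a.stash = nil
}

func main() {
	a := &actor{mailbox: []string{"c"}}
	a.stashMsg("a")
	a.stashMsg("b")
	a.unstashAll()
	fmt.Println(a.mailbox) // prints [a b c]
}
```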

Remoting

This allows remote actors to communicate. The underlying technology is gRPC. To enable remoting just use the WithRemoting option when creating the actor system. See actor system options. The following remoting features are available:

  • RemoteTell: to send a fire-and-forget message to an actor remotely
  • RemoteAsk: to send a request/response type of message to a remote actor
  • RemoteBatchTell: to send a fire-and-forget bulk of messages to a remote actor
  • RemoteBatchAsk: to send a batch of messages to a remote actor and get replies
  • RemoteLookup: to look up an actor on a remote host
  • RemoteReSpawn: to restart an actor on a remote machine
  • RemoteStop: to stop an actor on a remote machine
  • RemoteSpawn: to start an actor on a remote machine. The given actor implementation must be registered using the Register method of the actor system on the remote machine for this call to succeed.

These methods can be used from the API as well as from the PID which is the actor reference when an actor is created.

Cluster

This offers simple scalability, partitioning (sharding), and re-balancing out-of-the-box. Go-Akt nodes are automatically discovered. See Clustering. Beware that at the moment an actor is unique within the cluster. When the node hosting a given actor leaves the cluster, that actor is no longer accessible. This behaviour can be improved by introducing the redeployment of actors on new nodes.

Observability

Observability is key in a distributed system. It helps to understand and track the performance of a system. Go-Akt offers out-of-the-box features that can help track, monitor and measure the performance of a Go-Akt based system.

Tracing

One can enable/disable tracing on a Go-Akt actor system to instrument and measure the performance of some of the methods. Under the hood, Go-Akt uses OpenTelemetry to instrument a system. One just needs to use the WithTracing option when instantiating a Go-Akt actor system and use the default Telemetry engine or set a custom one with the WithTelemetry option of the actor system.

Metrics

One can enable/disable metrics on a Go-Akt actor system to collect the following metrics:

  • Actor Metrics:
    • Number of children
    • Number of messages stashed
    • Number of Restarts
    • Last message received processing latency in milliseconds
  • System Metrics:
    • Total Number of Actors

Under the hood, Go-Akt uses OpenTelemetry to instrument a system. One just needs to use the WithMetric option when instantiating a Go-Akt actor system and use the default Telemetry engine or set a custom one with the WithTelemetry option of the actor system.

Logging

Go-Akt exposes a simple logging interface so that a custom logger can be implemented instead of using the default logger.

Testkit

Go-Akt comes packaged with a testkit that can help test that actors receive expected messages within unit tests. To test that an actor receives and responds to messages one will have to:

  1. Create an instance of the testkit: testkit := New(ctx, t) where ctx is a go context and t the instance of *testing.T. This can be done in setup before the run of each test.
  2. Create the instance of the actor under test. Example: pinger := testkit.Spawn(ctx, "pinger", &pinger{})
  3. Create an instance of test probe: probe := testkit.NewProbe(ctx) where ctx is a go context
  4. Use the probe to send a message to the actor under test. Example: probe.Send(pinger, new(testpb.Ping))
  5. Assert that the actor under test has received the message and responded as expected using the probe methods:
    • ExpectMessage(message proto.Message) proto.Message: asserts that the message received from the test actor is the expected one
    • ExpectMessageWithin(duration time.Duration, message proto.Message) proto.Message: asserts that the message received from the test actor is the expected one within a time duration
    • ExpectNoMessage(): asserts that no message is received
    • ExpectAnyMessage() proto.Message: asserts that any message is received
    • ExpectAnyMessageWithin(duration time.Duration) proto.Message: asserts that any message is received within a time duration
    • ExpectMessageOfType(messageType protoreflect.MessageType): asserts the expectation of a given message type
    • ExpectMessageOfTypeWithin(duration time.Duration, messageType protoreflect.MessageType): asserts the expectation of a given message type within a time duration
  6. Make sure to shut down the testkit and the probe. Example: probe.Stop(), testkit.Shutdown(ctx) where ctx is a go context. These two calls can be in a tear down after all tests run.
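
The probe pattern behind these steps can be sketched without the testkit. The code below is a self-contained stand-in in plain Go, not the Go-Akt testkit API: probe, send, expectMessageWithin, expectNoMessage and pinger are hypothetical names for this example, and messages are plain strings rather than protobuf messages.

```go
package main

import (
	"fmt"
	"time"
)

// probe is a hypothetical stand-in for a test probe: it collects replies
// from the actor under test on a channel.
type probe struct{ inbox chan string }

func newProbe() *probe { return &probe{inbox: make(chan string, 8)} }

// send delivers msg to the actor under test and passes the probe's inbox
// as the reply address.
func (p *probe) send(actor func(msg string, replyTo chan<- string), msg string) {
	go actor(msg, p.inbox)
}

// expectMessageWithin reports whether want arrives before the deadline.
func (p *probe) expectMessageWithin(d time.Duration, want string) bool {
	select {
	case got := <-p.inbox:
		return got == want
	case <-time.After(d):
		return false
	}
}

// expectNoMessage reports whether the inbox stays empty for d.
func (p *probe) expectNoMessage(d time.Duration) bool {
	select {
	case <-p.inbox:
		return false
	case <-time.After(d):
		return true
	}
}

// pinger is the actor under test: it replies "pong" to "ping" and stays
// silent otherwise.
func pinger(msg string, replyTo chan<- string) {
	if msg == "ping" {
		replyTo <- "pong"
	}
}

func main() {
	p := newProbe()
	p.send(pinger, "ping")
	fmt.Println(p.expectMessageWithin(time.Second, "pong")) // prints true
	p.send(pinger, "other")
	fmt.Println(p.expectNoMessage(50 * time.Millisecond)) // prints true
}
```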

For more help on implementing unit tests in GoAkt-based applications, see Testkit.

API

The API interface helps interact with a Go-Akt actor system as a kind of client. The following features are available:

  • Tell: to send a message to an actor in a fire-and-forget manner
  • Ask: to send a message to an actor and expect a response within a given timeout
  • BatchAsk: to send a batch of requests to an actor remotely and expect responses back for each request.
  • BatchTell: to send a batch of fire-and-forget messages to an actor remotely
  • RemoteTell: to send a fire-and-forget message to an actor remotely
  • RemoteAsk: to send a request/response type of message to a remote actor
  • RemoteBatchTell: to send a fire-and-forget bulk of messages to a remote actor
  • RemoteBatchAsk: to send a batch of messages to a remote actor and get replies
  • RemoteLookup: to look up an actor on a remote host
  • RemoteReSpawn: to restart an actor on a remote machine
  • RemoteStop: to stop an actor on a remote machine
  • RemoteSpawn: to start an actor on a remote machine. The given actor implementation must be registered using the Register method of the actor system on the remote machine for this call to succeed.

Use Cases

  • Event-Driven programming
  • Event Sourcing and CQRS - eGo
  • Highly Available, Fault-Tolerant Distributed Systems

Installation

go get github.com/tochemey/goakt

Clustering

The cluster engine depends upon the discovery mechanism to find other nodes in the cluster. Under the hood, it leverages Olric to scale out and guarantee performant, reliable persistence, simple scalability, partitioning (sharding), and re-balancing out-of-the-box. It requires remoting to be enabled.

At the moment the following providers are implemented:

  • Kubernetes
  • mDNS
  • NATS
  • DNS

Note: One can add additional discovery providers by implementing the discovery provider interface.

Operations Guide

The following outlines the cluster mode operations which can help have a healthy GoAkt cluster:

  • One can start a single-node cluster or a multi-node cluster.
  • One can add more nodes to the cluster; they will automatically discover the cluster.
  • One can remove nodes. However, to avoid losing data, one needs to scale down the cluster to the minimum number of nodes which started the cluster.

Note: At the moment when a node is removed from the cluster, all actors on the given node are no longer accessible. The remaining members of the cluster will still function as expected. There is some ongoing work to address that issue. One can look at the related discussion.

Built-in Discovery Providers

Kubernetes Discovery Provider Setup

To get the kubernetes discovery working as expected, the following pod labels need to be set:

  • app.kubernetes.io/part-of: set this label with the actor system name
  • app.kubernetes.io/component: set this label with the application name
  • app.kubernetes.io/name: set this label with the application name

Get Started

const (
    namespace        = "default"
    applicationName  = "accounts"
    actorSystemName  = "AccountsSystem"
    gossipPortName   = "gossip-port"
    clusterPortName  = "cluster-port"
    remotingPortName = "remoting-port"
)

// define the discovery config
config := kubernetes.Config{
    ApplicationName:  applicationName,
    ActorSystemName:  actorSystemName,
    Namespace:        namespace,
    GossipPortName:   gossipPortName,
    RemotingPortName: remotingPortName,
    ClusterPortName:  clusterPortName,
}

// instantiate the k8 discovery provider
disco := kubernetes.NewDiscovery(&config)

// pass the service discovery when enabling cluster mode in the actor system

Role Based Access

You’ll also have to grant the Service Account that your pods run under access to list pods. The following configuration can be used as a starting point. It creates a Role, pod-reader, which grants access to query pod information. It then binds the default Service Account to the Role by creating a RoleBinding. Adjust as necessary:

kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pod-reader
rules:
  - apiGroups: [""] # "" indicates the core API group
    resources: ["pods"]
    verbs: ["get", "watch", "list"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: read-pods
subjects:
  # Uses the default service account. Consider creating a new one.
  - kind: ServiceAccount
    name: default
roleRef:
  kind: Role
  name: pod-reader
  apiGroup: rbac.authorization.k8s.io

Sample Project

A working example can be found here with a small doc showing how to run it.

mDNS Discovery Provider Setup

  • Service Name: the service name
  • Domain: The mDNS discovery domain
  • Port: The mDNS discovery port
  • IPv6: States whether to lookup for IPv6 addresses.

NATS Discovery Provider Setup

To use the NATS discovery provider one needs to provide the following:

  • NATS Server Address: the NATS Server address
  • NATS Subject: the NATS subject to use
  • Actor System Name: the actor system name
  • Application Name: the application name

const (
    natsServerAddr  = "nats://localhost:4248"
    natsSubject     = "goakt-gossip"
    applicationName = "accounts"
    actorSystemName = "AccountsSystem"
)

// define the discovery options
config := nats.Config{
    ApplicationName: applicationName,
    ActorSystemName: actorSystemName,
    NatsServer:      natsServerAddr, // the constant declared above
    NatsSubject:     natsSubject,
}

// define the host node instance
hostNode := discovery.Node{}

// instantiate the NATS discovery provider by passing the config and the hostNode
disco := nats.NewDiscovery(&config, &hostNode)

// pass the service discovery when enabling cluster mode in the actor system

DNS Provider Setup

This provider performs nodes discovery based upon the domain name provided. This is very useful when doing local development using docker.

To use the DNS discovery provider one needs to provide the following:

  • Domain Name: the DNS domain name used to discover nodes
  • IPv6: States whether to look up IPv6 addresses.

const domainName = "accounts"

// define the discovery options
config := dnssd.Config{
    dnssd.DomainName: domainName,
    dnssd.IPv6:       false,
}
// instantiate the dnssd discovery provider
disco := dnssd.NewDiscovery(&config)

// pass the service discovery when enabling cluster mode in the actor system

Sample Project

There is an example here that shows how to use it.

Examples

Kindly check out the examples folder.

Contribution

Contributions are welcome! The project adheres to Semantic Versioning and Conventional Commits. This repo uses Earthly.

To contribute please:

  • Fork the repository
  • Create a feature branch
  • Submit a pull request

Test & Linter

Prior to submitting a pull request, please run:

earthly +test

Benchmark

One can run the benchmark test: go test -bench=. -benchtime 2s -count 5 -benchmem -cpu 8 -run notest from the bench package or just run the command make bench.

goakt's People

Contributors

alexandertar, chenxyzl, renovate[bot], tochemey

goakt's Issues

Behavior as FSM

Brief Overview

An actor should be able to behave like an FSM. An FSM, as described in the Erlang design principles, is a set of relations of the form:

State(S) x Event(E) -> Actions (A), State(S’).

If we are in state S and the event E occurs, we should perform the actions A and make a transition to the state S’. This means that an actor in state S processes each received message with the behaviour of that state. When it transitions to another state, every new message is processed with the new state's behaviour. This type of feature helps make an actor wear many coats.

Possible Action Items

  • Implement a behavior stack to hold the actor behaviors
  • Implement a mechanism to allow an actor to transition from one behavior to another and back
  • Implement the internal FSM that helps the actor process messages in a particular state
  • Allow an actor to switch back to its default behavior

Use case

Consider, for instance, that we are building an authentication/authorization system using Go-Akt. The session actor can switch between the following states:
Authenticating -> Authenticated -> Authorizing -> Authorized.

Of course, this is only one way of building such a system using a single actor.
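
The State(S) x Event(E) -> State(S’) relation for this use case can be sketched as a transition table. The code is plain Go. The event names (CredentialsAccepted, AccessRequested, AccessGranted) are hypothetical and chosen only for this example, and the actions A are left out for brevity.

```go
package main

import "fmt"

type state string
type eventName string

const (
	authenticating state = "Authenticating"
	authenticated  state = "Authenticated"
	authorizing    state = "Authorizing"
	authorized     state = "Authorized"
)

// key identifies one (state, event) pair of the relation.
type key struct {
	from state
	on   eventName
}

// transitions encodes State(S) x Event(E) -> State(S').
// The event names are hypothetical.
var transitions = map[key]state{
	{authenticating, "CredentialsAccepted"}: authenticated,
	{authenticated, "AccessRequested"}:      authorizing,
	{authorizing, "AccessGranted"}:          authorized,
}

// next returns the state reached after handling the event. An unknown
// (state, event) pair leaves the state unchanged and reports false.
func next(s state, e eventName) (state, bool) {
	to, ok := transitions[key{s, e}]
	if !ok {
		return s, false
	}
	return to, true
}

func main() {
	s := authenticating
	for _, e := range []eventName{"CredentialsAccepted", "AccessRequested", "AccessGranted"} {
		s, _ = next(s, e)
	}
	fmt.Println(s) // prints Authorized
}
```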

[refactor] Refactor the implementation of the deadletter stream

The current implementation is limited to very few scenarios of dead letters. We need to extend it to other parts of the message delivery system in the code to capture more data.
This change will not alter the API definition which means developers don't need to do anything.

preparation for 1.0.0

  • Enhance performance
  • Cleanup code base
  • Better documentation
  • Enhance observability (not needed)
  • Possible additional features (dead letters)

[feat] Implement Routers

The first iteration of the implementation will focus on implementing a PoolRouter that will send a specific type of message to a bunch of routees to process in parallel. There will be two types of routees:

  • synchronous routee: their job is to process the message and ignore the outcome.
  • asynchronous routee: their job is to process the message and return the outcome back to the router.

Router features:

  • Remove a stopped routee from the set of routees
  • Shutdown when the last routee is stopped
  • Resize pool
  • Handle idle routees (when passivated)
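
A rough sketch of the fan-out idea, in plain Go: the same message goes to every routee in parallel and the outcomes are collected by the router. The names routee and broadcast are hypothetical, and routee lifecycle handling (removal, resizing, passivation) is not modeled.

```go
package main

import (
	"fmt"
	"sync"
)

// routee is a hypothetical worker that processes a message and returns
// an outcome.
type routee func(msg int) int

// broadcast sends msg to every routee in parallel and collects the
// outcomes in routee order.
func broadcast(routees []routee, msg int) []int {
	results := make([]int, len(routees))
	var wg sync.WaitGroup
	for i, r := range routees {
		wg.Add(1)
		go func(i int, r routee) {
			defer wg.Done()
			results[i] = r(msg) // each goroutine writes its own slot
		}(i, r)
	}
	wg.Wait()
	return results
}

func main() {
	double := func(n int) int { return n * 2 }
	square := func(n int) int { return n * n }
	fmt.Println(broadcast([]routee{double, square}, 4)) // prints [8 16]
}
```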

reimplement the cluster engine using a dedicated raft library

The current implementation relies on an embedded etcd server. We need to rethink this approach if we want to add cluster sharding and related features. The reason is that the replication behind etcd cannot scale horizontally because it lacks data sharding.

[feat] Reimplement the actor metrics

This ticket/issue outlines the various metrics to implement for both the actor system and a given actor.

Actor System Metrics

  • Total Number of Actors
  • Dead-letters count

Actor Metrics

  • Number of children
  • Number of messages stashed
  • Number of Restarts
  • Last message received processing duration
  • Number of Instances created for the same type of Actor

[feat] RemoteSpawn method

The RemoteSpawn method helps start an actor on a remote machine.

  • Register the actor type with the actor system
  • Call RemoteSpawn from the API or the actor PID.

Cluster actor passivation awareness

At the moment, actors created in cluster mode can be passivated on the node they are created. The passivated actor is removed from the actors map on the node it has been created. However, that information is not sent across the cluster which means that actor still has a reference in the cluster and that can cause an issue.

Feature: Add message scheduler

This feature lets an actor stash some messages that it can process at a later time. This will be an extension of the stash mechanism with a simple scheduler. The API definition will be as follows:

  • ScheduleOnce: to schedule a message to be processed one time
  • ScheduleTimes(count int): to schedule a message to be processed count times

This feature should be available both on the PID and the receiver context.

prepare for 1.4.0

  • Add cluster events to events streams.
    • NodeJoined
    • NodeLeft
  • Cluster metrics
    • Number of Members
  • Metrics #170
  • Complete #210

Add clustering

The cluster feature will help actor system nodes form a cluster when that flag is turned on.

How will it work?

  • Define and implement a discovery mechanism to discover nodes based upon their ip and additional meta data. The discovery plugin should be extensible and pluggable. This is the proposed interface for the discovery plugin:
// Discovery helps discover other running actor system in a cloud environment
type Discovery interface {
	// Start the discovery engine
	Start(ctx context.Context, meta Meta) error
	// Nodes returns the list of Nodes at a given time
	Nodes(ctx context.Context) ([]*Node, error)
	// Watch returns event based upon node lifecycle
	Watch(ctx context.Context) (<-chan Event, error)
	// EarliestNode returns the earliest node
	EarliestNode(ctx context.Context) (*Node, error)
	// Stop shutdown the discovery provider
	Stop() error
}

With this interface we can implement mDNS, Kubernetes and other discovery modes.

  • Define a cluster interface:
// Cluster defines the cluster contract
type Cluster interface {
	// Start starts the cluster node
	Start(ctx context.Context) error
	// Stop stops the cluster node
	Stop(ctx context.Context) error
	// GetPeers fetches all the peers of a given node
	GetPeers(ctx context.Context) ([]*Peer, error)
	// PutActor adds an actor meta to the cluster
	PutActor(ctx context.Context, actor *goaktpb.WireActor) error
	// GetActor reads an actor meta from the cluster
	GetActor(ctx context.Context, actorName string) (*goaktpb.WireActor, error)
}
  • Implement a raft node.

  • On start, each node will make use of the discovery plugin to try to join an existing cluster using the underlying raft implementation.

Dependency Dashboard

This issue lists Renovate updates and detected dependencies. Read the Dependency Dashboard docs to learn more.

Open

These updates have all been created already. Click a checkbox below to force a retry/rebase of any.

  • fix(deps): update minor go modules (go.opentelemetry.io/otel, go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc, go.opentelemetry.io/otel/exporters/prometheus, go.opentelemetry.io/otel/exporters/stdout/stdouttrace, go.opentelemetry.io/otel/metric, go.opentelemetry.io/otel/sdk, go.opentelemetry.io/otel/sdk/metric, go.opentelemetry.io/otel/trace, google.golang.org/protobuf, k8s.io/api, k8s.io/apimachinery, k8s.io/client-go)

Detected dependencies

github-actions
.github/workflows/build.yml
  • actions/checkout v4
  • docker/login-action v3
  • earthly/actions v1
  • codecov/codecov-action v4
.github/workflows/pr.yml
  • actions/checkout v4
  • docker/login-action v3
  • earthly/actions v1
  • codecov/codecov-action v4
gomod
go.mod
  • go 1.21
  • connectrpc.com/connect v1.16.1
  • connectrpc.com/otelconnect v0.7.0
  • github.com/buraksezer/olric v0.5.6-0.20240205222928-c5efb0d4b5ea@c5efb0d4b5ea
  • github.com/caarlos0/env/v11 v11.0.0
  • github.com/cespare/xxhash/v2 v2.3.0
  • github.com/deckarep/golang-set/v2 v2.6.0
  • github.com/flowchartsman/retry v1.2.0
  • github.com/google/go-cmp v0.6.0
  • github.com/google/uuid v1.6.0
  • github.com/grandcat/zeroconf v1.0.0
  • github.com/joho/godotenv v1.5.1
  • github.com/nats-io/nats-server/v2 v2.10.14
  • github.com/nats-io/nats.go v1.34.1
  • github.com/pkg/errors v0.9.1
  • github.com/prometheus/client_golang v1.19.0
  • github.com/redis/go-redis/v9 v9.5.1
  • github.com/reugn/go-quartz v0.11.2
  • github.com/spf13/cobra v1.8.0
  • github.com/stretchr/testify v1.9.0
  • github.com/travisjeffery/go-dynaport v1.0.0
  • go.opentelemetry.io/otel v1.25.0
  • go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.25.0
  • go.opentelemetry.io/otel/exporters/prometheus v0.47.0
  • go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.25.0
  • go.opentelemetry.io/otel/metric v1.25.0
  • go.opentelemetry.io/otel/sdk v1.25.0
  • go.opentelemetry.io/otel/sdk/metric v1.25.0
  • go.opentelemetry.io/otel/trace v1.25.0
  • go.uber.org/atomic v1.11.0
  • go.uber.org/goleak v1.3.0
  • go.uber.org/multierr v1.11.0
  • go.uber.org/zap v1.27.0
  • golang.org/x/net v0.24.0
  • golang.org/x/sync v0.7.0
  • google.golang.org/protobuf v1.33.0
  • k8s.io/api v0.29.4
  • k8s.io/apimachinery v0.29.4
  • k8s.io/client-go v0.29.4
  • k8s.io/utils v0.0.0-20240423183400-0849a56e8f22@0849a56e8f22
regex
Earthfile
  • tochemey/docker-go 1.21.0-1.0.0
.github/workflows/build.yml
  • earthly/earthly v0.8.9
.github/workflows/pr.yml
  • earthly/earthly v0.8.9


[refact] refactor the code to enable gRPC interceptors

This is necessary to:

  1. Accommodate the breaking change introduced by the new version of otelconnect
  2. Enable the gRPC interceptors only when metrics or traces are enabled on the actor system
  3. Enhance the API implementation so that observability can optionally be toggled for remote calls

[feat] Add ID and Equal methods to PID

The current implementation does not allow two PIDs to be compared, and there is no proper way to get the ID of an actor.

Current solution

The current solution is to use the actor path.
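A minimal sketch of what the requested methods could look like, assuming the identifier is derived from the actor path as in the current workaround. The `PID` type here is a hypothetical stand-in, and the `path` field, the `Equals` method name, and the path format are illustrative assumptions rather than the framework's API.

```go
package main

import "fmt"

// PID is a hypothetical, stripped-down stand-in for an actor reference.
type PID struct {
	// path is the actor path; the format used in main is an assumption.
	path string
}

// ID returns a stable identifier for the actor.
// In this sketch the identifier is simply the actor path.
func (p *PID) ID() string {
	if p == nil {
		return ""
	}
	return p.path
}

// Equals reports whether both PIDs refer to the same actor.
// Two nil PIDs are considered equal; a nil and a non-nil PID are not.
func (p *PID) Equals(other *PID) bool {
	if p == nil || other == nil {
		return p == other
	}
	return p.ID() == other.ID()
}

func main() {
	a := &PID{path: "goakt://sys@127.0.0.1:9000/ping"}
	b := &PID{path: "goakt://sys@127.0.0.1:9000/ping"}
	c := &PID{path: "goakt://sys@127.0.0.1:9000/pong"}
	fmt.Println(a.Equals(b), a.Equals(c))
}
```

Comparing through `ID()` rather than by pointer means two references to the same actor obtained from different lookups still compare equal.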

[feat] Reimplement cluster engine

Implementation details: Simple distributed in-memory key/value store

  • Allow leader/coordinator re-election when the cluster topology changes
  • Handle network partitions efficiently
  • Data should be sharded across the cluster
  • Be able to move the data of a node that has left to another node efficiently

Events Stream for eventsourcing

We need to provide an event stream to publish events persisted in the event store so that downstream applications can subscribe to those events and build read models.

  • The event stream should be a singleton: once instantiated, it keeps running until the actor system is shut down
  • Actor can publish messages to the stream
  • Actor can subscribe to messages
  • External service can subscribe to messages (TBD)

Alternatives

  • Make use of gRPC streaming
  • Use a golang standard streaming library
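As a rough illustration of the publish/subscribe requirements listed for the event stream, the sketch below uses plain Go channels in a single process. `EventStream`, `Subscribe`, and `Publish` are hypothetical names, and dropping events for slow subscribers is one possible policy, not a decision taken in the issue.

```go
package main

import (
	"fmt"
	"sync"
)

// EventStream is a hypothetical in-process publish/subscribe hub.
type EventStream struct {
	mu   sync.RWMutex
	subs map[string][]chan any // topic -> subscriber channels
}

// NewEventStream creates an empty stream.
func NewEventStream() *EventStream {
	return &EventStream{subs: make(map[string][]chan any)}
}

// Subscribe registers a buffered channel for the given topic.
func (s *EventStream) Subscribe(topic string, buffer int) <-chan any {
	ch := make(chan any, buffer)
	s.mu.Lock()
	s.subs[topic] = append(s.subs[topic], ch)
	s.mu.Unlock()
	return ch
}

// Publish offers the event to every subscriber of the topic and returns
// how many accepted it. A subscriber whose buffer is full misses the event
// so that the publisher never blocks.
func (s *EventStream) Publish(topic string, event any) int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	delivered := 0
	for _, ch := range s.subs[topic] {
		select {
		case ch <- event:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	stream := NewEventStream()
	readModel := stream.Subscribe("orders", 8)
	stream.Publish("orders", "OrderCreated")
	fmt.Println(<-readModel)
}
```

An external subscriber (the TBD item) would need a network transport on top of this, which is where the gRPC streaming alternative comes in.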

Feature: Add a deadletter buffer

Messages that cannot be delivered or handled will be pushed to a deadletter box. The purpose of this feature is to help with debugging when message processing is inconsistent.

This feature will require:

  • deadletter queue (memory/durable storage)
  • event stream (pub/sub)
  • deadletter logging mechanism
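The in-memory variant of the deadletter queue could be sketched as a bounded buffer that evicts the oldest entry once full. All type and field names below are hypothetical, and the string payload stands in for a protocol buffers message; durable storage, the event stream, and logging are left out.

```go
package main

import (
	"fmt"
	"sync"
)

// DeadLetter describes a message that could not be delivered or handled.
// The fields are illustrative assumptions.
type DeadLetter struct {
	Sender   string
	Receiver string
	Message  string
	Reason   string
}

// DeadLetterBuffer is a bounded, concurrency-safe in-memory buffer.
type DeadLetterBuffer struct {
	mu       sync.Mutex
	capacity int
	items    []DeadLetter
}

// NewDeadLetterBuffer creates a buffer holding at most capacity entries.
func NewDeadLetterBuffer(capacity int) *DeadLetterBuffer {
	if capacity < 1 {
		capacity = 1
	}
	return &DeadLetterBuffer{capacity: capacity}
}

// Push appends a dead letter, evicting the oldest entry when the buffer is full.
func (b *DeadLetterBuffer) Push(d DeadLetter) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.items) == b.capacity {
		b.items = b.items[1:]
	}
	b.items = append(b.items, d)
}

// Snapshot returns a copy of the buffered dead letters, oldest first.
func (b *DeadLetterBuffer) Snapshot() []DeadLetter {
	b.mu.Lock()
	defer b.mu.Unlock()
	out := make([]DeadLetter, len(b.items))
	copy(out, b.items)
	return out
}

func main() {
	buf := NewDeadLetterBuffer(2)
	buf.Push(DeadLetter{Sender: "a", Receiver: "b", Message: "m1", Reason: "actor not found"})
	buf.Push(DeadLetter{Sender: "a", Receiver: "c", Message: "m2", Reason: "unhandled message"})
	for _, d := range buf.Snapshot() {
		fmt.Printf("%s -> %s: %s (%s)\n", d.Sender, d.Receiver, d.Message, d.Reason)
	}
}
```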

Metrics integration

  • Implement the various metrics to track for an Actor using OpenTelemetry
  • Implement the various metrics to track for an Actor system using OpenTelemetry
  • Add a default Prometheus server to scrape the metrics

[feat] Implement Batch(Tell/Ask)

  • BatchTell - sends a batch of messages to a given actor to process in a fire-and-forget manner.
  • BatchAsk - sends a batch of messages to a given actor to process and expects responses. The responses will be returned in the same order as the messages.

The messages will be processed one after the other in the order they are sent.
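The ordering guarantee can be illustrated with a toy actor backed by a single goroutine and a FIFO mailbox. This is a sketch under simplifying assumptions: messages are plain strings rather than protocol buffers, and `actor`, `spawn`, and the method signatures are hypothetical.

```go
package main

import "fmt"

// request is a mailbox entry; reply is nil for fire-and-forget messages.
type request struct {
	msg   string
	reply chan string
}

// actor processes its mailbox sequentially in a single goroutine.
type actor struct {
	mailbox chan request
}

// spawn starts an actor that applies handle to every message in arrival order.
func spawn(handle func(string) string) *actor {
	a := &actor{mailbox: make(chan request, 64)}
	go func() {
		for req := range a.mailbox {
			out := handle(req.msg)
			if req.reply != nil {
				req.reply <- out
			}
		}
	}()
	return a
}

// BatchTell enqueues the messages in order without waiting for any result.
func (a *actor) BatchTell(msgs ...string) {
	for _, m := range msgs {
		a.mailbox <- request{msg: m}
	}
}

// BatchAsk enqueues the messages in order and collects one response per
// message, returned in the same order as the messages.
func (a *actor) BatchAsk(msgs ...string) []string {
	replies := make([]chan string, len(msgs))
	for i, m := range msgs {
		replies[i] = make(chan string, 1)
		a.mailbox <- request{msg: m, reply: replies[i]}
	}
	out := make([]string, len(msgs))
	for i, ch := range replies {
		out[i] = <-ch
	}
	return out
}

func main() {
	echo := spawn(func(m string) string { return "ack:" + m })
	echo.BatchTell("warmup-1", "warmup-2")
	fmt.Println(echo.BatchAsk("a", "b", "c"))
}
```

Because each reply channel is tied to the index of its message, the response order does not depend on how long each message takes to process.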

[feat] Actor redeployment when cluster mode is enabled

The cluster implementation simply relies on a distributed sharded key/value store engine. It handles the rebalance and the sharding mechanism.

  • A single instance of an actor is created when cluster mode is enabled. The actor identifier is propagated across the cluster.
  • Each node in the cluster has a copy of the created actor identifier and can easily refer to the location of the actor.
  • When the node where the given actor was created dies, the other nodes in the cluster are aware of the event. However, at the moment, Go-Akt does not redeploy actors, which is the issue we need to address.

It is an issue because that actor cannot be found once its node dies.

[feat] discovery.HostNode() should accept configs rather than read env directly

Reading env and merging it into configs should be handled by the user with a module such as viper or koanf.

In my opinion, env should be avoided and all configs kept in local config files. I have come across many cases where a mysterious env variable controls a key feature and no one knows about it. It is a nightmare to debug.
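One possible shape for such an API is a constructor that takes an explicit, validated config value and leaves the loading (viper, koanf, env, files) to the caller. The names `HostNodeConfig`, `NewHostNode`, and the fields below are hypothetical and chosen only for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"strconv"
)

// HostNodeConfig is a hypothetical explicit configuration for a host node.
type HostNodeConfig struct {
	Name         string
	Host         string
	GossipPort   int
	ClusterPort  int
	RemotingPort int
}

// Validate checks that required fields are set and ports are in range.
func (c HostNodeConfig) Validate() error {
	if c.Name == "" || c.Host == "" {
		return errors.New("name and host are required")
	}
	ports := []struct {
		label string
		value int
	}{
		{"gossip", c.GossipPort},
		{"cluster", c.ClusterPort},
		{"remoting", c.RemotingPort},
	}
	for _, p := range ports {
		if p.value < 1 || p.value > 65535 {
			return fmt.Errorf("invalid %s port: %d", p.label, p.value)
		}
	}
	return nil
}

// HostNode holds the validated settings; nothing is read from the environment.
type HostNode struct {
	cfg HostNodeConfig
}

// NewHostNode builds a HostNode from the given config.
func NewHostNode(cfg HostNodeConfig) (*HostNode, error) {
	if err := cfg.Validate(); err != nil {
		return nil, err
	}
	return &HostNode{cfg: cfg}, nil
}

// GossipAddress returns the host:port pair used for gossip.
func (n *HostNode) GossipAddress() string {
	return net.JoinHostPort(n.cfg.Host, strconv.Itoa(n.cfg.GossipPort))
}

func main() {
	node, err := NewHostNode(HostNodeConfig{
		Name: "node-1", Host: "127.0.0.1",
		GossipPort: 3322, ClusterPort: 3320, RemotingPort: 50052,
	})
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println(node.GossipAddress())
}
```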

Message stashing during behavior changes

  • Ability to stash incoming messages between behavior changes
  • Ability to unstash messages

Stashing enables an actor to temporarily buffer all or some messages that cannot or should not be handled using the actor’s current behavior.

A typical example when this is useful is if the actor has to load some initial state or initialize some resources before it can accept the first real message. Another example is when the actor is waiting for something to complete before processing the next message.
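The first scenario (buffering messages until initialization completes) can be sketched as follows. `StashingActor`, the `"initialized"` marker message, and the string messages are hypothetical simplifications of the behavior-based API described above.

```go
package main

import "fmt"

// StashingActor buffers messages until it has been initialized.
type StashingActor struct {
	ready   bool
	stash   []string
	Handled []string
}

// Receive stashes messages while the actor is not ready. The "initialized"
// message switches the behavior and replays the stash in arrival order.
func (a *StashingActor) Receive(msg string) {
	switch {
	case msg == "initialized":
		a.ready = true
		a.unstashAll()
	case !a.ready:
		a.stash = append(a.stash, msg)
	default:
		a.Handled = append(a.Handled, msg)
	}
}

// unstashAll replays every stashed message through Receive and clears the stash.
func (a *StashingActor) unstashAll() {
	pending := a.stash
	a.stash = nil
	for _, msg := range pending {
		a.Receive(msg)
	}
}

func main() {
	a := &StashingActor{}
	for _, msg := range []string{"a", "b", "initialized", "c"} {
		a.Receive(msg)
	}
	fmt.Println(a.Handled) // prints [a b c]
}
```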

[fix] Cluster not exiting when olric fails to start

{"level":"info","ts":"2024-04-19T13:59:51+08:00","msg":"enabling clustering..."}
{"level":"info","ts":"2024-04-19T13:59:51+08:00","msg":"starting cluster engine..."}
{"level":"info","ts":"2024-04-19T13:59:51+08:00","msg":"Starting GoAkt cluster Node service on (DESKTOP-1T48526:9000)....🤔"}
{"level":"info","ts":"2024-04-19T13:59:51+08:00","msg":"Olric 0.6.0-alpha.1 on linux/amd64 go1.22.2 => olric.go:319"}
{"level":"error","ts":"2024-04-19T13:59:51+08:00","msg":"Failed to run the routing table subsystem: mDNS domain is not provided => olric.go:347"}
{"level":"error","ts":"2024-04-19T13:59:51+08:00","msg":"failed to start the cluster Node=(thingcross-0).💥: mDNS domain is not provided"}
{"level":"info","ts":"2024-04-19T13:59:51+08:00","msg":"127.0.1.1:9000 is gone => olric.go:448"}

go func() {
    if err = n.server.Start(); err != nil {
        logger.Error(errors.Wrapf(err, "failed to start the cluster Node=(%s).💥", n.name))
        if e := n.server.Shutdown(ctx); e != nil {
            logger.Panic(e)
        }
    }
}()
<-startCtx.Done()

The context is never cancelled when Start fails, so the receive on startCtx.Done() blocks forever and the process does not exit.
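One way to avoid the hang is to wait on both a readiness signal and an error channel, so a failed start unblocks the caller. The sketch below is not the project's fix: `fakeServer`, its `onReady` callback, and `startNode` are hypothetical stand-ins for the embedded server and its startup hook.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// fakeServer is a hypothetical stand-in for the embedded cluster server.
type fakeServer struct {
	startErr error
	onReady  func() // invoked once the server is up
}

// Start fails with startErr if set; otherwise it signals readiness.
func (s *fakeServer) Start() error {
	if s.startErr != nil {
		return s.startErr
	}
	s.onReady()
	return nil
}

// startNode returns nil once the server is ready, the start error if the
// server fails to start, or ctx.Err() if the caller gives up first.
func startNode(ctx context.Context, s *fakeServer) error {
	ready := make(chan struct{})
	s.onReady = func() { close(ready) }

	errCh := make(chan error, 1) // buffered so the goroutine never blocks
	go func() {
		if err := s.Start(); err != nil {
			errCh <- err
		}
	}()

	select {
	case <-ready:
		return nil
	case err := <-errCh:
		return fmt.Errorf("failed to start the cluster node: %w", err)
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo runs one successful and one failing start.
func demo() (okErr, failErr error) {
	ctx := context.Background()
	okErr = startNode(ctx, &fakeServer{})
	failErr = startNode(ctx, &fakeServer{startErr: errors.New("mDNS domain is not provided")})
	return okErr, failErr
}

func main() {
	okErr, failErr := demo()
	fmt.Println(okErr)
	fmt.Println(failErr)
}
```

Returning the error to the caller, instead of only logging it inside the goroutine, lets the actor system decide whether to exit.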

[feat] reimplement tracing

  1. Make tracing configurable

  2. Add span tracing to the following methods on PID and ActorSystem:

    • Tell
    • Ask
    • BatchTell
    • BatchAsk
    • init
    • Start (for actor system)
    • Stop (for actor system)
    • Shutdown
    • Spawn (for actor system)
    • ReSpawn (for actor system)
    • Kill
    • Forward
    • etc...
  3. Record errors in the span

Add a partitioner interface for cluster

The current implementation does not allow users to supply their own partitioner function based on a different algorithm. This can be an issue when an integration requires a different hash algorithm.

  • Add an interface to make the partitioner function customisable
  • Add a default partitioner
  • Add an option to set the custom partition in the actor system
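The three items above might fit together as in the sketch below. The `Partitioner` interface, the `WithPartitioner` option, and the `system` type are hypothetical names; FNV-1a from the standard library is used as the default only to keep the example self-contained, not because the framework uses it.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Partitioner maps a key to a partition in [0, partitions).
type Partitioner interface {
	PartitionFor(key string, partitions uint64) uint64
}

// fnvPartitioner is the default used by this sketch (FNV-1a, 64-bit).
type fnvPartitioner struct{}

func (fnvPartitioner) PartitionFor(key string, partitions uint64) uint64 {
	if partitions == 0 {
		return 0
	}
	h := fnv.New64a()
	_, _ = h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum64() % partitions
}

// lengthPartitioner is a deliberately trivial custom implementation.
type lengthPartitioner struct{}

func (lengthPartitioner) PartitionFor(key string, partitions uint64) uint64 {
	if partitions == 0 {
		return 0
	}
	return uint64(len(key)) % partitions
}

// system is a hypothetical holder for the configured partitioner.
type system struct {
	partitioner Partitioner
}

// Option configures the system.
type Option func(*system)

// WithPartitioner overrides the default partitioner.
func WithPartitioner(p Partitioner) Option {
	return func(s *system) { s.partitioner = p }
}

// newSystem applies the options on top of the default partitioner.
func newSystem(opts ...Option) *system {
	s := &system{partitioner: fnvPartitioner{}}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	def := newSystem()
	custom := newSystem(WithPartitioner(lengthPartitioner{}))
	fmt.Println(def.partitioner.PartitionFor("actor-1", 271))
	fmt.Println(custom.partitioner.PartitionFor("actor-1", 271))
}
```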

[feat]: Add Initialized system message

  • Push that message to the actor mailbox after the actor has successfully started.
  • The message can be handled in the receive loop.

message definition:

// PostStart is used when an actor has successfully started
message PostStart {
}

[feat] Better discovery config passing

NewDiscovery() should accept a typed config struct (for example, type discoConfig struct) rather than the generic discovery.Config{}.

disco := dnssd.NewDiscovery(dnssd.Config{
    Domain: "xxx",
    IPv6:   false,
})
