kmgreen2 / agglo


Home Page: https://kmgreen2.github.io/agglo/

License: MIT License

Languages: Go 76.51%, TypeScript 20.92%, Shell 0.96%, HCL 0.64%, Makefile 0.48%, Dockerfile 0.20%, HTML 0.19%, CSS 0.10%
Topics: edge-computing, iot, streaming, event-driven, edge, stream-processing, integrations, ipaas

agglo's Introduction

Agglo: A Process-Anywhere Framework for Event Stream Processing


Overview

Agglo is an experimental event stream processing framework that enables lightweight, reliable and scalable stream processing alongside persistent object storage, key-value storage or stream processing platforms.

Binge (BINary-at-the-edGE) is the main artifact of the framework. As the name implies, it is a single, compiled binary that can run as a stateless daemon, a persistent daemon or a stand-alone command. This allows the same binary to be deployed in edge gateways, as Kubernetes deployments, in load balancers, in cloud (lambda) functions, or anywhere else you need to perform stream processing. The deployed artifact is simply a binary and a single JSON config that defines the event processing pipelines and the configuration for connecting to external object stores, key-value stores, HTTP/RPC endpoints and stream processing systems.

With the exception of persistent daemon mode, binge is completely stateless and can be seamlessly scaled, killed and restarted. When run as a persistent daemon, binge requires a volume (local or cloud-managed, such as EBS) to maintain a durable queue. If a cloud-managed block storage volume is used to back the durable queue, a persistent daemon may also be treated as if it were stateless. In this way, we like to think of binge as nginx-for-stream-processing.

Rationale and Approach

We already have plenty of stable, reliable stream processing frameworks, so why do we need this one?

Most stream processing is done explicitly using a framework such as Spark, Flink or Kafka Streams, or via a custom microservice architecture that leverages a reliable queue or event bus. Many stream processing tasks are simple transformations, filters, aggregations, annotations, etc. that can be done at the point of ingestion. This becomes more and more important in IoT use cases, where the ingestion points (the edge) are far away from your operations VPC(s). In short, we believe that many stream processing tasks can be accomplished without the use of stream processing systems and/or custom microservice architectures.

Our approach is to provide a single multi-use binary that can handle most stream processing tasks, where any stateful interactions are handled by external key-value stores, object stores or file systems, and more complex stream processing tasks can be forwarded to the appropriate stream processing system. This provides the flexibility to deploy the binary to the most appropriate "edge" (e.g., load balancer, IoT gateway, Lambda function, etc.) depending on the use case. In addition, it provides the flexibility to rely on external systems for stateful interactions, which can themselves exist at the edge (e.g., Agglo provides in-memory key-value stores), in an existing on-prem deployment or in a cloud-managed deployment.

Agglo/binge is not intended to replace existing stream processing systems. While there are many cases where it could be the main technology used for stream processing, it is also a perfect fit for complementing existing stream processing workloads, since it can preprocess and route events from the edge to a centrally managed stream processing system or event bus.

The main insight here is that not all stream processing needs to happen in a traditional stream processing engine (SPE). The value is the ability to trade off cost, latency, throughput and consistency by using binge together with other systems. Our hypothesis is that binge can be used as the basis of any event-driven architecture, where binge processes are distributed to different edges (IoT gateways, load balancers, Kubernetes, Lambda) and utilize managed systems for persistence and more involved processing.

While we believe that Agglo/binge is best used in conjunction with existing stream processing systems, we also believe that it can be the main building block for IoT event processing, building complex iPaaS (integration-platform-as-a-service) architectures and any future use case that requires highly distributed event processing from varied sources.

Overview

The four main components that influence the use of Agglo are shown in the figure below: event generation, event processing, persistence and additional processing. Events are generated by webhooks (e.g. integrations such as Slack, Jira, Facebook, etc.), explicit API calls, IoT devices, or really anything that can call an API endpoint. The core of Agglo is the binge process, which can run as a Lambda/cloud function, a pod in a Kubernetes ReplicaSet, a process on compute instances, a process in an IoT gateway, or anywhere that can run a Linux process with a network connection. In addition to basic stream processing tasks, binge also provides the ability to call external systems for persistence and additional processing.

The binge component is the icon with a pipe, gear and trifurcated arrows. As shown in the figure, binge runs as the main event processing process and can also be used for additional processing after initial processing.

[Figure: Agglo overview, showing event generation, event processing, persistence and additional processing]

Getting Started

To make life easier, it is best to make sure the tests pass before doing anything else. This requires that the following are installed:

  • aws-cli: Local AWS services are used for testing
  • go-mock: Mocks are generated at test time (I'll probably just check the mocks in eventually)
  • minio-client: Used for testing local object stores

First, make sure everything builds and the unit-tests pass:

make test

Once the tests pass, you can try out some of the Examples to get an idea of how agglo and binge work.

Configuration Generator

The raw JSON pipeline configs can become very unwieldy. Please use the Agglo Config Generator to generate all of your configs. A few caveats:

  1. The maintainer (@kmgreen2) is not a front-end developer. I tried to do things the "React way", but I am sure things could be much better.
  2. It does not explicitly save your progress, so you should periodically "Download" from the "Editor" screen to ensure you do not lose your edits.

This will be enhanced as I find time or others to help. This was my first React project, so I am sure someone else can do much better.

Main Components

There are 5 main components to Agglo:

  1. Process: A stage in the pipeline that will consume an input map, perform an operation (annotation, aggregation, completion, filter, spawner, transformation, tee, continuation, entwine) and output a map.
  2. Pipeline: An ordered collection of processes applied to an event.
  3. External systems: A connector to an external system that can be used by a process. Today, we support S3-like object stores, POSIX filesystems, key-value stores, messaging/pubsub systems and REST endpoints.
  4. Binge: An event processing binary that can run as a stateless daemon, a persistent daemon or a stand-alone command.
  5. Pipeline configuration: A binge instance is instantiated using the pipeline configuration that contains one or more pipelines and their dependent processes and external systems.

Processes

All processes must implement the following interface:

type PipelineProcess interface {
	Process(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error)
}

In general, a process will perform one or more actions based on the input map in. While the input map can technically be anything serialized to/from map[string]interface{}, we currently support JSON. Each process will also output a map, which is the input to the next process in the pipeline, or gets dropped on the floor if it is the last process in a pipeline. The input to the first process is the raw event posted to or read by binge.
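
To make the contract concrete, here is a minimal sketch of an annotation process. The type name (AddTimestamp) and output field (processedAt) are hypothetical illustrations, not part of agglo's API:

import (
	"context"
	"time"
)

// AddTimestamp is a hypothetical annotation process: it copies the
// input map and adds a "processedAt" field, leaving the original
// event untouched for the caller.
type AddTimestamp struct{}

func (p *AddTimestamp) Process(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error) {
	out := make(map[string]interface{}, len(in)+1)
	for k, v := range in {
		out[k] = v
	}
	out["processedAt"] = time.Now().UTC().Format(time.RFC3339)
	return out, nil
}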

See the process README for detailed information on processes.

Pipelines

Please refer to the examples for working pipeline definitions.
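
As a rough mental model, executing a pipeline amounts to threading an event through the ordered processes. The driver below is an illustrative sketch assuming the PipelineProcess interface shown above, not agglo's actual runner:

// runPipeline applies an ordered slice of PipelineProcess
// implementations to an event, feeding each process's output map
// into the next process.
func runPipeline(ctx context.Context, procs []PipelineProcess, event map[string]interface{}) (map[string]interface{}, error) {
	cur := event
	for _, p := range procs {
		next, err := p.Process(ctx, cur)
		if err != nil {
			return nil, err
		}
		cur = next
	}
	// binge drops the last process's output on the floor; it is
	// returned here only to make the sketch easy to test.
	return cur, nil
}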

External Systems

Adding a new external system is as "simple" as implementing the proper interface.

  • KVStores

    Current Support: In-memory local key-value store, local DynamoDB and managed DynamoDB

    See KVStores for the current implementations.

    Implementing a new key-value store is a matter of implementing the following interface:

    type KVStore interface {
    	AtomicPut(ctx context.Context, key string, prev, value []byte) error
    	AtomicDelete(ctx context.Context, key string, prev []byte) error
    	Put(ctx context.Context, key string, value []byte) error
    	Get(ctx context.Context, key string) ([]byte, error)
    	Head(ctx context.Context, key string) error
    	Delete(ctx context.Context, key string) error
    	List(ctx context.Context, prefix string) ([]string, error)
    	ConnectionString() string
    	Close() error
    }
    
  • Object Stores

    Current Support: In-memory local object store and Minio (supports most managed object stores).

    See ObjectStores for the current implementations.

    Implementing a new object store is a matter of implementing the following interface:

    type ObjectStore interface {
    	Put(ctx context.Context, key string, reader io.Reader)	error
    	Get(ctx context.Context, key string) (io.Reader, error)
    	Head(ctx context.Context, key string) error
    	Delete(ctx context.Context, key string) error
    	List(ctx context.Context, prefix string) ([]string, error)
    	ConnectionString() string
    	ObjectStoreBackendParams() ObjectStoreBackendParams
    }
    
  • Pubsub

    Current Support: In-memory local pub/sub (Kafka coming soon)

    See Streaming for the current implementations.

    Implementing a new publisher is a matter of implementing the following interface (a toy sketch appears after this list):

    type Publisher interface {
    	Publish(ctx context.Context, b []byte) error
    	Flush(ctx context.Context, timeout time.Duration) error
    	Close() error
    	ConnectionString() string
    }
    
  • REST Endpoints

    All HTTP/S requests are supported.
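
To give a feel for how small these contracts are, below is a toy, thread-safe in-memory Publisher. It is a sketch for orientation only, not one of the shipped implementations, and the connection string is made up:

import (
	"context"
	"sync"
	"time"
)

type memPublisher struct {
	mu       sync.Mutex
	messages [][]byte
}

// Publish appends a copy of the payload so callers may reuse their buffers.
func (p *memPublisher) Publish(ctx context.Context, b []byte) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.messages = append(p.messages, append([]byte(nil), b...))
	return nil
}

// Flush is a no-op: nothing is buffered externally in this toy implementation.
func (p *memPublisher) Flush(ctx context.Context, timeout time.Duration) error {
	return nil
}

func (p *memPublisher) Close() error { return nil }

func (p *memPublisher) ConnectionString() string { return "mem://toy" }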

Entwine: Immutable, Partial Ordering of Events

Entwine is a special process that provides the ability to "entwine" event timelines. It is similar to existing blockchain technologies in that it relies on hash chains for immutability and zero-knowledge proofs for secure verification of individual event timelines. There is no consensus mechanism. The assumption is that each event timeline is generated by a single individual, organization, binge binary or collection of binge binaries.
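
The immutability property follows from the hash-chain structure: each record's digest covers its predecessor's digest, so altering any earlier event invalidates every digest after it. The following sketches that general idea only; entwine's actual record format and proof structure differ:

import "crypto/sha256"

type chainNode struct {
	prevDigest [sha256.Size]byte // digest of the previous node in the timeline
	event      []byte            // serialized event payload
}

// digest covers both the previous digest and the event, which is what
// makes retroactive tampering detectable anywhere later in the chain.
func (n chainNode) digest() [sha256.Size]byte {
	h := sha256.New()
	h.Write(n.prevDigest[:])
	h.Write(n.event)
	var d [sha256.Size]byte
	copy(d[:], h.Sum(nil))
	return d
}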

See the Entwine README for more information.


agglo's Issues

Create entwinectl Utility

This should connect to a KVStore and/or Ticker API to do things such as:

1. Evaluate happened-before (Ticker API)
2. Other stuff...

Implement GCS Object Store

Can probably just use Minio for this and have the connection string differentiate between GCS, S3 and Azure

Audit for Adding errors.Wrap

There are many places where we just return err, which makes troubleshooting very hard. Audit the entire code base and add errors.Wrap() to make error reporting mo' betta.

Entwine Process has Race Conditions

The main race is:

  1. Many processes appending to the same substream
  2. Many processes anchoring

The problem is the current anchor code will formulate an incorrect proof, because it builds the proof based on the current head of the substream. All append+anchor operations to the same substream should be synchronized.

Failed Futures Should Cancel Child Futures

Basically need to cancel all futures that occur after a failed future, provided the future is required.

The current behavior is to fail all futures after a failed future. This is confusing, because all child futures have the same failure message as the originally failed future.

Here are examples based on the following chain: CreateFuture(r1).Then(r2).Then(r3).Then(r4) == f1, f2, f3, f4

  1. f1 succeeds and f2 fails: f1 == success, f2 == failed, f3 == canceled, f4 == canceled
  2. f1 succeeds, f2 fails and f2 is fail-able, f3 fails: f1 == success, f2 == failed, f3 == failed, f4 == canceled
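
Stated as code, the desired rule looks roughly like the sketch below. This is intended semantics only, not the actual futures package; "required" here stands in for a future that is not fail-able:

type futureStatus int

const (
	statusSuccess futureStatus = iota
	statusFailed
	statusCanceled
)

// nextStatus sketches the intended propagation: once a required future
// fails (or anything upstream is canceled), every downstream future is
// canceled instead of inheriting the upstream failure message.
func nextStatus(upstream futureStatus, upstreamRequired bool, ranOK bool) futureStatus {
	switch {
	case upstream == statusCanceled:
		return statusCanceled
	case upstream == statusFailed && upstreamRequired:
		return statusCanceled
	case ranOK:
		return statusSuccess
	default:
		return statusFailed
	}
}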

StreamStore and Ticker Needs to Store State Externally

The main state is a head for each substream, a head for the ticker and a distributed lock for each substream and ticker.

We can just use the underlying KVStore for this stuff. Just need to make sure there are no naming conflicts.

Distributed Lock Fails Under Load

Noticed this while working on #56, where increasing the concurrency of a test client beyond 1 can lead to a 400 from Dynamo when appending to a stream (entwine.go:136)

Create NewSubstreamStore Constructor

The constructor should return an object that wraps StreamStore and does operations on a given substream.

This will be used by StreamStoreTee(), which will process in-bound messages that have an objectStore reference.

Cleanup Hash Type

We use different hash types throughout the code... Need to revisit where we are providing it as a parameter and the different enums used to track them.

At a bare minimum, we should remove crypto. and use gocrypto. everywhere.

Make Entwine Process

All of the entwine code is more-or-less independent of the core agglo stuff. Make it part of it!
