wework / grabbit

A lightweight transactional message bus on top of RabbitMQ

License: Apache License 2.0

Go 100.00%
go golang microservices rabbitmq reliable-messages saga saga-pattern transactional-message-exchange outbox

grabbit's Introduction

grabbit

A lightweight transactional message bus on top of RabbitMQ supporting:

  1. Supported Messaging Styles
    • One Way (Fire and forget)
    • Publish/Subscribe
    • Async Command/Reply
    • Blocking Command/Reply (RPC)
  2. Transactional message processing
  3. Message Orchestration via the Saga pattern
  4. At least once reliable messaging via Transaction Outbox and Publisher Confirms
  5. Retry and backoffs
  6. Structured logging
  7. Reporting Metrics via Prometheus
  8. Distributed Tracing via OpenTracing
  9. Extensible serialization with default support for gob, protobuf and avro

Stable release

The v1.x branch contains the latest stable releases of grabbit. Track that branch to get point and minor release updates.

Supported transactional resources

  1. MySQL > 8.0 (InnoDB)

Basic Usage

  • For a complete sample application, see the vacation booking sample app in the examples directory.

The following outlines the basic usage of grabbit. For a complete view of how to use grabbit, including how to write sagas and handle dead-lettering, refer to the grabbit/tests package.

import (
	"context"
	"fmt"
	"time"

	"github.com/wework/grabbit/gbus"
	"github.com/wework/grabbit/gbus/builder"
)

Define a message

type SomeMessage struct{}

func (SomeMessage) SchemaName() string {
	return "some.unique.namespace.somemessage"
}

Creating a transactional GBus instance

gb := builder.
	New().
	Bus("connection string to RabbitMQ").
	Txnl("mysql", "connection string to mysql").
	WithConfirms().
	Build("name of your service")

Register a command handler

handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
	cmd, ok := message.Payload.(*SomeCommand)
	if ok {
		fmt.Printf("handler invoked with message %v", cmd)
		return nil
	}

	return fmt.Errorf("failed to handle message")
}

gb.HandleMessage(SomeCommand{}, handler)

Register an event handler

// The handler returns an error, matching the return statements in its body.
eventHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
	evt, ok := message.Payload.(*SomeEvent)
	if ok {
		fmt.Printf("handler invoked with event %v", evt)
		return nil
	}

	return fmt.Errorf("failed to handle event")
}

gb.HandleEvent("name of exchange", "name of topic", SomeEvent{}, eventHandler)

Start the bus

gb.Start()
defer gb.Shutdown()

Send a command

gb.Send(context.Background(), "name of service you are sending the command to", gbus.NewBusMessage(SomeCommand{}))

Publish an event

gb.Publish(context.Background(), "name of exchange", "name of topic", gbus.NewBusMessage(SomeEvent{}))

RPC style call

request := gbus.NewBusMessage(SomeRPCRequest{})
reply := gbus.NewBusMessage(SomeRPCReply{})
timeOut := 2 * time.Second

reply, e := gb.RPC(context.Background(), "name of service you are sending the request to", request, reply, timeOut)

if e != nil {
	fmt.Printf("rpc call failed with error %v", e)
} else {
	fmt.Printf("rpc call returned with reply %v", reply)
}

Testing

  1. ensure that you have the dependencies installed: go get -v -t -d ./...
  2. make sure to first: docker-compose up -V -d
  3. then to run the tests: go test ./...

grabbit's People

Contributors

avigailberger, danielwitz, vladshub, yuvmendel

grabbit's Issues

Report message handling metrics

As of now we have no visibility into message handling, which is one of the key features of grabbit.

I would like a way to gather metrics counting handler successes and failures as well as rejected messages (I think Prometheus is the best way to report these metrics).

In this PR I've added an integration with Prometheus and started to report the following metrics:

  • Result counter of every run of a handler (for commands and events, in both message handlers and saga handlers) with a label for success or failure.
  • Latency summary of every run of the handlers.
  • Rejected messages counter - counts the total rejected messages

Not registering different ids of the same avro entity

Describe the bug
When there are multiple ids for the same avro entity in the schema registry, grabbit registers the first one (the one with the smaller id, I think) and rejects the others because the entity is "already registered" in the map.
When an entity from one of the unregistered schemas arrives, grabbit fails the serialization.

A completing saga action may get executed twice

When a saga can be completed by more than one message, both messages may arrive and be processed on the bus. Since grabbit does not check for concurrency when deleting a saga, the completing actions may get executed twice.
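
One possible guard is to let only the caller that actually deleted the saga run the completing action. The sketch below illustrates the idea with an in-memory store; sagaStore, remove and complete are hypothetical names and are not part of grabbit. With a SQL store, the same signal could come from checking the rows affected by the DELETE.

```go
package main

import (
	"fmt"
	"sync"
)

// sagaStore is an in-memory stand-in for a saga store (hypothetical type).
type sagaStore struct {
	mu        sync.Mutex
	instances map[string]struct{}
}

// remove deletes the saga and reports whether this call was the one
// that removed it.
func (s *sagaStore) remove(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.instances[id]; !ok {
		return false
	}
	delete(s.instances, id)
	return true
}

// complete runs onComplete only for the caller that actually deleted the saga.
func complete(s *sagaStore, id string, onComplete func()) {
	if s.remove(id) {
		onComplete()
	}
}

func main() {
	store := &sagaStore{instances: map[string]struct{}{"saga-1": {}}}
	runs := 0
	complete(store, "saga-1", func() { runs++ })
	complete(store, "saga-1", func() { runs++ }) // second completing message
	fmt.Println("completing action ran", runs, "time(s)")
}
```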

When a deadletter handler panics grabbit fails to reject the message

If the deadletter handler panics, grabbit fails to reject the message, leaving it permanently in the "delivered" state and effectively stopping the service.

To Reproduce

  1. create a bus instance listening to the dlq
  2. register a deadletter handler on a dlq bound to a dlx
  3. reject a message to the dlx from another service
  4. when the deadletter handler is called panic

Expected behavior
The message gets queued back and the handler reprocesses it

Actual behavior
The message remains in the delivered state and the service stops consuming messages from the dlq
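
A common way to keep a panicking handler from leaving a delivery unresolved is to recover from the panic and turn it into an ordinary error, so the caller can always decide whether to ack, reject or requeue. The following is a minimal sketch of that technique only; safeInvoke is a hypothetical helper, not a grabbit function.

```go
package main

import (
	"errors"
	"fmt"
)

// safeInvoke runs handler and converts a panic into an ordinary error,
// so the caller always gets a result it can act on.
func safeInvoke(handler func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("handler panicked: %v", r)
		}
	}()
	return handler()
}

func main() {
	// A panicking handler now yields an error instead of crashing the consumer.
	fmt.Println(safeInvoke(func() error { panic("boom") }))
	// Ordinary errors pass through unchanged.
	fmt.Println(safeInvoke(func() error { return errors.New("plain failure") }))
}
```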

transactional outbox configuration

Add the capability to configure the transactional outbox runtime parameters.

  • db polling time interval
  • page sizes
  • max delivery attempts
  • number of confirm handlers
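
As an illustration of what such a configuration could look like, the sketch below groups the four parameters in one struct. OutboxConfig, its field names and the default values are assumptions made for this example; they are not taken from grabbit.

```go
package main

import (
	"fmt"
	"time"
)

// OutboxConfig is a hypothetical settings struct, not a grabbit type.
type OutboxConfig struct {
	PollInterval        time.Duration // db polling time interval
	PageSize            int           // page size used when reading the outbox
	MaxDeliveryAttempts int           // max delivery attempts
	ConfirmHandlers     int           // number of confirm handlers
}

// WithDefaults returns a copy in which unset (zero or negative) fields are
// replaced by placeholder defaults chosen only for this example.
func (c OutboxConfig) WithDefaults() OutboxConfig {
	if c.PollInterval <= 0 {
		c.PollInterval = time.Second
	}
	if c.PageSize <= 0 {
		c.PageSize = 100
	}
	if c.MaxDeliveryAttempts <= 0 {
		c.MaxDeliveryAttempts = 10
	}
	if c.ConfirmHandlers <= 0 {
		c.ConfirmHandlers = 1
	}
	return c
}

func main() {
	cfg := OutboxConfig{PageSize: 500}.WithDefaults()
	fmt.Printf("%+v\n", cfg)
}
```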

Cleanup bus builder interface

The gbus.Builder interface has some methods that are not in use and never will be. They need to be deleted from the interface.

Workers leak message channels

Closing a worker does not cancel the associated amqp consumer, which leaves the messages channel open and causes a leak after this PR #125

Set the Type and Content-Type headers

When sending a message, grabbit should set the amqp Type and Content-Type headers as follows:
Type: should be set to gbus.Message.SchemaName()
Content-Type: should be set to the wire format used to serialize the content

  • gob: application/x-gob
  • proto: application/x-protobuf
  • avro: avro/binary (avro 1.8.1 spec)
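
The mapping above can be sketched as a small lookup. The serializer keys ("gob", "proto", "avro") and the messageHeaders helper are illustrative names for this example, not grabbit identifiers; only the Content-Type values come from the list above.

```go
package main

import "fmt"

// contentTypes maps a serializer name to the Content-Type value listed above.
var contentTypes = map[string]string{
	"gob":   "application/x-gob",
	"proto": "application/x-protobuf",
	"avro":  "avro/binary",
}

// messageHeaders returns the Type and Content-Type values for an outgoing
// message: Type is the schema name, Content-Type depends on the serializer.
func messageHeaders(schemaName, serializer string) (msgType, contentType string, err error) {
	ct, ok := contentTypes[serializer]
	if !ok {
		return "", "", fmt.Errorf("unknown serializer %q", serializer)
	}
	return schemaName, ct, nil
}

func main() {
	msgType, contentType, err := messageHeaders("some.unique.namespace.somemessage", "proto")
	fmt.Println(msgType, contentType, err)
}
```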

Add generic handler metrics with the message type as a label

Is your feature request related to a problem? Please describe.
The handler metrics that grabbit exposes are per handler which makes creating generic dashboards difficult.

Describe the solution you'd like
I'd like generic metrics with the message type as a label, for the result counters the result would be a label as well

Optimistic locking violation when saving saga instances should retry handler until success

When a saga instance gets invoked and persisted to the saga store an optimistic locking error may occur due to concurrent invocations.
Currently when this happens the general retry strategy retries the message until success or until the set max retry count is exceeded and the message gets rejected.

In highly concurrent sagas this may cause unneeded operational effort due to failed messages.
In case of concurrency violations we should retry the message until success.

Saga configuration functions only run when saga created

Currently, the saga configuration functions run only when the saga gets created but not when a saga is fetched from the saga store.

The configuration functions need to run every time the saga is created or fetched so nonpersistent state (such as sensitive configuration, dependencies, etc) can be injected to the saga instance

Message deduplication

Add capabilities to detect and handle duplicate messages.
Currently, grabbit provides at least once messaging semantics and we should evaluate whether to provide message deduplication capabilities to provide for a "best effort" at least once messaging semantics

Dead Letter Handler does not nack/ack after Processing

Dead letter handler does not nack/ack after processing.

  1. Consume message from dead letter queue with "dead letter handler"
  2. mark message as poison
  3. the message is not enqueued.

The message should go back to the queue.

Improved documentation

Need to add more documentation and code samples illustrating different capabilities of grabbit

Persistent timeout manager

Support a MySQL-based timeout manager with durable timeouts that survive service restarts.

Potential issue with Logrus on multiple go routines

There is a potential issue with logrus when an instance is shared among multiple goroutines.

Since we have added the Logged interface and use it to get access to logs, we can create new instances of logrus via the WithField function to ensure that we are not sharing the same logrus instance across multiple goroutines.

Sagas should have a way to reply to their originators

Currently one can reply to a message in a single interaction (request>reply). However, in some cases a saga instance may need to reply, in some intermediate step of the saga, to the producer of the message that created the saga.
In such cases, we might want to add functionality that allows a handler to reply to the sender of the message that triggered the creation of the saga.

Allow for scaling out outbox relay to multiple go routines

Currently, the outbox relays pending messages to the rabbitmq broker via 1 goroutine.
In order to provide for improved send throughput we should add the ability to configure and scale out the number of goroutines that are relaying messages out of the outbox.
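
A minimal sketch of such a scale-out is a worker pool fed from a channel, with the number of goroutines as a parameter. relay and its send callback are hypothetical stand-ins for reading from the outbox and publishing to the broker. Note that with more than one worker the send order is no longer guaranteed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// relay hands pending messages to a configurable number of goroutines and
// returns how many messages were passed to send.
func relay(pending []string, workers int, send func(string)) int {
	if workers < 1 {
		workers = 1
	}
	ch := make(chan string)
	var sent int64
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range ch {
				send(msg)
				atomic.AddInt64(&sent, 1)
			}
		}()
	}
	for _, msg := range pending {
		ch <- msg
	}
	close(ch) // lets the workers exit once the queue is drained
	wg.Wait()
	return int(sent)
}

func main() {
	n := relay([]string{"a", "b", "c", "d"}, 2, func(msg string) {
		fmt.Println("relayed", msg)
	})
	fmt.Println("total relayed:", n)
}
```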

Saga timeouts should be reported via a metric

Having a saga timing out is usually a significant event that might require operational attention and perhaps some corrective action.
grabbit should have a Prometheus reported metric that should be incremented on each saga instance timing out

Allow configuring max retries

Provide a way to externally configure the maximum amount of retries handlers get executed before declaring the message as poison and rejecting it

Report operational metrics

Collect and report operational metrics

Candidate metrics

  • tx/s
  • outbox size
  • confirm rates
  • pending saga instances
  • saga timeout
  • health errors

Transactional Outbox Metrics:

  • outbox_total_records: reports the total amount of records currently in the outbox
  • outbox_pending_delivery: reports the total amount of records pending delivery currently in the outbox
  • outbox_pending_removal: reports the total amount of records that were sent and pending removal currently in the outbox

Add the capability to configure the transaction isolation level

Currently the transaction that grabbit creates and passes on to handlers has a default isolation level of read-committed, while some services require an isolation level that provides more consistency (such as repeatable read) in order to function properly.

We should consider allowing the configuration of the isolation level of the transaction grabbit creates.
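
In Go's standard database/sql package the isolation level is chosen per transaction through sql.TxOptions. The sketch below shows how a configuration string could be mapped to such a level; parseIsolation and the accepted strings are assumptions for this example, not grabbit API.

```go
package main

import (
	"database/sql"
	"fmt"
)

// parseIsolation maps a configuration string to a database/sql isolation level.
func parseIsolation(name string) (sql.IsolationLevel, error) {
	switch name {
	case "read-committed":
		return sql.LevelReadCommitted, nil
	case "repeatable-read":
		return sql.LevelRepeatableRead, nil
	case "serializable":
		return sql.LevelSerializable, nil
	default:
		return sql.LevelDefault, fmt.Errorf("unsupported isolation level %q", name)
	}
}

func main() {
	level, err := parseIsolation("repeatable-read")
	if err != nil {
		panic(err)
	}
	// opts would be passed to (*sql.DB).BeginTx(ctx, opts) when opening
	// the transaction that is handed to a handler.
	opts := &sql.TxOptions{Isolation: level}
	fmt.Println(opts.Isolation)
}
```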

Handling incoming messages with an empty body stops consumer

When a message with empty (nil or length 0) body is consumed, the consumption of messages stops.

To Reproduce
Send a message with no body.

Expected behavior
It should be possible for the global handler or dead letter handler to process this message. Normal handlers cannot, because the message cannot be decoded.

Allow Sending Raw Messages

When processing a message sent from one service ("original sending bus") to another ("original destination") fails, the message is marked as poison and, if configured, will end up in a special "dead letter queue" ("dlq").

We would like to be able to consume those messages and re-send them to their "original destination" - meaning - re-enqueueing them into their original destination queue.

Currently, when sending a message, grabbit sets headers that match the sending bus.

In order to restore the original flow, we need to add an "impersonation" ability by adding headers that match the original sending bus.

This means allowing the headers to be set manually, by allowing raw messages to be sent.

Fetching saga instances from the saga store should be optimized

The current implementation of the mysql saga store, when fetching saga instances by saga type, fetches all saved instances in one round trip, which may cause a performance issue when a large number of saga instances need to be fetched.

In order to prevent a potential performance hit, the fetching logic should be optimized to include paging and potentially parallelize the fetching using a set of goroutines.
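
The paging part of this proposal can be sketched as a loop that requests fixed-size pages until a short page comes back. fetchPage, fetchAll and sliceStore are hypothetical names; fetchPage stands in for a store query that uses LIMIT and OFFSET (or a keyset condition), and the sketch leaves out the parallel fetching.

```go
package main

import "fmt"

// fetchPage stands in for a store query that returns at most limit
// items starting at offset.
type fetchPage func(offset, limit int) ([]string, error)

// fetchAll collects every item by requesting fixed-size pages until a page
// shorter than pageSize is returned.
func fetchAll(fetch fetchPage, pageSize int) ([]string, error) {
	if pageSize < 1 {
		return nil, fmt.Errorf("page size must be positive, got %d", pageSize)
	}
	var all []string
	for offset := 0; ; offset += pageSize {
		page, err := fetch(offset, pageSize)
		if err != nil {
			return nil, err
		}
		all = append(all, page...)
		if len(page) < pageSize {
			return all, nil
		}
	}
}

// sliceStore serves pages from an in-memory slice, for demonstration only.
func sliceStore(data []string) fetchPage {
	return func(offset, limit int) ([]string, error) {
		if offset >= len(data) {
			return nil, nil
		}
		end := offset + limit
		if end > len(data) {
			end = len(data)
		}
		return data[offset:end], nil
	}
}

func main() {
	store := sliceStore([]string{"s1", "s2", "s3", "s4", "s5"})
	all, err := fetchAll(store, 2)
	fmt.Println(len(all), err)
}
```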

Expose the logger in the invocation

In the implementation of a saga we always have to do a lot of copy-pasting to ensure we have all of the saga meta fields, like the saga ID.

I'd like to have an interface of a logger exposed as part of the invocation so I won't need to initialize a new one in each function.

Include error message in rejected message headers

In order to provide for better diagnostics when handling poison messages ending up in the dead-letter queue we should have a way to include the error causing the failure of processing the message included in the rejected message headers.

Add the ability for producers to specify an idempotency key on the BusMessage.

Add the ability for producers to specify an idempotency key on the BusMessage.

In order for downstream consumers to be able to deduplicate an inbound message, there needs to be a way for upstream producers to explicitly specify a unique idempotency key that can be carried over between retries (initiated by grabbit or by the client code)
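
On the consumer side, an idempotency key makes deduplication a simple "have I seen this key" check. The sketch below keeps the keys in memory purely for illustration; Deduplicator and FirstTime are hypothetical names, and a real consumer would need a durable store so that keys survive restarts.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduplicator remembers the idempotency keys it has seen (in memory only).
type Deduplicator struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

// NewDeduplicator returns an empty Deduplicator.
func NewDeduplicator() *Deduplicator {
	return &Deduplicator{seen: make(map[string]struct{})}
}

// FirstTime records key and reports whether it had not been seen before.
func (d *Deduplicator) FirstTime(key string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[key]; ok {
		return false
	}
	d.seen[key] = struct{}{}
	return true
}

func main() {
	dedup := NewDeduplicator()
	// The same key arrives twice, for example because the producer retried.
	for _, key := range []string{"order-42", "order-42", "order-43"} {
		if dedup.FirstTime(key) {
			fmt.Println("processing", key)
		} else {
			fmt.Println("skipping duplicate", key)
		}
	}
}
```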

Cleanup logs

Logs are littered with a lot of entries that should be logged in level Debug but get logged in Info level.
