Marshal - a Kafka consumer coordination library

Marshal is in beta. We have deployed it in a few places and are working to ensure it's stable and fast. It is not 100% battle-tested yet, so feedback is very welcome.

Purpose

This project assumes you have some familiarity with Kafka. You should know what a topic is and what partitions are.

In Kafka, the unit of scalability is the partition. If you have a topic that is getting "too busy", you increase the partition count. Consumption of data from those busy topics requires consumers to be aware of these partitions and be able to coordinate their consumption across all of the consumers.

Traditional setups use Zookeeper or some other system for coordinating consumers. This works in many situations, but introduces a point of failure that isn't necessary. It is possible to completely perform consumer coordination using Kafka alone.

Additionally, getting consumer coordination correct is a rather taxing exercise in development and, frankly, shouldn't need to be done for every single project, company, etc. There should be an open source system that handles it for you.

Marshal is a library you can drop into your Go programs to coordinate the consumption of partitions across multiple processes, servers, etc. It is implemented in terms of Kafka itself: zero extra dependencies.

Marshal is designed for production environments with many topics, each having hundreds of partitions, and potentially thousands of consumers working in concert across the infrastructure to consume them: big environments with critical needs.

Usage

This module is designed to be extremely simple to use. The basic flow is that you create a Marshaler and then use it to create as many Consumers as you have topics to consume. Logically, you want one Marshaler per program and a single Consumer per topic you need to consume from.

Here's the simplest example (but see a more complicated example in the example directory):

package main

import (
    "fmt"

    "github.com/zorkian/marshal/marshal"
)

func main() {
    // One Marshaler per program: it coordinates all claims for this client.
    marshaler, err := marshal.NewMarshaler(
        "clientid", "groupid", []string{"127.0.0.1:9092"})
    if err != nil {
        panic(err)
    }
    defer marshaler.Terminate()

    // One Consumer per topic (or set of topics) you want to consume.
    consumer, err := marshaler.NewConsumer(
        []string{"some-topic"}, marshal.NewConsumerOptions())
    if err != nil {
        panic(err)
    }
    defer consumer.Terminate()

    msgChan := consumer.ConsumeChannel()

    for {
        msg := <-msgChan
        fmt.Printf("Consumed message: %s\n", msg.Value)
        consumer.Commit(msg)
    }
}

If you ran this against a cluster containing a topic named some-topic with 8 partitions, it would begin claiming those partitions one by one until it had them all. If you started a second copy of the program, it would claim only the partitions not already claimed. If the first copy dies, the second will pick up the dropped partitions within a few minutes.

In essence, Marshal takes all of the effort of consumer coordination out of your software and puts it where it belongs: on Kafka.

How Coordination Works

Please read this section to get a handle on how Marshal performs coordination and the guarantees it gives you. In particular, the failure scenarios might be interesting.

If you want the gory details of the protocol used internally, please see the PROTOCOL documentation. You don't need to read and understand it, but it might be useful.

Basic Coordination

In essence, Marshal uses a special topic within Kafka to coordinate the actions of many consumers anywhere in the infrastructure. As long as the consumers can connect to the Kafka cluster you want to coordinate, you can use Marshal. There is no language dependency either -- Marshal the algorithm could be implemented in any language and consumers could coordinate with each other.

We assume that you're familiar with the basics of Kafka -- notably that each partition is effectively a write-ahead log that records an ordered set of events, and that it's not possible (barring unclean leader elections) for two consumers to see different event orderings. Marshal takes advantage of that property to perform distributed coordination.

When a program using Marshal starts up, the first thing it does is read the logs in the coordinating topic. These logs contain events such as claim partition, heartbeat, and release partition.

Using these events, Marshal can determine not only which consumers exist, but which partitions they are currently working on and how far along they are. With that information, the local program can decide such things as "which partitions are unclaimed" and then take action to claim and begin consuming them.
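
To make that concrete, here is a rough sketch of what such event records might carry. The names and fields below are illustrative only; the actual wire format is defined in the PROTOCOL documentation:

type eventType int

const (
    eventClaimPartition   eventType = iota // "I am taking this partition"
    eventHeartbeat                         // "I am alive and have consumed up to offset N"
    eventReleasePartition                  // "I am giving this partition up"
)

// event is an illustrative record; real Marshal events are encoded in the
// text protocol described in PROTOCOL.
type event struct {
    Type       eventType
    GroupID    string // the consumer group the event belongs to
    ClientID   string // the program instance that wrote the event
    Topic      string
    Partition  int
    LastOffset int64 // for heartbeats: the last offset consumed
}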

Groups and Clients

Coordination happens within "groups". When you create a Marshaler you specify the group your consumer is part of. All claims are done on a per-group basis, which means you can consume the same topic N times -- as long as you have N groups. Within a group, only one consumer can claim a given partition at a time, so the number of consumers that can simultaneously claim a given partition equals the number of groups.

The "client ID" specified when you create a Marshaler is used to identify a particular instance of a program. These should be unique per instance of software, but they should be reasonably stable. At Dropbox we use the name of the machine the software is running on, plus possibly an instance ID if we run multiple copies on a single box.

Consumption of Messages

The main engine of Marshal runs when you create a consumer and call consumer.Consume(). This will return a message from one of the partitions you have claimed (when one is available). You do something with the message... and then consume the next one. You don't have to do anything else.

Behind the scenes, the act of consuming updates internal cursors and timers and will possibly generate heartbeat messages into the Marshal event log. These messages contain information about the last offset consumed, allowing other consumers (and monitoring systems) to know where you are within the partition. In case of failure, they can resume at the last point you heartbeated.

Presently, all consumption within Marshal is at-least-once. In most consumer failures, a block of messages (up to one heartbeat interval's worth) will likely be reprocessed by the next consumer.
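
As a sketch, a Consume-based loop might look like the following (assuming, per the description above, that Consume blocks until a message from a claimed partition is available; handle is a hypothetical application function):

for {
    msg := consumer.Consume()
    if msg == nil {
        break // assumption: a nil message signals the consumer is terminating
    }
    handle(msg)          // your application logic
    consumer.Commit(msg) // mark the message processed
}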

Message Ordering

Kafka guarantees the ordering of messages committed to a partition, but does not guarantee any ordering across partitions. Marshal will give you messages from any partition it has claimed, so in essence, Marshal does not guarantee ordering. If you need message ordering, this library is not presently appropriate for you.

If you are having throughput problems you should increase the number of partitions you have available so that Marshal can have more in-flight messages.

Failure Modes

This section documents some of the failure modes and how Marshal handles them. Please let us know of other scenarios and we can analyze and write about them.

Consumer Too Slow

In the case where a consumer is too slow -- i.e. it is consuming more slowly from a partition than data is coming in -- Marshal will detect this and internally it will start failing its health checks. When this happens it will, after enough time has passed, decide that it is not able to sustain the load and will voluntarily surrender partitions.

This is useful as a load balancing mechanism: if one consumer ends up with 8 claims while another has only a handful, the former can shed load and the latter will pick it up.

However, it is worth noting that in the unbalanced scenario, as long as the consumers are keeping up with the traffic they won't release partitions. It is perfectly valid right now for Marshal consumers to end up unbalanced -- as long as they're all pulling their weight.

Consumer Death: Expected

If a consumer dies or shuts down in an expected (controlled) way, Marshal will attempt to commit release partition events into the log. If this happens successfully then other consumers will be able to pick up the partitions within seconds and begin consuming exactly where the last consumer left off.

No data is skipped or double-consumed in this mode and the downtime is extremely minimal.

Consumer Death: Unexpected

If a consumer dies unexpectedly, things are slightly worse off. Assuming a hardware failure or other such issue (network split, etc), the partition's claim will start to become stale. From the perspective of the rest of the fleet, they will have to wait an appropriate interval (two heartbeats) until they can claim the partition.

Data might be double-consumed, but at worst one heartbeat interval's worth: whatever was consumed since the last heartbeat. The downtime of consumption is at worst two heartbeat intervals. As an illustration, if the heartbeat interval were 60 seconds, up to a minute of messages could be reprocessed and the partition could sit unconsumed for up to two minutes.

Network Partitions

Since Kafka can only have a single leader for a partition, any consumers that are on the side of the leader will be able to continue working. Consumers that are on the other side will fail to heartbeat and will stop being able to work -- even if they could otherwise reach the leader for the topics they were consuming.

The consumers on the side of the Marshal coordination partitions will be able to tell that the other consumers dropped off and will be able to start working. (Of course, this may cause them to overload themselves with too many claims, leading to consumer slowness.)

If the network partition is between a consumer and Kafka, the consumer will be unable to consume and will also fail to heartbeat. This is effectively treated as Consumer Death: Unexpected. When the partition heals, the consumers that lost their claims will know (assuming machine time is synchronized) and will abandon them.

Important Notes

This system assumes that timestamps are valid. If your machines are not using NTP to synchronize their clocks, you will not be able to get deterministic behavior. Sorry.

Marshal also relies on all actors being good actors. Malicious users can cause the system to behave unpredictably or as they choose.

Frequently Asked Questions

Here are some questions we've seen. For more, see us on IRC.

My consumers are unbalanced; one has more partitions than the others.

This is a design property of Marshal's implementation. We start with the premise that we can capably health check ourselves and determine whether or not we are keeping up with our current claims. If so, it doesn't matter how many partitions we have -- we'll be healthy.

This means that we can end up in a state where one consumer has several partitions and another consumer has fewer (or none), but Marshal guarantees that all of them will be healthy.

My consumer isn't claiming any partitions.

This usually happens when you are reusing Client IDs and your consumer has previously become unhealthy and released partitions. A sick consumer will not reclaim partitions it has previously released.

Make sure you have multiple consumers with different Client IDs, or make sure that in the single consumer use case you are using randomly generated Client IDs every time your program starts.
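
For the single-consumer case, a throwaway client ID can be generated at startup; here is a minimal sketch using only the standard library:

import (
    "crypto/rand"
    "encoding/hex"
)

// randomClientID returns a fresh ID on every start, so a restarted consumer
// is never mistaken for its previous (possibly unhealthy) self.
func randomClientID() string {
    buf := make([]byte, 8)
    if _, err := rand.Read(buf); err != nil {
        panic(err) // crypto/rand should never fail on a healthy system
    }
    return "consumer-" + hex.EncodeToString(buf)
}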

Bugs and Contact

There may be bugs. This is a new project. There are tests, however, and we very much welcome the submission of bug reports, pull requests, etc.

Github: https://github.com/zorkian/marshal

IRC: #kafka-marshal on Freenode

Issues

Switch to protobuf?

The current protocol is text-based, which is nice for some things, but it's also pretty verbose. A binary protocol would be more efficient, but would also add a lot of work to implement in various languages. The current text-based protocol has the ease-of-use advantage (plus human readability, a dubious benefit) but has extensibility problems.

@DrTall, thoughts on this?

Batching consume API?

This was brought up in discussion today: particularly for the AMO (at-most-once) consumer, when we get it, it would be nice to have batching. I.e., when you consume, you get a batch of messages and can deal with them however you want, rather than consuming one by one as with the current API.

Import confusion

Should users be importing github.com/dropbox/marshal, or github.com/zorkian/marshal? Could one of these be deprecated?

Code Review Notes

Not sure if an Issue is the best way to track this, but GitHub doesn't seem to have any review features outside a pull request.

I spent about an hour reading the code this morning so I haven't read everything yet, but I thought I would send over the comments I have so far. My main macro-level concern is that I haven't found the code for the logic of "somebody else has claimed a partition that I think I have claimed, and I need to release my claim because it is stale."

marshal.go
35: Is this a stale comment or does the Wait on line 91 do something different?

consumer.go
51,57: The comments say one of these is the offset of the 'next message' and the other is the offset of the '"next" message'. I don't understand the distinction.
100: Perhaps the caveat about Consumer needing to be discarded and rebuilt when partition count increases ought to be replicated in this comment block?
117: Consider making behavior a struct behaviorOpts so you can add more options later without breaking existing code.
347: If this returns an err it seems like we should probably not ignore it? If a consumer can't get Kafka metadata it should probably stop consuming after some timeout? Now, it may be that line 293 will cause this to happen but it's not 100% clear to me.
300: How would nowTime ever exactly equal claim.startTime? The claim start time only gets set in tryClaimPartition but nowTime is from line 277. I am guessing you are trying not to divide by zero on line 306 but if I am right that it should be impossible I think it ought to be its own if statement with its own comment. If I'm wrong about it being impossible then nevermind of course.
355: I don't understand why it is safe to hold on to any unhealthy partitions. If I understand the protocol correctly, other consumers may have already picked up these partitions if we haven't issued timely heartbeats.
225: How does this get cleaned up? It seems like line 300 will make sure this never gets unclaimed if we don't have a consumer running on it?
457: Would this be cleaner if there were a single message chan for the consumer (in addition to the per-claim chans) and there were just a single goroutine ranging over the list of claims to populate the consumer-level chan? This would avoid starvation for the individual claims and also make Consume more straightforward. You could just block on reading from this chan instead of busy waiting (lines 468-471).
489: How do we know the next offset is 1 more? If we are reading a compacted topic can't it be an arbitrary offset larger than the current one?
166,212,216,493,494: These reads/writes to lastHeartbeat appear to be unsynchronized across goroutines.

rationalizer.go:
137: How do we end up with multiple pending claims for a partition? We would need to get a call to ClaimPartition for this partition twice in the span of the rationalizer running, right? But that call can only come from tryClaimPartition from claimPartitions from manageClaims from NewConsumer. Since NewConsumer is a singleton and all the calls after go manageClaims are blocking, I don't see how ClaimPartition could have two outstanding calls simultaneously.
150-155: Just call fireEvents(topic.partitions[msg.PartID].ClientID == w.clientID && topic.partitions[msg.PartID].GroupID == w.groupID)
169-173: Just call fireEvents(true) unless 172 is a typo in which case just call fireEvents(msg.ClientID == w.clientID && msg.GroupID == w.groupID)

Real at-least-once consumption

Currently the Consumer returns messages and, as soon as the client retrieves them, marks them as consumed. This is technically neither at-least-once (ALO) nor at-most-once (AMO), since there are at least these failure cases:

  • If the client retrieves a message and dies while processing it, we might have already heartbeated with the new offset (since it's recorded on retrieval), which means the message never got processed. This could be AMO, but...
  • Heartbeats are done asynchronously, so it could be up to a heartbeat interval before we actually report that we processed the offset. This means that if we died, the next consumer to take over would resume at an earlier offset, violating AMO.

We need to keep track of each message we retrieve and give to the client, and the client needs to be responsible for telling us that they've successfully processed the message -- if they want true ALO semantics.

This is complicated because we want to be able to distribute as many messages in parallel as they want to consume (i.e. we shouldn't limit their concurrency factor) so we can't just give them the head message and wait for them to mark it "done".

I'm going to pursue an implementation where:

  • Messages that are returned from Consume have some metadata that includes (partition, offset) so we can later identify it
  • The client is responsible for calling ConsumeDone with that (partition, offset)
  • Heartbeat will only automatically move our current offset to the minimum outstanding message offset within a partition

This means we could potentially have the case where someone could consume a message and the client could wedge on that message -- but other routines continue processing many messages. With one message "stuck" like this our heartbeat will never progress forward and we'll start failing health checks even though, in large part, we are healthy. But this is probably the correct action.
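
A sketch of the per-partition bookkeeping this implies (all names here are hypothetical, not a final API): track the offsets that are in flight, and only ever heartbeat up to the minimum outstanding one.

import "sync"

// outstanding tracks offsets handed to the client but not yet acknowledged.
type outstanding struct {
    mu      sync.Mutex
    offsets map[int64]bool
}

func newOutstanding() *outstanding {
    return &outstanding{offsets: make(map[int64]bool)}
}

func (o *outstanding) add(offset int64) {
    o.mu.Lock()
    defer o.mu.Unlock()
    o.offsets[offset] = true
}

func (o *outstanding) done(offset int64) {
    o.mu.Lock()
    defer o.mu.Unlock()
    delete(o.offsets, offset)
}

// committable returns the highest offset we may safely heartbeat: the
// minimum still-outstanding offset, or next if nothing is outstanding.
func (o *outstanding) committable(next int64) int64 {
    o.mu.Lock()
    defer o.mu.Unlock()
    min := next
    for off := range o.offsets {
        if off < min {
            min = off
        }
    }
    return min
}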

Allow "topic claim" mode

Marshal should support an option where you can specify that you want a claim to be on the entire topic. This is used to support the sharded production use case, where you want to ensure all messages in a given shard go to a given consumer.

cc @DrTall @basharal

Use Kafka committed offsets too

The Marshal topic is not log compacted (and I don't think it can be with our algorithm), so instead we address scalability by making it expire quickly. Since the heartbeat interval is relatively short, as long as the retention period leaves several heartbeats in the log, it's fine.

The downside however is that if you have a consumer group go away for longer than the retention period, we lose the last processed offset. This is suboptimal.

We should instead also commit the offsets of the groups to Kafka so that when we recover a consumer we can use the greatest of (Kafka offset, Marshal offset). This also means that monitoring that depends on the Kafka committed offsets will work automatically on Marshal consumers, which is a nice benefit.

Implement fast-reclaim mode

When a consumer starts up, it should read the log to see if any partition claims are still fresh with the same client ID/group ID it has. If so, it can resume consuming those partitions. This minimizes turnover when you restart things, although it depends on setting a stable client ID.

Add instance ID to marshal/protocol

In the situation where you start up two consumers with the same client/group IDs, bad things could happen -- well, mostly double consumption of messages.

I propose adding an Instance ID to the Marshaler object. This will be randomly generated and can be used as a way to detect that a protocol message didn't come from us. Expected use would be, for example, if we see a Heartbeat that is from our group/client but not from us -- we can stop all work and exit, throwing exceptions.
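
The detection might look roughly like this (field and variable names, including m and msg, are hypothetical):

// While rationalizing a heartbeat from the event log: if it claims our
// client/group but carries a different instance ID, another process is
// running with our identity, so stop all work.
if msg.ClientID == m.clientID && msg.GroupID == m.groupID &&
    msg.InstanceID != m.instanceID {
    log.Fatalf("another instance (%s) is using our client/group IDs; exiting",
        msg.InstanceID)
}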

@DrTall, thoughts?

Support a limit on number of claims

By default Marshal will claim as many partitions as it can until it becomes unhealthy.

A user has requested the ability to set an upper bound. This is useful when you know your consumers can never operate with more than X partitions/topics. It does put more work on the client to ensure they never underprovision, but this should be fine.

cc @DrTall @basharal

Add load/lag data to protocol messages

The Heartbeat message should contain some indication of consumer load/lag. This will enable us to implement some more aggressive load shedding/balancing algorithm (if we want). At the very least it makes the problem of monitoring Marshal consumers much more straightforward.

Batch claim mode

Right now doing a single claim can take ~1s since low volume produce/consume requests involve some amount of waiting for Kafka to commit things.

Since we know immediately how many partitions are unclaimed and how many partitions we want to claim, there's no harm in attempting to batch up ClaimPartition messages.

Ordered consumption

Right now when you consume messages they are technically delivered in order (if they came from a single partition), but that's not really a useful statement when the library encourages you to shard them off to your various goroutines.

As a suggested option, if you set an "ordered" option then the consumer should only return a single message (or ordered batch) from a partition at a time. The next call to Consume would return a message/batch from a different partition. If all partitions have outstanding unacknowledged messages then Consume would block.

The client would then be responsible for acknowledging the messages (standard in the ALO consumer) and this would "unblock" the Consume call with the next available message. This guarantees that you process things in order without having to do any tricky logic on your end. (At a performance penalty, but that's expected.)

Create callback for "overdue" work

Right now Marshal is vulnerable to a type of situation where, if you get a message that causes your processing to break, your committed offset will never be advanced since that message will end up the "oldest outstanding" and Marshal will not advance.

One solution is to give Marshal a callback and a timeout: when we detect that a message has been outstanding for too long, Marshal will call you and give you a copy of the message. You can then take whatever action you want (such as moving the message to a new queue, firing an exception, etc.).

The proposed implementation:

  • New consumer option MessageTimeout, which specifies the number of seconds before we let you know a message has "timed out". This is measured from the time the message is returned by the Consume call.
  • New consumer option MessageTimeoutCallback which is called with a *proto.Message. When this callback returns, Marshal will consider the message to have been committed and knows it can advance the offset.
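
Under this proposal, usage might look roughly like the following (option names are from this issue and not a shipped API; the timeout units are an assumption):

opts := marshal.NewConsumerOptions()
opts.MessageTimeout = 30 // hypothetical: seconds before a message "times out"
opts.MessageTimeoutCallback = func(msg *proto.Message) {
    // Dead-letter, log, or otherwise dispose of the stuck message here.
    // When this returns, Marshal considers the message committed and can
    // advance the offset past it.
}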

Thoughts?

cc @DrTall @basharal

Consumer "piecemeal claim"

With the Marshal algorithm it is possible to do partial claims. Either claiming a block of messages (i.e. everything from X offset to Y offset) or maybe even doing something like "claiming every modulo-2 offset". There are benefits to both approaches.

Anyway, it would be nice if consumers could do this and not have to worry about having exactly N partitions for scaling.

Clarify partition surrender logic

The README describes some kind of automatic load balancing of partitions to consumers via groups, wherein partitions are "surrendered" when Marshal notices that the consumer read counter falls too far behind the topic write counter. Does this surrendering always happen, or just when there are other connected consumers in the same group available to receive the surrendered partition?

Just double checking that a partition would not be surrendered in the case that there is only one connected consumer in a group, which would cause the offset to fall even further behind, heh.

NewMarshaler should block until rationalizers are up

As it says, creating a Marshaler should block until the world state has been determined the first time. Otherwise, callers will immediately start making requests and take actions based on incomplete world state. This is ultimately safe (the protocol protects against failures from this), but it does cause extraneous 'attempted to claim already claimed partition' errors.
