@wvanbergen @snormore @mkobetic (and anyone else who cares)
The current consumer works just fine, but is limited to fetching just a single partition of a single topic at once. You can run multiple consumers, but this has several issues:
- They can't use the same client, or else the broker connection has contention problems. Brokers handle requests on a single connection serially, so if the broker has to wait up to `MaxWaitTime` before responding to a fetch for a quiet partition, fetches for busy partitions queue up behind it; this introduces severe latency when multiplexing busy and non-busy topics. Using separate clients works, but generates a lot of extra connections and metadata traffic.
- You don't get messages from multiple partitions batched into a single response, even when that would be more efficient.
The ideal solution is to have a consumer capable of fetching from multiple topic/partitions at once. This has been at the back of my mind for a while now, so I already have a design ready; if the design makes sense to y'all, then somebody just needs to write it :)
While the user currently specifies a single topic/partition to the constructor, they should now be able to specify a set of topic/partitions. Since some of the configuration that is currently a singleton actually needs to be per-partition (`OffsetMethod` and `OffsetValue` at least), this would probably be of type `map[string]map[int32]*ConsumerPartitionConfig`. I considered permitting partitions to be dynamically added to or removed from the set, but I don't see a strong use case and it complicated a bunch of things.
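A minimal sketch of what that per-partition configuration might look like. The `OffsetMethod` constants and the fields of `ConsumerPartitionConfig` below are hypothetical stand-ins modelled on the existing `OffsetMethod`/`OffsetValue` settings, and `PartitionSet` is just an illustrative name for the proposed map type:

```go
package main

import "fmt"

// OffsetMethod is a hypothetical stand-in for the existing offset-selection setting.
type OffsetMethod int

const (
	OffsetMethodManual OffsetMethod = iota // start at OffsetValue
	OffsetMethodNewest                     // start at the newest available offset
	OffsetMethodOldest                     // start at the oldest available offset
)

// ConsumerPartitionConfig holds the settings that have to vary per partition.
type ConsumerPartitionConfig struct {
	OffsetMethod OffsetMethod
	OffsetValue  int64 // only meaningful with OffsetMethodManual
}

// PartitionSet is the proposed topic -> partition -> config map.
type PartitionSet map[string]map[int32]*ConsumerPartitionConfig

// Count returns how many topic/partitions the set contains.
func (s PartitionSet) Count() int {
	n := 0
	for _, parts := range s {
		n += len(parts)
	}
	return n
}

func main() {
	set := PartitionSet{
		"events": {
			0: {OffsetMethod: OffsetMethodManual, OffsetValue: 1234},
			1: {OffsetMethod: OffsetMethodNewest},
		},
		"audit": {
			0: {OffsetMethod: OffsetMethodOldest},
		},
	}
	fmt.Println(set.Count()) // 3
}
```

Because the values are pointers, a partition requested with all defaults (`{}`) is still distinguishable from one that was never requested (a `nil` lookup).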
Events are returned to the user exactly the same way they are now, over a single channel. I considered a separate channel per topic/partition but it complicated the base case, and the events already contain information on topic/partition so it's not hard for the user to dispatch appropriately if they really want to.
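For users who do want per-partition handling, dispatching off the single channel takes only a few lines. The `ConsumerEvent` shape below is a hypothetical, trimmed-down version; all that matters is that each event carries its own topic and partition:

```go
package main

import "fmt"

type topicPartition struct {
	Topic     string
	Partition int32
}

// ConsumerEvent is a hypothetical minimal event; each one names its topic/partition.
type ConsumerEvent struct {
	Topic     string
	Partition int32
	Offset    int64
	Value     []byte
	Err       error
}

// dispatch drains the single events channel and routes each event to the
// handler registered for its topic/partition, or to fallback if there is none.
func dispatch(events <-chan *ConsumerEvent, handlers map[topicPartition]func(*ConsumerEvent), fallback func(*ConsumerEvent)) {
	for ev := range events {
		if h, ok := handlers[topicPartition{ev.Topic, ev.Partition}]; ok {
			h(ev)
		} else {
			fallback(ev)
		}
	}
}

func main() {
	events := make(chan *ConsumerEvent, 3)
	events <- &ConsumerEvent{Topic: "events", Partition: 0, Offset: 10}
	events <- &ConsumerEvent{Topic: "events", Partition: 1, Offset: 7}
	events <- &ConsumerEvent{Topic: "audit", Partition: 0, Offset: 3}
	close(events)

	counts := map[string]int{}
	handlers := map[topicPartition]func(*ConsumerEvent){
		{"events", 0}: func(*ConsumerEvent) { counts["events/0"]++ },
	}
	dispatch(events, handlers, func(*ConsumerEvent) { counts["other"]++ })
	fmt.Println(counts["events/0"], counts["other"]) // 1 2
}
```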
The constructor starts up one "controller" goroutine, which starts up and manages one goroutine per broker-that-has-a-partition-we-care-about and is responsible for (initially) dispatching each topic/partition to the appropriate broker's goroutine. The broker goroutine looks a lot like the current `fetchMessages` method with a few tweaks:
- Some minor work is needed to handle multiple blocks in the requests and responses.
- When a topic/partition is reassigned to a new broker, that topic/partition gets returned to the controller via a channel; the goroutine tracks how many it is "responsible" for and exits if that reaches 0.
- Similarly, when a broker goes down hard, all topic/partitions are returned to the controller for re-dispatching and the goroutine exits.
I expect the success case to be fairly straightforward - as always, the complexity will come when reasoning through the failure cases and ensuring that topic/partitions are redispatched correctly, messages are not accidentally skipped in that case, etc. etc.
When the consumer is closed, it signals the controller which cleans up its children before exiting.
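A skeleton of the goroutine lifecycle, under heavy assumptions: `leaderOf` stands in for a metadata lookup, the broker goroutines emit one placeholder event per partition instead of fetching, and redispatch is left out entirely. It only shows the controller grouping partitions by leader, starting one goroutine per broker, and on `Close` stopping its children and waiting for them before closing the events channel (closing it only after every sender has exited avoids a send-on-closed-channel panic). All names here are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

type topicPartition struct {
	Topic     string
	Partition int32
}

// Event is a hypothetical placeholder event.
type Event struct {
	TP     topicPartition
	Broker int32
}

// Consumer owns one controller goroutine, which owns the broker goroutines.
type Consumer struct {
	events  chan Event
	closing chan struct{} // closed by Close to signal the controller
	done    chan struct{} // closed by the controller once cleanup is finished
}

func NewConsumer(tps []topicPartition, leaderOf func(topicPartition) int32) *Consumer {
	c := &Consumer{events: make(chan Event), closing: make(chan struct{}), done: make(chan struct{})}
	go c.controller(tps, leaderOf)
	return c
}

func (c *Consumer) Events() <-chan Event { return c.events }

// Close signals the controller and blocks until it has cleaned up (call once).
func (c *Consumer) Close() {
	close(c.closing)
	<-c.done
}

func (c *Consumer) controller(tps []topicPartition, leaderOf func(topicPartition) int32) {
	defer close(c.done)
	byBroker := map[int32][]topicPartition{}
	for _, tp := range tps {
		b := leaderOf(tp)
		byBroker[b] = append(byBroker[b], tp)
	}
	stop := make(chan struct{})
	var wg sync.WaitGroup
	for broker, owned := range byBroker {
		wg.Add(1)
		go c.brokerLoop(broker, owned, stop, &wg)
	}
	<-c.closing     // wait for Close
	close(stop)     // tell every broker goroutine to exit
	wg.Wait()       // wait until they all have
	close(c.events) // no senders remain, so this is safe
}

// brokerLoop stands in for the fetch loop: one event per partition, then idle.
func (c *Consumer) brokerLoop(broker int32, owned []topicPartition, stop <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for _, tp := range owned {
		select {
		case c.events <- Event{TP: tp, Broker: broker}:
		case <-stop:
			return
		}
	}
	<-stop
}

func main() {
	tps := []topicPartition{{"events", 0}, {"events", 1}, {"audit", 0}}
	// hypothetical metadata: partition p is led by broker p+1
	leader := func(tp topicPartition) int32 { return tp.Partition + 1 }
	c := NewConsumer(tps, leader)
	seen := map[topicPartition]int32{}
	for i := 0; i < len(tps); i++ {
		ev := <-c.Events()
		seen[ev.TP] = ev.Broker
	}
	c.Close()
	_, open := <-c.Events()
	fmt.Println(len(seen), open) // 3 false
}
```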
Thoughts?