karafka / waterdrop


Standalone Karafka library for producing Kafka messages

Home Page: https://karafka.io

License: MIT License

Ruby 100.00%
apache-kafka apache-kafka-producer kafka karafka karafka-application karafka-framework ruby ruby-kafka rubygem rubygems waterdrop

waterdrop's Introduction


Build Status Gem Version Join the chat at https://slack.karafka.io

About Karafka

Karafka is an efficient, multi-threaded Kafka processing framework for Ruby and Rails:

# Define what topics you want to consume with which consumers in karafka.rb
Karafka::App.routes.draw do
  topic 'system_events' do
    consumer EventsConsumer
  end
end

# And create your consumers, within which your messages will be processed
class EventsConsumer < ApplicationConsumer
  # Example that utilizes ActiveRecord#insert_all and Karafka batch processing
  def consume
    # Store all of the incoming Kafka events locally in an efficient way
    Event.insert_all messages.payloads
  end
end

Karafka uses threads to handle many messages simultaneously in the same process. It does not require Rails but integrates tightly with any Ruby on Rails application to make event processing dead simple.

Getting started


If you're entirely new to the subject, you can start with our "Kafka on Rails" article series, which will get you up and running with the terminology and the basic ideas behind using Kafka.

If you want to get started with Kafka and Karafka as fast as possible, then the best idea is to visit our Getting started guides and the example apps repository.

We also maintain many integration specs illustrating various use-cases and features of the framework.

TL;DR (1 minute from setup to publishing and consuming messages)

Prerequisites: a running Kafka instance. You can start one by following the instructions from here.

  1. Add and install Karafka:
# Make sure to install Karafka 2.4
bundle add karafka --version ">= 2.4.0"

bundle exec karafka install
  2. Dispatch a message to the example topic using the Rails or Ruby console:
Karafka.producer.produce_sync(topic: 'example', payload: { 'ping' => 'pong' }.to_json)
  3. Run Karafka server and see the consumption magic happen:
bundle exec karafka server

[86d47f0b92f7] Polled 1 message in 1000ms
[3732873c8a74] Consume job for ExampleConsumer on example started
{"ping"=>"pong"}
[3732873c8a74] Consume job for ExampleConsumer on example finished in 0ms

Want to Upgrade? LGPL is not for you? Want to help?

I also sell Karafka Pro subscriptions. It includes a commercial-friendly license, priority support, architecture consultations, an enhanced Web UI, and high-throughput data-processing features (virtual partitions, long-running jobs, and more).

10% of the income will be distributed back to other OSS projects that Karafka uses under the hood.

Help me provide high-quality open-source software. Please see the Karafka homepage for more details.

Support

Karafka has Wiki pages for almost everything and a pretty decent FAQ. They cover installation, setup, and deployment, along with other useful details on how to run Karafka.

If you have questions about using Karafka, feel free to join our Slack channel.

Priority support for technical and architectural questions is part of the Karafka Pro subscription.

waterdrop's People

Contributors

agwozdowski, bruno-b-martins, dependabot[bot], filiptepper, isturdy, konalegi, kylekthompson, mach-kernel, mensfeld, nijikon, olleolleolle, pavlo-vavruk, renovate[bot], sandipsubedi, solnic, stellarxo, tabdollahi, wallin, webandtech


waterdrop's Issues

Provide a way to connect and disconnect from the cluster for both sync and async producers

on_worker_shutdown do
  DeliveryBoy
    .send(:instance)
    .send(:async_producer)
    .instance_variable_get('@worker')
    .instance_variable_get('@producer')
    .instance_variable_get('@cluster')
    .disconnect
rescue
  false
end

on_worker_boot do
  DeliveryBoy
    .send(:instance)
    .send(:async_producer)
    .instance_variable_get('@worker')
    .instance_variable_get('@producer')
    .instance_variable_get('@cluster')
    .topics
end

Is instrumentation per producer only? + recovery question

Instrumentation Question:
I'm planning to produce messages from Sidekiq, and I'd have lots of jobs doing the producing. I plan to create a producer in every job and then close it afterwards.

Is it actually recommended to create many producers, as long as I close them?

In the documentation, the subscriptions to producer events are per producer... I'm wondering if there's a way to hook them into some initializer once, rather than per producer.
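One workaround for the per-producer subscription question is to route every producer through a single factory that attaches the shared listeners. The sketch below uses a stand-in producer object, not the real WaterDrop::Producer, so the wiring pattern can be shown without a Kafka connection; with the real gem, the subscription call would go through the producer's monitor.

```ruby
# Sketch of one workaround: build every short-lived producer (e.g. one
# per Sidekiq job) through a single factory that attaches the shared
# listeners, so the wiring lives in one initializer-like place.
# StubProducer is a stand-in for a real WaterDrop::Producer.
class StubProducer
  attr_reader :listeners

  def initialize
    @listeners = []
  end

  # With the real gem this would go through the producer's monitor
  def subscribe(&block)
    @listeners << block
  end
end

module ProducerFactory
  # Defined once; every built producer gets the same listeners.
  SHARED_LISTENERS = [
    ->(event) { warn("producer event: #{event}") }
  ].freeze

  def self.build
    StubProducer.new.tap do |producer|
      SHARED_LISTENERS.each { |listener| producer.subscribe(&listener) }
    end
  end
end

producer = ProducerFactory.build
producer.listeners.size # => 1
```

Each job then calls `ProducerFactory.build` instead of constructing and wiring a producer itself.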

Recovery Question:
Currently, I am rescuing WaterDrop::Errors::MessageInvalidError and WaterDrop::Errors::ProducerClosedError on every call to produce_many_sync. These are the two errors I saw that would make the whole batch stop. Is there anything else that would stop a whole batch? I understand that the rest come back as an array of Rdkafka::Producer::DeliveryReport entries, each of which can individually have an .error.
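Rescuing the same batch-stopping errors on every call can be collapsed into one wrapper. This is a sketch only: the error classes below are stand-ins, and with the real gem the rescue list would name WaterDrop::Errors::MessageInvalidError and WaterDrop::Errors::ProducerClosedError instead.

```ruby
# Sketch of a produce wrapper that rescues batch-stopping errors in one
# place. These error classes are stand-ins for the WaterDrop ones named
# in the question above.
class MessageInvalidError < StandardError; end
class ProducerClosedError < StandardError; end

BATCH_STOPPING_ERRORS = [MessageInvalidError, ProducerClosedError].freeze

# Yields the messages to whatever actually dispatches them; when the
# whole batch is rejected up front, reports the failure and returns [].
def safe_produce_many(messages)
  yield(messages)
rescue *BATCH_STOPPING_ERRORS => e
  warn "batch rejected: #{e.class}"
  []
end

safe_produce_many([{ topic: 't', payload: 'x' }]) { |msgs| msgs } # delegates when no error
```

Delivery-report-level errors would still need to be inspected individually on the returned array.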

Thank you.

Unfortunate error message when missing ACLs

When the producer uses ruby-kafka and it is not authorized to publish messages to a certain topic (because of Kafka's ACLs) the Kafka::TopicAuthorizationFailed exception is raised:

Traceback (most recent call last):
	4: from produce_sasl.rb:16:in `<main>'
	3: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.9/lib/kafka/client.rb:152:in `deliver_message'
	2: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.9/lib/kafka/cluster.rb:145:in `partitions_for'
	1: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.9/lib/kafka/protocol/metadata_response.rb:134:in `partitions_for'
/Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.9/lib/kafka/protocol.rb:160:in `handle_error': Kafka::TopicAuthorizationFailed (Kafka::TopicAuthorizationFailed)

However, when using waterdrop we get Kafka::DeliveryFailed with a message such as "Failed to assign partitions to 1 messages in test-topic", which hides the real reason behind the failure:

E, [2019-10-01T17:00:44.672771 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:44.672910 #35917]  WARN -- : Failed to send all messages to ; attempting retry 1 of 2 after 1s
E, [2019-10-01T17:00:45.687680 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:45.687861 #35917]  WARN -- : Failed to send all messages to ; attempting retry 2 of 2 after 1s
E, [2019-10-01T17:00:46.705185 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
E, [2019-10-01T17:00:46.705252 #35917] ERROR -- : Failed to send all messages to ; keeping remaining messages in buffer
E, [2019-10-01T17:00:46.720882 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:46.720940 #35917]  WARN -- : Failed to send all messages to ; attempting retry 1 of 2 after 1s
E, [2019-10-01T17:00:47.736301 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:47.736369 #35917]  WARN -- : Failed to send all messages to ; attempting retry 2 of 2 after 1s
E, [2019-10-01T17:00:48.750387 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
E, [2019-10-01T17:00:48.750574 #35917] ERROR -- : Failed to send all messages to ; keeping remaining messages in buffer
E, [2019-10-01T17:00:48.762185 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:48.762278 #35917]  WARN -- : Failed to send all messages to ; attempting retry 1 of 2 after 1s
E, [2019-10-01T17:00:49.785085 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:49.785191 #35917]  WARN -- : Failed to send all messages to ; attempting retry 2 of 2 after 1s
E, [2019-10-01T17:00:50.810873 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
E, [2019-10-01T17:00:50.811024 #35917] ERROR -- : Failed to send all messages to ; keeping remaining messages in buffer
Traceback (most recent call last):
	7: from produce_sasl_waterdrop.rb:28:in `<main>'
	6: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/bundler/gems/waterdrop-74b2c35bdd82/lib/water_drop/sync_producer.rb:19:in `call'
	5: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/bundler/gems/delivery_boy-d875e1e791cc/lib/delivery_boy.rb:28:in `deliver'
	4: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/bundler/gems/delivery_boy-d875e1e791cc/lib/delivery_boy/instance.rb:14:in `deliver'
	3: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/producer.rb:246:in `deliver_messages'
	2: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/instrumenter.rb:23:in `instrument'
	1: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/producer.rb:253:in `block in deliver_messages'
/Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/producer.rb:426:in `deliver_messages_with_retries': Failed to assign partitions to 1 messages (Kafka::DeliveryFailed)

I reproduced this issue locally on confluentinc/cp-kafka:5.1.2 after configuring SASL and ACLs as well as on Confluent Cloud Enterprise cluster.
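Ruby records the originally raised exception on the wrapper automatically when a new exception is raised from inside a rescue block, so the real reason can be surfaced by walking the cause chain. The classes below are local stand-ins for Kafka::TopicAuthorizationFailed and Kafka::DeliveryFailed, used only to make the sketch runnable.

```ruby
# Sketch: surface the root cause instead of the generic wrapper by
# walking Ruby's exception cause chain (#cause is set automatically when
# an exception is raised from within a rescue block). These classes are
# stand-ins for Kafka::TopicAuthorizationFailed and Kafka::DeliveryFailed.
class TopicAuthorizationFailed < StandardError; end
class DeliveryFailed < StandardError; end

def root_cause(error)
  error = error.cause while error.cause
  error
end

begin
  begin
    raise TopicAuthorizationFailed, 'not authorized to write to test-topic'
  rescue
    raise DeliveryFailed, 'Failed to assign partitions to 1 messages'
  end
rescue => wrapped
  puts root_cause(wrapped).message # the authorization error, not the wrapper
end
```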

Fix deprecations for Ruby 2.7.0 in Waterdrop 2.0

.........../home/mencio/Software/Github/karafka/waterdrop/lib/water_drop/producer/buffer.rb:95: warning: Using the last argument as keyword parameters is deprecated; maybe ** should be added to the call
/home/mencio/Software/Github/karafka/waterdrop/.bundle/ruby/2.7.0/bundler/gems/rdkafka-ruby-d07c9595a063/lib/rdkafka/producer.rb:61: warning: The called method `produce' is defined here
..................................................................................................................................................................................../home/mencio/Software/Github/karafka/waterdrop/lib/water_drop/producer/async.rb:24: warning: Using the last argument as keyword parameters is deprecated; maybe ** should be added to the call
/home/mencio/Software/Github/karafka/waterdrop/.bundle/ruby/2.7.0/bundler/gems/rdkafka-ruby-d07c9595a063/lib/rdkafka/producer.rb:61: warning: The called method `produce' is defined here
...../home/mencio/Software/Github/karafka/waterdrop/lib/water_drop/producer/async.rb:46: warning: Using the last argument as keyword parameters is deprecated; maybe ** should be added to the call
/home/mencio/Software/Github/karafka/waterdrop/.bundle/ruby/2.7.0/bundler/gems/rdkafka-ruby-d07c9595a063/lib/rdkafka/producer.rb:61: warning: The called method `produce' is defined here
....................................................../home/mencio/Software/Github/karafka/waterdrop/lib/water_drop/producer/sync.rb:28: warning: Using the last argument as keyword parameters is deprecated; maybe ** should be added to the call
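The warnings above come from passing an options hash as the last positional argument to a method that takes keyword arguments, which Ruby 2.7 deprecated and Ruby 3.0 removed. A minimal illustration of the fix, using a hypothetical `produce` method in place of rdkafka's:

```ruby
# Illustration of the Ruby 2.7 deprecation fix: when the callee takes
# keyword arguments, splat the hash with ** instead of passing it as a
# trailing positional argument. `produce` here is a hypothetical method
# standing in for rdkafka's keyword-argument signature.
def produce(topic:, payload:)
  { topic: topic, payload: payload }
end

options = { topic: 'events', payload: 'ping' }

# produce(options)   # deprecated on 2.7, an ArgumentError on 3.0
produce(**options)   # explicit keyword splat works on 2.7 and later
```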

Missing config options

The waterdrop documentation has options to set up kafka.ssl.ca_cert and kafka.ssl.client_cert, but WaterDrop::Config is missing these options under setting :kafka; as a result, it only allows setting kafka.hosts.

This raises an error in the waterdrop config initializer:

/config/initializers/waterdrop.rb:7:in `block in <top (required)>': undefined method `ssl' for #<#<Class:0x007fd3912c0580>:0x007fd3912c0350> (NoMethodError)
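The NoMethodError means the :kafka settings node has no nested `ssl` member at all. The shape the initializer expects can be sketched with plain Structs; these stand in for the dry-configurable schema WaterDrop actually uses, and only the attribute names are taken from the documentation.

```ruby
# Sketch of the nested kafka settings the initializer expects. Plain
# Structs stand in for WaterDrop's dry-configurable schema; the
# attribute names are assumed from the documented options.
SslConfig   = Struct.new(:ca_cert, :client_cert, keyword_init: true)
KafkaConfig = Struct.new(:hosts, :ssl, keyword_init: true)

kafka = KafkaConfig.new(
  hosts: ['kafka://localhost:9092'],
  ssl: SslConfig.new(ca_cert: nil, client_cert: nil)
)

# With the nested node present, kafka.ssl.ca_cert no longer raises
# NoMethodError:
kafka.ssl.respond_to?(:ca_cert) # => true
```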

Waterdrop's Performance Advantage?

Hello! I came by to ask about the advantages of this Kafka client compared to the others. I'm looking for a performant producer, which, unfortunately, I can't achieve with my current client.

Some of my questions:

  • do you have some performance metrics to easily compare at a glance?
  • what was your main motivation for this gem? How is it different from the others?
  • what was the best producer throughput that you have ever achieved?
  • why did you shift to rdkafka?

Thanks in advance.

Can't use waterdrop in aspect way

After removing to_json from message:

@message = message.respond_to?(:to_json) ? message.to_json : message.to_s

We cannot send messages using aspects. Even if I manually convert the message I want to send to JSON, waterdrop sends a hash built by the Formatter class:

def message
  {
    topic:  @options[:topic],
    method: @options[:method],
    message: @result,
    args: @args
  }
end

Then this error appears:
NoMethodError: undefined method `bytesize' for #<Hash:0x007fe5841235d8>
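The error arises because the underlying client expects a String payload (something that responds to #bytesize) but receives a Hash. One way around this, sketched here as a small helper rather than a waterdrop API, is to serialize explicitly before producing:

```ruby
# Sketch: the Kafka client expects a String payload (anything that
# responds to #bytesize), so serialize hashes explicitly before sending.
# `serialize` is a hypothetical helper, not part of waterdrop.
require 'json'

def serialize(message)
  message.is_a?(String) ? message : message.to_json
end

payload = serialize(topic: 'events', message: { id: 1 })
payload.bytesize # a String now, so #bytesize works
```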

New release

Any chance we could get a new release for #109? I'd love not to have to fork this until there's another one!

Cheers 🍻

Sender option

I think it's useful to know who (what kind of app) the sender was. Of course, I can just add sender: :api_name to the message, but it would be great to have this at the waterdrop internals level.
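Until such an option exists in waterdrop itself, a thin wrapper can stamp every outgoing message with the sending app's name. The helper and the :api_name identifier below are illustrative, taken from the suggestion above:

```ruby
# Sketch: stamp every outgoing message with the sending app's name.
# SENDER and with_sender are hypothetical, not waterdrop internals.
SENDER = :api_name

def with_sender(message)
  message.merge(sender: SENDER)
end

with_sender(ping: 'pong') # => { ping: "pong", sender: :api_name }
```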

Logger for Aspector

Each time I start waterdrop, I receive these messages in the console:

2015-12-28 12:42:13 +0100 | Aspector | INFO  | WaterDrop::Aspects::AfterAspect | define-advice | advice 1: AFTER [#<Aspector::DeferredOption:0x007fae0a137b70 @key=:method>] DO stuff in block WITH OPTIONS {:result_arg=>true, :interception_arg=>true}
2015-12-28 12:42:13 +0100 | Aspector | INFO  | WaterDrop::Aspects::AroundAspect | define-advice | advice 1: AROUND [#<Aspector::DeferredOption:0x007fae0a134f88 @key=:method>] DO stuff in block WITH OPTIONS {:interception_arg=>true}
2015-12-28 12:42:13 +0100 | Aspector | INFO  | WaterDrop::Aspects::BeforeAspect | define-advice | advice 1: BEFORE [#<Aspector::DeferredOption:0x007fae0a13ee98 @key=:method>] DO stuff in block WITH OPTIONS {:interception_arg=>true, :skip_if_false=>false}

I think there should be a way to resolve this without configuring aspector directly in my app.

ssl_verify_hostname sync with ruby-kafka/delivery_boy

Hello,
(This is a follow-up from this thread.)

Here are the specs I'm using in my app: karafka 1.2.13, ruby-kafka 0.7.9, rails 5.2.3, ruby 2.6.3.

Karafka depends on waterdrop, which depends on delivery_boy.
There is a new option added quite recently in ruby-kafka: ssl_verify_hostname, which defaults to true (for both consumers and producers). On the consumer side, everything works well; karafka is in sync with ruby-kafka, and this can be disabled by changing the config in karafka.rb.

I have a similar error message as the one described in this issue.
OpenSSL::SSL::SSLError: SSL_connect returned=1 errno=0 state=error: certificate verify failed (unspecified certificate verification error)

Basically, the idea is to sync waterdrop the same way karafka was synced: by changing the "dry config" file and adding the same setting (ssl_verify_hostname), so that when the async producer is created, it can pass a value other than the default.

Now delivery_boy has this "fix" but only on master (not released yet :/).

So to make this work with karafka, all I had to do was fork delivery_boy and change the default from true to false, until delivery_boy releases the patch, waterdrop bumps delivery_boy, and karafka bumps waterdrop.

Add some basic validations of the kafka scope of the config

Unfortunately, we need to add basic rdkafka validations; invalid seed brokers, for example, fail silently and asynchronously.

Let's start simple: seed brokers should have a format like host1:9092,host2:9092,192.168.1.2:9091, i.e. a string of comma-separated host:port pairs.
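A minimal sketch of that validation, assuming only the format stated above (hostnames or IPv4 addresses, each followed by a numeric port):

```ruby
# Sketch of the proposed seed-broker validation: a non-empty string of
# comma-separated host:port pairs, e.g. "host1:9092,192.168.1.2:9091".
HOST_PORT = /\A[a-zA-Z0-9.\-]+:\d{1,5}\z/

def valid_seed_brokers?(value)
  return false unless value.is_a?(String) && !value.empty?

  value.split(',').all? { |pair| pair.match?(HOST_PORT) }
end

valid_seed_brokers?('host1:9092,192.168.1.2:9091') # => true
valid_seed_brokers?('host1')                       # => false
```

Running this check at configuration time would turn the silent async failure into an immediate, explicit error.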

Kafka::DeliveryFailed: Failed to send messages to topic/partition

Keep getting this error. Not able to send messages to kafka cluster.

This is what I'm testing:

message = WaterDrop::Message.new('hits', {test: 76}.to_json)
message.send!

And this is what it results in:

Kafka::DeliveryFailed: Failed to send messages to hits/0
from /path/to/gem/ruby-kafka-0.3.15/lib/kafka/producer.rb:332:in `deliver_messages_with_retries'

WaterDrop.config

#<#<Class:0x007ffc8356a090>:0x007ffc835698e8
 @config={:connection_pool_timeout=>5, :send_messages=>true, :raise_on_failure=>true, :connection_pool_size=>25, :kafka=>#<Dry::Configurable::NestedConfig:0x007ffc850ca768 @klass=#<Class:0x007ffc850ca740>>}>

WaterDrop.config.kafka.hosts

["kafka://ubuntudock:9092", "kafka://ubuntudock:9093", "kafka://ubuntudock:9094"]

Result of operation.execute from /ruby-kafka-0.3.15/lib/kafka/producer.rb:`deliver_messages_with_retries'

Attempt 1
{#<Kafka::Broker:0x007ffe1e811a90 @connection=#<Kafka::Connection:0x007ffe1e811bf8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1e858a80 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe1e811860 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe1e8125f8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:05 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Attempt 2
{#<Kafka::Broker:0x007ffe1e811a90 @connection=#<Kafka::Connection:0x007ffe1e811bf8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1e858a80 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe187eeb68 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe1e8125f8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:05 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Attempt 3
{#<Kafka::Broker:0x007ffe1e811a90 @connection=#<Kafka::Connection:0x007ffe1e811bf8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1e858a80 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe1d880590 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe1e8125f8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:05 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Attempt 1
{#<Kafka::Broker:0x007ffe187833b8 @connection=#<Kafka::Connection:0x007ffe187834a8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1878db10 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe18782ff8 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe18783bd8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:08 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Attempt 2
{#<Kafka::Broker:0x007ffe187833b8 @connection=#<Kafka::Connection:0x007ffe187834a8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1878db10 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe17691f50 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe18783bd8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:08 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Attempt 3
{#<Kafka::Broker:0x007ffe187833b8 @connection=#<Kafka::Connection:0x007ffe187834a8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1878db10 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe16d24698 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe18783bd8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:08 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Kafka::DeliveryFailed: Failed to send messages to hits/0
from /path/to/gem/ruby-2.3.3@goliath/gems/ruby-kafka-0.3.15/lib/kafka/producer.rb:332:in `deliver_messages_with_retries'

Ability to validate successful responder call

Is your feature request related to a problem? Please describe.

I want the ability to confirm that a message was successfully produced to the topic on the broker. I don't care about, or want to rely on, an acknowledgement from a downstream consumer. I simply wish to know whether I successfully posted in the first place, via some sort of return value or callback. It looks like neither responders nor WaterDrop::SyncProducer provide return values to validate the successful delivery of your message to the topic.

Describe the solution you'd like

Consider the scenario where perhaps the topic does not exist or there's an ACL error, etc.

Topic does not exist:

irb(main):001:0> WaterDrop::SyncProducer.call({'test' => 'test'}.to_json, topic: 'not.a.real.topic')
I, [2019-11-21T17:05:45.387698 #2436]  INFO -- : New topics added to target list: not.a.real.topic
I, [2019-11-21T17:05:45.388066 #2436]  INFO -- : Fetching cluster metadata from kafka://broker.kafka:9093
I, [2019-11-21T17:05:51.377430 #2436]  INFO -- : Discovered cluster metadata; nodes: broker2.kafka:9093 (node_id=218), broker3.kafka:9093 (node_id=254), broker1.kafka:9093 (node_id=175)
E, [2019-11-21T17:05:51.378117 #2436] ERROR -- : Failed to assign partitions to 1 messages in not.a.real.topic
W, [2019-11-21T17:05:51.378265 #2436]  WARN -- : Failed to send all messages to ; attempting retry 1 of 2 after 1s
I, [2019-11-21T17:05:52.380422 #2436]  INFO -- : Fetching cluster metadata from kafka://broker.kafka:9093
I, [2019-11-21T17:05:52.692198 #2436]  INFO -- : Discovered cluster metadata; nodes: broker3.kafka:9093 (node_id=254), broker2.kafka:9093 (node_id=218), broker1.kafka:9093 (node_id=175)
I, [2019-11-21T17:05:52.692766 #2436]  INFO -- : Sending 1 messages to broker1.kafka:9093 (node_id=175)
=> nil

We can see above that ruby-kafka logs some useful information indicating that the message failed to send. But code that should behave differently depending on whether the message was sent successfully has no way to roll back when it was not. Ideally I'd want something to the effect of:

ActiveRecord::Base.transaction do
  # modify some local data

  # notify downstream services of this change
  responder_call = Responders::MyCoolResponder.call(message)
  
  # rollback, i.e. do not commit local changes, if unable to publish
  # the message for downstream consumption
  raise ActiveRecord::Rollback unless responder_call.successful?
end

Describe alternatives you've considered

I could in theory convert this to a fully async process which then waits for the consuming service to publish an acknowledgement message to a topic I consume from. But in a workflow like this I don't actually need to take action based on the downstream systems receiving it. I simply wish to know that I posted the original message to a topic for their consumption. The current error handling for responders, as I understand it, protects against responding to a topic which has not been registered, or against total broker failures, but not against failing to produce a message in an acks=all model.

Release 2.0-beta1

We've been using it for months now. I think it's time to craft the 2.0 release.

kafka_host, kafka_hosts and kafka_ports options

The problem is that Karafka and Waterdrop require options in different formats. Karafka wants options in this format:

config.kafka_hosts = ["123.123.123.23:9092"]

and waterdrop in this one:

config.kafka_host = "123.123.123.23"
config.kafka_ports = ["9092"]

I feel that this is not unified enough; there should be the same option names and structure. Then I'd be able to use the same settingslogic namespace for both karafka and waterdrop.
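Until the option names are unified, an adapter can normalize both shapes into one "host:port" list so a single settings namespace feeds both gems. The helper below is a sketch, not part of either library:

```ruby
# Sketch: normalize both config shapes into one list of "host:port"
# strings. `normalize_hosts` is a hypothetical adapter helper.
def normalize_hosts(host: nil, ports: [], hosts: nil)
  return hosts if hosts # already the unified "host:port" list

  ports.map { |port| "#{host}:#{port}" }
end

normalize_hosts(hosts: ['123.123.123.23:9092'])          # => ["123.123.123.23:9092"]
normalize_hosts(host: '123.123.123.23', ports: %w[9092]) # => ["123.123.123.23:9092"]
```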

Introduce batch API

I would like to be able to provide a batch of messages to be sent, instead of looping and sending them one after another, for both sync and async dispatch.
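Pending a native batch API, a batch helper can be layered over any per-message send. This sketch uses a block as a stand-in for whatever single-message call is available; chunking bounds memory when the batch is large:

```ruby
# Sketch of a batch dispatch helper layered on a per-message send.
# The block (`deliver`) stands in for the real single-message call;
# each_slice keeps memory bounded for large batches.
def produce_batch(messages, chunk_size: 500, &deliver)
  messages.each_slice(chunk_size).flat_map { |chunk| chunk.map(&deliver) }
end

produce_batch([1, 2, 3], chunk_size: 2) { |m| m * 10 } # => [10, 20, 30]
```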

Cannot send message with producer

I have followed the example app instructions and gotten the Ping Pong game to work. However, when I try to send a message with either WaterDrop::SyncProducer.call('message', topic: 'my-topic') or WaterDrop::AsyncProducer.call('message', topic: 'my-topic'), I receive the error syntax error near unexpected token 'message','.

How do I resolve this?

Motivation/example

Sometimes, when you look at a project with very little context, you want to know what problem it solves from 1000 meters away (metaphorically speaking).

I sometimes add a "Motivation" section which explains what problem I was trying to solve, and sometimes include a short example of code in the usage, e.g. here is a particular problem you can solve and here is the code to do it. Not just a single line, but an actual working script.

Finally, people coming to your project may not know what Kafka is (I don't). So linking to it and explaining it in half a sentence would be great, e.g.

Waterdrop is a messaging system for Kafka (link), which is an open source job processing tool.
