karafka / rdkafka-ruby

Modern and performant Kafka client library for Ruby based on librdkafka

Home Page: https://karafka.io

License: Other

ruby kafka librdkafka apache-kafka apache-kafka-consumer apache-kafka-producer

rdkafka-ruby's Introduction



About Karafka

Karafka is an efficient, multi-threaded Kafka processing framework for Ruby and Rails:

# Define what topics you want to consume with which consumers in karafka.rb
Karafka::App.routes.draw do
  topic 'system_events' do
    consumer EventsConsumer
  end
end

# And create your consumers, within which your messages will be processed
class EventsConsumer < ApplicationConsumer
  # Example that utilizes ActiveRecord#insert_all and Karafka batch processing
  def consume
    # Store all of the incoming Kafka events locally in an efficient way
    Event.insert_all messages.payloads
  end
end

Karafka uses threads to handle many messages simultaneously in the same process. It does not require Rails but integrates tightly with any Ruby on Rails application to make event processing dead simple.

Getting started


If you're entirely new to the subject, you can start with our "Kafka on Rails" article series, which will get you up and running with the terminology and basic ideas behind using Kafka.

If you want to get started with Kafka and Karafka as fast as possible, then the best idea is to visit our Getting started guides and the example apps repository.

We also maintain many integration specs illustrating various use-cases and features of the framework.

TL;DR (1 minute from setup to publishing and consuming messages)

Prerequisites: a running Kafka instance. You can start one by following the instructions from here.

  1. Add and install Karafka:

# Make sure to install Karafka 2.4
bundle add karafka --version ">= 2.4.0"

bundle exec karafka install

  2. Dispatch a message to the example topic using the Rails or Ruby console:

Karafka.producer.produce_sync(topic: 'example', payload: { 'ping' => 'pong' }.to_json)

  3. Run the Karafka server and see the consumption magic happen:

bundle exec karafka server

[86d47f0b92f7] Polled 1 message in 1000ms
[3732873c8a74] Consume job for ExampleConsumer on example started
{"ping"=>"pong"}
[3732873c8a74] Consume job for ExampleConsumer on example finished in 0ms

Want to Upgrade? LGPL is not for you? Want to help?

I also sell Karafka Pro subscriptions. It includes a commercial-friendly license, priority support, architecture consultations, an enhanced Web UI, and high-throughput data-processing features (virtual partitions, long-running jobs, and more).

10% of the income will be distributed back to other OSS projects that Karafka uses under the hood.

Help me provide high-quality open-source software. Please see the Karafka homepage for more details.

Support

Karafka has Wiki pages for almost everything and a pretty decent FAQ. They cover installation, setup, and deployment, along with other useful details on how to run Karafka.

If you have questions about using Karafka, feel free to join our Slack channel.

Priority support for technical and architectural questions is part of the Karafka Pro subscription.

rdkafka-ruby's People

Contributors

abicky, aboutnisblee, breunigs, bruce-szalwinski-he, colindkelley, dmexe, ferrous26, gaffneyc, geoff2k, gremerritt, gvisokinskas, jvortmann, jychen7, koenrh, leemeichin, leonmaia, maeve, malandrina, mensfeld, methodmissing, mgrosso, mjkillough, mollyegibson, nijikon, nurse, piotaixr, renovate[bot], robbiepaul, thijsc, tombruijn


rdkafka-ruby's Issues

Unable to install rdkafka-0.3.4 - error during native extension

I am unable to install the gem with gem install rdkafka-0.3.4 --local; the failure comes when the native extension is being built. The process attempts to download a file from GitHub, but the host where I am trying to install cannot pull files down from the internet during install. Is there any workaround where I can provide the needed file locally?

> td-agent-enterprise-gem install rdkafka-0.3.4.gem --local                                                                         
Building native extensions.  This could take a while...                                                                                                                                                                                         
ERROR:  Error installing rdkafka-0.3.4.gem:                                                                                                                                                                                                     
        ERROR: Failed to build gem native extension.                                                                                                                                                                                            
                                                                                                                                                                                                                                                
    current directory: /opt/td-agent-enterprise/embedded/lib/ruby/gems/2.4.0/gems/rdkafka-0.3.4/ext                                                                                                                                             
/opt/td-agent-enterprise/embedded/bin/ruby -rubygems /opt/td-agent-enterprise/embedded/lib/ruby/gems/2.4.0/gems/rake-12.0.0/exe/rake RUBYARCHDIR=/opt/td-agent-enterprise/embedded/lib/ruby/gems/2.4.0/extensions/x86_64-linux/2.4.0/rdkafka-0.3
.4 RUBYLIBDIR=/opt/td-agent-enterprise/embedded/lib/ruby/gems/2.4.0/extensions/x86_64-linux/2.4.0/rdkafka-0.3.4                                                                                                                                 
2 retrie(s) left for v0.11.3.tar.gz                                                                                                                                                                                                             
1 retrie(s) left for v0.11.3.tar.gz                                                                                                                                                                                                             
0 retrie(s) left for v0.11.3.tar.gz                                                                                                                                                                                                             
execution expired                                                                                                                                                                                                                               
Extracting v0.11.3.tar.gz into tmp/x86_64-redhat-linux/ports/librdkafka/0.11.3... ERROR, review '/opt/td-agent-enterprise/embedded/lib/ruby/gems/2.4.0/gems/rdkafka-0.3.4/ext/tmp/x86_64-redhat-linux/ports/librdkafka/0.11.3/extract.log' to se
e what happened. Last lines are:                                                                                                                                                                                                                
========================================================================                                                                                                                                                                        
tar (child): ports/archives/v0.11.3.tar.gz: Cannot open: No such file or directory                                                                                                                                                              
tar (child): Error is not recoverable: exiting now                                                                                                                                                                                              
gtar: Child returned status 2                                                                                                                                                                                                                   
gtar: Error is not recoverable: exiting now                                                                                                                                                                                                     
========================================================================                                                                                                                                                                        
rake aborted!                                                                                                                                                                                                                                   
Failed to complete extract task                                                                                                                                                                                                                 
                                                                                                                                                                                                                                                
Tasks: TOP => default                                                                                                                                                                                                                           
(See full trace by running task with --trace)                                                                                                                                                                                                   
                                                                                                                                                                                                                                                
rake failed, exit code 1                                                                                                                                                                                                                        
                                                                                                                                                                                                                                                
Gem files will remain installed in /opt/td-agent-enterprise/embedded/lib/ruby/gems/2.4.0/gems/rdkafka-0.3.4 for inspection.                                                                                                                     
Results logged to /opt/td-agent-enterprise/embedded/lib/ruby/gems/2.4.0/extensions/x86_64-linux/2.4.0/rdkafka-0.3.4/gem_make.out                                                                                                                

Error compiling librdkafka

I tried using this gem but I got this error message when compiling:

Downloading v0.11.5.tar.gz ( 98%) 
Downloading v0.11.5.tar.gz (100%)
Extracting v0.11.5.tar.gz into tmp/x86_64-apple-darwin17.7.0/ports/librdkafka/0.11.5... OK
Running 'configure' for librdkafka 0.11.5... OK
Running 'compile' for librdkafka 0.11.5... ERROR, review '/Users/thedude/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/bundler/gems/rdkafka-ruby-86ee6932b2e2/ext/tmp/x86_64-apple-darwin17.7.0/ports/librdkafka/0.11.5/compile.log' to see
what happened. Last lines are:
========================================================================
      _rd_kafka_sasl_scram_conf_validate in rdkafka_sasl_scram.o
  "_SHA512", referenced from:
      _rd_kafka_sasl_scram_conf_validate in rdkafka_sasl_scram.o
  "_X509_STORE_set_flags", referenced from:
      _rd_kafka_transport_ssl_ctx_init in rdkafka_transport.o
  "_X509_free", referenced from:
      _rd_kafka_transport_ssl_ctx_init in rdkafka_transport.o
      _rd_kafka_transport_io_serve in rdkafka_transport.o
  "_X509_new", referenced from:
      _rd_kafka_transport_ssl_ctx_init in rdkafka_transport.o
  "_X509_verify_cert_error_string", referenced from:
      _rd_kafka_transport_io_serve in rdkafka_transport.o
  "_d2i_PKCS12_fp", referenced from:
      _rd_kafka_transport_ssl_ctx_init in rdkafka_transport.o
  "_sk_pop_free", referenced from:
      _rd_kafka_transport_ssl_ctx_init in rdkafka_transport.o
ld: symbol(s) not found for architecture x86_64
clang: error: linker command failed with exit code 1 (use -v to see invocation)
make[1]: *** [librdkafka.1.dylib] Error 1
make: *** [libs] Error 2
========================================================================
rake aborted!
Failed to complete compile task

Tasks: TOP => default
(See full trace by running task with --trace)

rake failed, exit code 1

Gem files will remain installed in /Users/thedude/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/bundler/gems/rdkafka-ruby-86ee6932b2e2 for inspection.
Results logged to /Users/thedude/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/bundler/gems/extensions/x86_64-darwin-17/2.5.0-static/rdkafka-ruby-86ee6932b2e2/gem_make.out

I'm using the latest macOS and Xcode. I also got the same message on the latest release and on master.

RdkafkaError should have message with more information

For example, when querying watermarks, the error message is just:

Local: Unknown partition (unknown_partition)

This doesn't tell you the topic and offset, which would be very handy for debugging. RdkafkaError should have an optional extra message to add this kind of context when appropriate.
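A minimal sketch of what the proposal could look like; the ContextualKafkaError class and its keyword arguments are hypothetical names for illustration, not part of the gem:

```ruby
# Hypothetical wrapper illustrating the proposal: append optional context
# (topic, partition) to an existing error message.
class ContextualKafkaError < StandardError
  def initialize(original_message, topic: nil, partition: nil)
    context = []
    context << "topic: #{topic}" if topic
    context << "partition: #{partition}" if partition
    message = context.empty? ? original_message : "#{original_message} (#{context.join(', ')})"
    super(message)
  end
end

error = ContextualKafkaError.new(
  "Local: Unknown partition (unknown_partition)",
  topic: "system_events", partition: 7
)
puts error.message
# => Local: Unknown partition (unknown_partition) (topic: system_events, partition: 7)
```

With no context given, the message stays unchanged, so existing callers would not need to change.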

Rollback the offset if an error is raised during processing

In the following code, the next offset is fetched even though an error is raised during processing of the previous message.

Should we change this behavior and rescue any exceptions to mark the message as unprocessed, even with auto commits, or leave this to the users?

def rdkafka_config(config_overrides={})
  config = {
    :"api.version.request" => false,
    :"broker.version.fallback" => "1.0",
    :"bootstrap.servers" => "localhost:9092",
    :"group.id" => "ruby-test-#{Random.new.rand(0..1_000_000)}",
    :"auto.offset.reset" => "earliest",
    :"enable.partition.eof" => false,
  }
  if ENV["DEBUG_PRODUCER"]
    config[:debug] = "broker,topic,msg"
  elsif ENV["DEBUG_CONSUMER"]
    config[:debug] = "cgrp,topic,fetch"
  end
  config.merge!(config_overrides)
  Rdkafka::Config.new(config)
end

producer = rdkafka_config.producer
consumer = rdkafka_config.consumer

topic = "test1234"
consumer.subscribe(topic)

4.times { |i| producer.produce(topic: topic, payload: (i+1).to_s) }

consumer.each  {|m| p m; raise }

Random installation failures

I was attempting to install rdkafka-ruby with gem install rdkafka -v '0.3.5'. It failed several times in a row with:

Building native extensions. This could take a while...
ERROR:  Error installing rdkafka:
	ERROR: Failed to build gem native extension.

    current directory: /home/lukas/.rvm/gems/ruby-2.5.0/gems/rdkafka-0.3.5/ext
/usr/share/rvm/rubies/ruby-2.5.0/bin/ruby -rrubygems /home/lukas/.rvm/gems/ruby-2.5.0@global/gems/rake-12.3.0/exe/rake RUBYARCHDIR=/home/lukas/.rvm/gems/ruby-2.5.0/extensions/x86_64-linux/2.5.0/rdkafka-0.3.5 RUBYLIBDIR=/home/lukas/.rvm/gems/ruby-2.5.0/extensions/x86_64-linux/2.5.0/rdkafka-0.3.5
2 retrie(s) left for v0.11.3.tar.gz
1 retrie(s) left for v0.11.3.tar.gz
0 retrie(s) left for v0.11.3.tar.gz
nil can't be coerced into Integer
Extracting v0.11.3.tar.gz into tmp/x86_64-linux-gnu/ports/librdkafka/0.11.3... ERROR, review '/home/lukas/.rvm/gems/ruby-2.5.0/gems/rdkafka-0.3.5/ext/tmp/x86_64-linux-gnu/ports/librdkafka/0.11.3/extract.log' to see what happened. Last lines are:
========================================================================
tar (child): ports/archives/v0.11.3.tar.gz: Cannot open: No such file or directory
tar (child): Error is not recoverable: exiting now
tar: Child returned status 2
tar: Error is not recoverable: exiting now
========================================================================
rake aborted!
Failed to complete extract task
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:400:in `block in execute'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:371:in `chdir'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:371:in `execute'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:365:in `extract_file'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:61:in `block in extract'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:60:in `each'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:60:in `extract'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:150:in `cook'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/rdkafka-0.3.5/ext/Rakefile:10:in `block in <top (required)>'
/home/lukas/.rvm/gems/ruby-2.5.0@global/gems/rake-12.3.0/exe/rake:27:in `<main>'
Tasks: TOP => default
(See full trace by running task with --trace)

rake failed, exit code 1

Gem files will remain installed in /home/lukas/.rvm/gems/ruby-2.5.0/gems/rdkafka-0.3.5 for inspection.
Results logged to /home/lukas/.rvm/gems/ruby-2.5.0/extensions/x86_64-linux/2.5.0/rdkafka-0.3.5/gem_make.out

Out of the blue it ended up working with the same command:

gem install rdkafka -v '0.3.5'
Building native extensions. This could take a while...
Successfully installed rdkafka-0.3.5
Parsing documentation for rdkafka-0.3.5
Installing ri documentation for rdkafka-0.3.5
Done installing documentation for rdkafka after 0 seconds
1 gem installed

It may or may not be worth noting that I'm running elementary OS 0.4.1 Loki (Ubuntu).

New release with changes in master

Would it be possible to make a new release from master? I'm particularly interested in the changes from #31, which haven't been released yet.

We're currently depending on rdkafka-ruby via a git dependency in our Gemfile, but would prefer to fetch it from RubyGems as we start to use it in more services.

Thanks again for doing all the work to maintain this! It's a huge help. :-)

Poll in batches

Handling the offset right is cumbersome, especially when I want to consume a few messages, do a batch insert into the database, and don't want the offset to change before the batch insert has succeeded. To do so I've disabled auto commit and commit manually after the batch has been processed successfully (see zendesk/racecar@d00dbc7).

This can probably be implemented using the rd_kafka_consume_batch binding, which allows committing automatically after a callback has been processed successfully.

Allow establishing TCP connection for producer before the delivery of a first message

When starting to produce messages, there's a certain delay while establishing the TCP connection with Kafka. It would be really good if this library could, if possible, provide a way to have this connection established prior to sending the first message.

For a high volume system with a background producer, this delay can cause buffer overflows when a background queue is enough to handle the regular traffic but not enough when the initial delay is introduced.

Release new version

Hi!

Is it possible to release a new version? The last version was released in September according to RubyGems.

We are currently using the master version (because of #86)

Support Time for timestamp attribute on produce

I tried to produce messages with timestamps but timestamp needs to be an integer:

2018-03-07 13:01:19 - TypeError - no implicit conversion of Time into Integer:
        /usr/local/bundle/gems/ffi-1.9.23/lib/ffi/variadic.rb:56:in `invoke'
        /usr/local/bundle/gems/ffi-1.9.23/lib/ffi/variadic.rb:56:in `call'
        (eval):3:in `rd_kafka_producev'

I would expect this library to support Time instances instead of Integers (which should be millisecond unix timestamps I guess?).
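A sketch of the conversion the gem could perform internally; `normalize_timestamp` is a hypothetical helper name, and the millisecond unit follows the guess above:

```ruby
# Hypothetical helper: accept a Time or an Integer and normalize to the
# millisecond Unix timestamp passed down to rd_kafka_producev.
def normalize_timestamp(timestamp)
  case timestamp
  when nil     then nil
  when Integer then timestamp
  when Time    then (timestamp.to_f * 1000).to_i
  else raise TypeError, "timestamp must be a Time or an Integer, got #{timestamp.class}"
  end
end

puts normalize_timestamp(Time.at(1520427679))
# => 1520427679000
```

Raising TypeError for anything else keeps the current behavior for unsupported classes.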

Producing not working + crash for forked processes

A C rdkafka instance does not survive a fork. Producing or polling does not work and rd_kafka_destroy has a failing assertion that crashes the process when it is called via https://github.com/appsignal/rdkafka-ruby/blob/master/lib/rdkafka/config.rb#L149

A fix to not let rd_kafka_destroy crash in this scenario was added in librdkafka: confluentinc/librdkafka@8c67e42

We should move to this release when it lands at the end of February. For the failing produces and polls I see a few options:

  • Raise an exception if our pid changed
  • Recreate the rdkafka instance if our pid changed
  • Add a forked hook you need to call after forking, possibly with a Unicorn integration
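The first option could be sketched like this; `ForkSafeClient` and `check_fork!` are illustrative names, not gem API:

```ruby
# Illustrative sketch of option 1: remember the pid that created the client
# and raise if the client is used from a different (forked) process.
class ForkSafeClient
  class UsedAfterForkError < StandardError; end

  def initialize
    @creator_pid = Process.pid
  end

  def check_fork!
    return if Process.pid == @creator_pid

    raise UsedAfterForkError,
          "client created in process #{@creator_pid} used in process #{Process.pid}; " \
          "recreate the client after forking"
  end
end

client = ForkSafeClient.new
client.check_fork! # no-op in the creating process
```

Calling check_fork! at the top of produce and poll would turn the current crash into a clear Ruby exception.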

Consumer defaults encourage bad design

While the producer defaults and examples correctly encourage batching out of the gate, the current consumer defaults (and all readily accessible examples) encourage a non-batch design. These defaults and examples should be updated to support the 99% use case, which is batching.

Encouraging handling messages one at a time, even if the polling is batched under the hood, in turn encourages secondary side effects (writing to a database, writing to another Kafka topic, etc.) to be performed one at a time rather than in batches, which would be a bad design choice for users of this library.

I propose rethinking the defaults / examples of the consumer to be batch-based, like those of the producer.

Delivery callback called with an invalid error when producing without ACL permissions

Version: current master

Reproduction:

config = {
    :'bootstrap.servers' => '127.0.0.1:9092',
    :'request.required.acks' => 1,
    :'sasl.mechanisms' => 'PLAIN',
    :'sasl.username' => 'edward',
    :'sasl.password' => 'edward',
    :'security.protocol' => 'sasl_plaintext',
    :'message.timeout.ms' => 100
  }
producer = Rdkafka::Config.new(config).producer
delivery_handles = []

100.times do |i|
  puts "Producing message #{i}"
  delivery_handles << producer.produce(
      topic:   "ruby-test-topic",
      payload: "Payload #{i}",
      key:     "Key #{i}"
  )
end

delivery_handles.each(&:wait)

Rdkafka::RdkafkaError (Local: Message timed out (msg_timed_out))

We never get a proper failure (something like "authorization failed"). Instead, we (or rather librdkafka) wait forever, until message.timeout.ms elapses.

This behavior can be really confusing as one would expect to get a clear error message indicating a lack of permissions.

ref karafka/waterdrop#108

rwlock_wrlock assertion fails on Mac OS

I'm using the following docker-compose.yml

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka1:
    image: wurstmeister/kafka:1.0.1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.xx.xx.xxx
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "consume_test_topic:3:1,empty_test_topic:3:1,load_test_topic:3:1,produce_test_topic:3:1,rake_test_topic:3:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  kafka2:
    image: wurstmeister/kafka:1.0.1
    ports:
      - "9093:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.xx.xx.xxx
      KAFKA_ADVERTISED_PORT: 9093
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "consume_test_topic:3:1,empty_test_topic:3:1,load_test_topic:3:1,produce_test_topic:3:1,rake_test_topic:3:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

and then running the specs with bundle exec rspec, I'm getting the following error:

Assertion failed: (r == 0), function rwlock_wrlock, file tinycthread.c, line 1019.

The issue is happening in the spec/rdkafka/consumer_spec.rb. I'm running MacOS 10.12.6.

Ability to pause partitions

When there is a business-logic error (e.g. due to unexpected message content) I would like to make progress at least on the other partitions. Therefore I propose the following pseudo-code as a draft, implementing the pause method on Rdkafka::Consumer:

loop do
  message = consumer.poll
  process(message) if message
rescue Rdkafka::RdkafkaError => e
  raise unless e.is_partition_eof?
rescue => e
  consumer.pause(message.topic, message.partition) if message
end

For reference:

Consumer.each does not return after call to consumer.close

Giving rdkafka-ruby a try, I was surprised that consumer.each didn't return after calling consumer.close or consumer.unsubscribe.

config = Rdkafka::Config.new({
  "bootstrap.servers": brokers,
  "security.protocol": "ssl",
  "ssl.ca.location": File.expand_path("../ca.pem", __FILE__),
  "ssl.certificate.location": File.expand_path("../client.pem", __FILE__),
  "ssl.key.location": File.expand_path("../client.key", __FILE__),
  "group.id": prefix + "rdkafka-test",
  "enable.partition.eof": false,
})

consumer = config.consumer
consumer.subscribe("topic")

trap("INT")  { consumer.close }
trap("TERM") { consumer.unsubscribe }

# Never exits
consumer.each do |m|
  puts m.inspect
end

It looks like Consumer#each is an infinite loop and never checks to see if the connection is still open or valid.

See: https://github.com/appsignal/rdkafka-ruby/blob/master/lib/rdkafka/consumer.rb#L244
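A sketch of a loop that does honor a close request: the signal handler only flips a flag, and the short poll timeout means the loop notices it quickly. The class and method names are illustrative, not the gem's API:

```ruby
# Illustrative sketch: a consume loop that can be stopped from a trap handler.
# The consumer only needs to respond to poll(timeout_ms).
class ClosableEach
  def initialize(consumer)
    @consumer = consumer
    @running = true
  end

  # Safe to call from a signal handler: it only flips a flag.
  def stop
    @running = false
  end

  def each
    while @running
      message = @consumer.poll(250) # short timeout so stop is noticed quickly
      yield message if message
    end
  end
end

# Usage with the consumer from the report (assumed set up as above):
#   runner = ClosableEach.new(consumer)
#   trap("INT") { runner.stop }
#   runner.each { |m| puts m.inspect } # returns shortly after Ctrl-C
```

Re-checking the flag on every iteration is what the current infinite loop is missing.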

How are offset commits handled?

It looks, from reading the code, like Consumer#each does not do any committing -- if I call Consumer#commit with no arguments... what gets committed?

Get position of a consumer

Support getting the position of a consumer (rd_kafka_position in librdkafka). The result should also be usable when computing lag.

Consume In Batches API

I know that it's currently possible to consume in batches (see the "Poll in batches" issue above); however, I believe the API for this could be significantly improved. One of the main draws of this library was the simplicity with which a Kafka consumer could be implemented. Consuming messages individually has become too slow for my current use case. I began the transition to consuming in batches, and have found the work required to be more challenging than I feel is strictly necessary.

I think the users of this library would find an API similar to the code below far superior to the current interface.

consumer.each_in_batches(<batch size>, <timeout>) do |batch|
  batch.each do |message|
    puts "Message received: #{message}"
  end
end
  • batch size -> The number of messages to be consumed before the block of code is run.
  • timeout -> The time in milliseconds to wait before a partial batch (more than 0 but fewer than batch size messages) is passed to the code block.
  • It would be expected that any errors that occur would propagate up and the offset for those messages would not be committed.
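The proposal above could be layered on top of any poll-style consumer roughly like this; `each_in_batches` is hypothetical, and the deadline logic uses a monotonic clock so a slow poll cannot extend the window:

```ruby
# Hypothetical implementation sketch of the proposed API, built on an object
# that responds to poll(timeout_ms). Yields a batch once it is full, or once
# timeout_ms has elapsed with at least one message collected.
def each_in_batches(consumer, batch_size, timeout_ms)
  loop do
    batch = []
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_ms / 1000.0

    while batch.size < batch_size
      remaining_ms = ((deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)) * 1000).to_i
      break if remaining_ms <= 0

      message = consumer.poll(remaining_ms)
      batch << message if message
    end

    # Errors raised by the block propagate out, leaving these offsets
    # uncommitted, matching the last bullet point above.
    yield batch unless batch.empty?
  end
end
```

Kernel#loop exits cleanly if the consumer's poll raises StopIteration, which also makes the sketch easy to exercise against a fake consumer.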

dyld: Symbol not found: _timespec_get

I'm getting the following error when trying to open a Rails console in a project with rdkafka 0.6.0 on macOS 10.14.6.

Could it be related to a recent macOS update from 10.14.4/10.14.5?

dyld: Symbol not found: _timespec_get
  Referenced from: /Users/USER/.rvm/gems/ruby-2.6.0@service/gems/rdkafka-0.6.0/lib/rdkafka/../../ext/librdkafka.dylib
  Expected in: /usr/lib/libSystem.B.dylib

dyld: Symbol not found: _timespec_get
  Referenced from: /Users/USER/.rvm/gems/ruby-2.6.0@indexing-service/gems/rdkafka-0.6.0/lib/rdkafka/../../ext/librdkafka.dylib
  Expected in: /usr/lib/libSystem.B.dylib

dyld: lazy symbol binding failed
Abort trap: 6

About performance

I recently did some tests for performance, but the results were very unsatisfactory.

My config file is as follows:
"batch.num.messages": 1000000,
"queue.buffering.max.ms": 15000,
"request.required.acks": 1,
"queue.buffering.max.messages":10000000,
"queue.buffering.max.kbytes": 4000000,
"socket.keepalive.enable": true,
"socket.send.buffer.bytes": 0,
"socket.blocking.max.ms": 2,
"message.max.bytes": 1000000000

The producer and the Kafka server are on the same network, so there is no network delay.
The test loads 400,000 records from a file, each about 500 bytes. The average total send time in my tests is 35s, i.e. only a bit more than 10,000 records per second. From the trace log I found that most of the time is spent on data processing; once the data is packaged into a MessageSet, sending it to Kafka takes very little time. Does anyone have experience in this area? Please help.

Building Native Extensions Fails on Alpine 3.8

Terminal Output:

bash-4.4$ gem install rdkafka --version="0.6.0"
Building native extensions. This could take a while...
/usr/lib/ruby/2.5.0/rubygems/ext/builder.rb:76: warning: Insecure world writable dir /usr/local/bundle in PATH, mode 040777
ERROR:  Error installing rdkafka:
	ERROR: Failed to build gem native extension.

    current directory: /usr/local/bundle/gems/rdkafka-0.6.0/ext
/usr/bin/ruby -rrubygems /usr/local/bundle/gems/rake-12.3.2/exe/rake RUBYARCHDIR=/usr/local/bundle/extensions/x86_64-linux/2.5.0/rdkafka-0.6.0 RUBYLIBDIR=/usr/local/bundle/extensions/x86_64-linux/2.5.0/rdkafka-0.6.0
/usr/local/bundle/gems/mini_portile2-2.4.0/lib/mini_portile2/mini_portile.rb:351: warning: Insecure world writable dir /usr/local/bundle in PATH, mode 040777

Extracting v1.1.0 into tmp/x86_64-alpine-linux-musl/ports/librdkafka/1.1.0... OK
Running 'configure' for librdkafka 1.1.0... OK
Running 'compile' for librdkafka 1.1.0... ERROR, review '/usr/local/bundle/gems/rdkafka-0.6.0/ext/tmp/x86_64-alpine-linux-musl/ports/librdkafka/1.1.0/compile.log' to see what happened. Last lines are:
========================================================================
Creating static library librdkafka++.a
ar rcs librdkafka++.a RdKafka.o ConfImpl.o HandleImpl.o ConsumerImpl.o ProducerImpl.o KafkaConsumerImpl.o TopicImpl.o TopicPartitionImpl.o MessageImpl.o HeadersImpl.o QueueImpl.o MetadataImpl.o
Creating librdkafka++.so symlink
rm -f "librdkafka++.so" && ln -s "librdkafka++.so.1" "librdkafka++.so"
Generating pkg-config file rdkafka++.pc
Generating pkg-config file rdkafka++-static.pc
Checking librdkafka++ integrity
librdkafka++.so.1              OK
librdkafka++.a                 OK
make[1]: Leaving directory '/usr/local/bundle/gems/rdkafka-0.6.0/ext/tmp/x86_64-alpine-linux-musl/ports/librdkafka/1.1.0/librdkafka-1.1.0/src-cpp'
make -C examples
make[1]: Entering directory '/usr/local/bundle/gems/rdkafka-0.6.0/ext/tmp/x86_64-alpine-linux-musl/ports/librdkafka/1.1.0/librdkafka-1.1.0/examples'
gcc -g -O2 -fPIC -Wall -Wsign-compare -Wfloat-equal -Wpointer-arith -Wcast-align  -I../src rdkafka_example.c -o rdkafka_example  \
	../src/librdkafka.a -lm -lssl  -lcrypto  -L/lib -lz  -ldl -lpthread -lrt -lpthread -lrt
../src/librdkafka.a(rdkafka_ssl.o): In function `rd_kafka_transport_ssl_set_endpoint_id':
/usr/local/bundle/gems/rdkafka-0.6.0/ext/tmp/x86_64-alpine-linux-musl/ports/librdkafka/1.1.0/librdkafka-1.1.0/src/rdkafka_ssl.c:429: undefined reference to `SSL_set1_host'
collect2: error: ld returned 1 exit status
make[1]: *** [Makefile:18: rdkafka_example] Error 1
make[1]: Leaving directory '/usr/local/bundle/gems/rdkafka-0.6.0/ext/tmp/x86_64-alpine-linux-musl/ports/librdkafka/1.1.0/librdkafka-1.1.0/examples'
make: *** [Makefile:42: examples] Error 2
========================================================================
rake aborted!
Failed to complete compile task
/usr/local/bundle/gems/mini_portile2-2.4.0/lib/mini_portile2/mini_portile.rb:402:in `block in execute'
/usr/local/bundle/gems/mini_portile2-2.4.0/lib/mini_portile2/mini_portile.rb:373:in `chdir'
/usr/local/bundle/gems/mini_portile2-2.4.0/lib/mini_portile2/mini_portile.rb:373:in `execute'
/usr/local/bundle/gems/mini_portile2-2.4.0/lib/mini_portile2/mini_portile.rb:115:in `compile'
/usr/local/bundle/gems/mini_portile2-2.4.0/lib/mini_portile2/mini_portile.rb:154:in `cook'
/usr/local/bundle/gems/rdkafka-0.6.0/ext/Rakefile:36:in `block in <top (required)>'
/usr/local/bundle/gems/rake-12.3.2/exe/rake:27:in `<main>'
Tasks: TOP => default
(See full trace by running task with --trace)

rake failed, exit code 1

Gem files will remain installed in /usr/local/bundle/gems/rdkafka-0.6.0 for inspection.
Results logged to /usr/local/bundle/extensions/x86_64-linux/2.5.0/rdkafka-0.6.0/gem_make.out

gem_make.out.txt
compile.log.txt

ruby 2.3.7p456 run crash

ENV:

Debian GNU/Linux 8
ruby 2.3.7p456


/usr/local/bundle/gems/rdkafka-0.3.5/lib/rdkafka/consumer.rb:33: [BUG] Segmentation fault at 0x0000000000000000
ruby 2.3.7p456 (2018-03-28 revision 63024) [x86_64-linux]

-- Control frame information -----------------------------------------------
c:0042 p:---- s:0194 e:000193 CFUNC  :rd_kafka_topic_partition_list_add
c:0041 p:0022 s:0188 e:000187 BLOCK  /usr/local/bundle/gems/rdkafka-0.3.5/lib/rdkafka/consumer.rb:33 [FINISH]
c:0040 p:---- s:0185 e:000184 CFUNC  :each
c:0039 p:0031 s:0182 e:000181 METHOD /usr/local/bundle/gems/rdkafka-0.3.5/lib/rdkafka/consumer.rb:32
c:0038 p:0013 s:0176 e:000175 BLOCK  /app/lib/kafka_client/consumer.rb:31 [FINISH]
c:0037 p:---- s:0173 e:000172 CFUNC  :each
c:0036 p:0010 s:0170 e:000169 METHOD /app/lib/kafka_client/consumer.rb:31
c:0035 p:0071 s:0166 e:000165 METHOD /app/lib/kafka_client/consumer.rb:6
c:0034 p:0030 s:0157 e:000156 METHOD /app/app/daemons/pdm_sync_daemon/task.rb:4
c:0033 p:0124 s:0154 e:000153 METHOD /app/app/daemons/pdm_sync_daemon/control.rb:17
c:0032 p:0017 s:0151 e:000150 TOP    scripts/pdm_sync.rb:1 [FINISH]
c:0031 p:---- s:0149 e:000148 CFUNC  :load
c:0030 p:0123 s:0145 e:000144 METHOD /usr/local/bundle/gems/railties-5.1.4/lib/rails/commands/runner/runner_command.rb:34
c:0029 p:0078 s:0139 e:000138 METHOD /usr/local/bundle/bundler/gems/thor-0b137514427a/lib/thor/command.rb:27
c:0028 p:0058 s:0132 e:000131 METHOD /usr/local/bundle/bundler/gems/thor-0b137514427a/lib/thor/invocation.rb:126
c:0027 p:0303 s:0126 E:000f58 METHOD /usr/local/bundle/bundler/gems/thor-0b137514427a/lib/thor.rb:387

Instrumentation support

To do deeper introspection of what is going on when receiving or publishing messages, it would be useful to have an instrumentation interface compatible with Active Support Instrumentation. The default could be a NullInstrumenter that simply discards the information. For ideas on what might actually be useful to instrument, take inspiration from ruby-kafka:

  • message producing
  • message delivery
  • message polling
  • join/leave consumer group
  • (re-)assign partitions within consumer group
  • offset changes
  • consumer heartbeat
  • connection updates
  • probably more...
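A minimal sketch of what such a default could look like, assuming an ActiveSupport::Notifications-compatible `instrument` signature (the class and event names here are hypothetical, not part of rdkafka-ruby):

```ruby
# Hypothetical default instrumenter: matches the shape of
# ActiveSupport::Notifications#instrument but discards everything.
class NullInstrumenter
  def instrument(_name, payload = {})
    # Still yield so the instrumented code runs; just don't record anything.
    yield payload if block_given?
  end
end

# How the library could wrap a producing call (event name is made up):
instrumenter = NullInstrumenter.new
result = instrumenter.instrument("message.produced", topic: "events") do |payload|
  payload[:size] = 42 # the library would fill in delivery details here
  :delivered
end
```

With this in place, users could swap in `ActiveSupport::Notifications` (or any object responding to `instrument`) per config instance.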

Abort trap: 6 Assertion failed: (r == 0), function rwlock_wrlock, file tinycthread_extra.c

I run into this issue when running specs locally. I checked on 1.2.2-RC1, 1.2.0 and 1.1.0. Any ideas?

balrog:rdkafka-ruby n$ be rspec --profile 20 ./spec/rdkafka/consumer_spec.rb
.......F.......F.....FFFFFFF..F.....F....Assertion failed: (r == 0), function rwlock_wrlock, file tinycthread_extra.c, line 138.
Abort trap: 6
balrog:rdkafka-ruby n$ ruby -v
ruby 2.6.5p114 (2019-10-01 revision 67812) [x86_64-darwin18]

balrog:rdkafka-ruby n$ brew config
HOMEBREW_VERSION: 2.1.15-107-g5da322d
ORIGIN: https://github.com/Homebrew/brew
HEAD: 5da322d6be314e5f8b1729e91e60be743a1e7d8e
Last commit: 31 hours ago
Core tap ORIGIN: https://github.com/Homebrew/homebrew-core
Core tap HEAD: 28d42cb0f3160a0c8e169ce1a3aa30578d901c19
Core tap last commit: 26 hours ago
HOMEBREW_PREFIX: /usr/local
HOMEBREW_AWS_ACCESS_KEY_ID: set
HOMEBREW_AWS_SECRET_ACCESS_KEY: set
HOMEBREW_BINTRAY_KEY: set
HOMEBREW_BINTRAY_USER: nijikon
HOMEBREW_DEVELOPER: 1
HOMEBREW_SANDBOX: 1
HOMEBREW_VERBOSE: 1
CPU: quad-core 64-bit haswell
Homebrew Ruby: 2.6.3 => /usr/local/Homebrew/Library/Homebrew/vendor/portable-ruby/2.6.3/bin/ruby
Clang: 11.0 build 1100
Git: 2.23.0 => /usr/local/bin/git
Curl: 7.54.0 => /usr/bin/curl
Java: 13, 11.0.1, 1.8.0_45
macOS: 10.14.6-x86_64
CLT: 10.3.0.0.1.1562985497
Xcode: 11.2
CLT headers: 10.3.0.0.1.1562985497
XQuartz: 2.7.11 => /opt/X11

No provider for SASL mechanism GSSAPI

The ruby script is like this:

require 'rdkafka'

config = {
        :"bootstrap.servers" => "192.168.0.238:9092" ,
        :"security.protocol" => "SASL_PLAINTEXT" ,
        :"sasl.mechanisms"   => "GSSAPI" ,
        :"sasl.kerberos.principal" => "kafka/[email protected]" ,
        :"sasl.kerberos.keytab" => "/fluentd/etc/ssl/project_c-gwqnp_project-4nsmt_kafka.keytab" ,
}

rdkafka = Rdkafka::Config.new(config)
producer = rdkafka.producer
producer.produce(topic: "test", payload: "Hello World!")

And when I execute this script, the following errors occur:

root@fluentd-t9nqb:/fluentd/etc/ssl# ruby ~/test.rb
Traceback (most recent call last):
        2: from /root/test.rb:12:in `<main>'
        1: from /var/lib/gems/2.5.0/gems/rdkafka-0.4.1/lib/rdkafka/config.rb:114:in `producer'
/var/lib/gems/2.5.0/gems/rdkafka-0.4.1/lib/rdkafka/config.rb:164:in `native_kafka': No provider for SASL mechanism GSSAPI: recompile librdkafka with libsasl2 or openssl support. Current build options: PLAIN (Rdkafka::Config::ClientCreationError)

So I need some help with this.

Non global logger and statistics callback

The majority of the settings in this lib are per-instance.

Is there any reason why logger and statistics_callback aren't? It is really problematic for systems where you may have several rdkafka instances running, producing and consuming with different instrumentation policies.

Useless pending delivery waiting upon unsendable message

Given a setup:

  • message.send.max.retries set to 2
  • bootstrap.servers set to invalid:9002

What happens upon producing and waiting is:

E, [2019-08-07T18:01:48.096491 #23972] ERROR -- : rdkafka: [thrd:invalid:9092/bootstrap]: invalid:9092/bootstrap: Failed to resolve 'invalid:9092': Name or service not known (after 21ms in state CONNECT)
E, [2019-08-07T18:01:48.096606 #23972] ERROR -- : rdkafka: [thrd:invalid:9092/bootstrap]: invalid:9092/bootstrap: Failed to resolve 'invalid:9092': Name or service not known (after 21ms in state CONNECT)
E, [2019-08-07T18:01:48.096654 #23972] ERROR -- : rdkafka: [thrd:invalid:9092/bootstrap]: 1/1 brokers are down

After that, rdkafka will not retry sending, while rdkafka-ruby will keep waiting (here: https://github.com/appsignal/rdkafka-ruby/blob/master/lib/rdkafka/producer/delivery_handle.rb#L49) until the default timeout of 60 seconds.

Even if the invalid URL were later resolved by DNS, librdkafka would not retry sending, as it has already reached the maximum number of attempts.

code:

config = {:"bootstrap.servers" => "invalid:9092"}
delivery_handles = []
producer = Rdkafka::Config.new(config).producer

producer.produce(
  topic:   "ruby-test-topic",
  payload: "Payload" * 1000,
  key:     "Key"
).wait

There is no point in waiting when we've used max retry attempts.

What I would suggest doing instead: raise an Rdkafka error when we've run out of attempts.
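A rough sketch of the suggested behavior, using stand-in classes (none of these names are the current rdkafka-ruby API): the wait loop checks a delivery-failed flag, which the error callback would set once librdkafka runs out of retries, and raises immediately instead of sleeping out the full timeout:

```ruby
# Hypothetical error for when librdkafka has exhausted message.send.max.retries.
class DeliveryFailedError < StandardError; end
class WaitTimeoutError < StandardError; end

# Simplified stand-in for a delivery handle. In the real library, @failed
# would be flipped by the delivery/error callback when retries run out.
class FakeDeliveryHandle
  def initialize(failed: false)
    @failed  = failed
    @pending = true
  end

  def wait(max_wait_timeout = 60)
    deadline = Time.now + max_wait_timeout
    while @pending
      # Suggested change: fail fast instead of waiting out the full timeout.
      raise DeliveryFailedError, "out of retry attempts" if @failed
      raise WaitTimeoutError, "waited #{max_wait_timeout}s" if Time.now > deadline
      sleep 0.01
    end
  end
end
```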

Verify the integrity of the downloaded 'librdkafka' tarball

I think it would be a good thing (at least from a security point of view) to verify the integrity of resources downloaded from external (potentially untrusted) sources. Think, for example, of a CDN compromise.

It would be as easy as adding a sha256 checksum to the file hash:

recipe.files << {
  url: "https://github.com/edenhill/librdkafka/archive/v#{Rdkafka::LIBRDKAFKA_VERSION}.tar.gz",
  sha256: '2b96d7ed71470b0d0027bd9f0b6eb8fb68ed979f8092611c148771eb01abb72c'
}

Unfortunately, 'librdkafka' does not publish "official" checksums (confluentinc/librdkafka#1759), but a hash could be trivially calculated when updating the 'librdkafka' version.
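The check itself is also trivial to do by hand with the Ruby standard library; a sketch (the helper name is made up, and a real build would pin the actual digest for each librdkafka release):

```ruby
require "digest"

# Verify a downloaded tarball against a pinned checksum. The helper name is
# hypothetical; the expected digest would be updated with each version bump.
def verify_tarball!(path, expected_sha256)
  actual = Digest::SHA256.file(path).hexdigest
  unless actual == expected_sha256
    raise "Checksum mismatch for #{path}: expected #{expected_sha256}, got #{actual}"
  end
  true
end
```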

Support manual offset store

When using autocommit, but not auto offset store, you can control when a message is considered done. This maps to rd_kafka_offset_store in librdkafka.

How to enable the “gzip” function on AMI Linux 2?

I am trying to use fluentd on the AMI Linux 2 OS, and I need the Kafka output plugin to push messages to Kafka.
Gzip compression should be good for push performance, so I used gem to install rdkafka-ruby.
However, rdkafka does not enable gzip at install/compile time, so when I add the "gzip" option to the fluentd config, it reports "wrong value for compress.type".
How can I enable the gzip feature during "gem install rdkafka", or is there another way to enable it?
I do have zlib-devel on the Linux box: "Package zlib-devel-1.2.8-7.18.amzn1.x86_64 already installed and latest version".

Session timeout for long message processing

Hi,

The processing time of one message can be several minutes in my app. Should I set a higher session.timeout.ms to avoid multiple consumers processing the same partition? I really need to be sure that my messages are processed one after the other, and that if there is an error during processing, the message is retried until I fix the bug causing it.

Right now I'm seeing that the processing of the next message sometimes starts before the processing of the previous message has ended. I suspect this is due to the session timeout.
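For reference, a hedged config sketch: the group name and values below are illustrative, and max.poll.interval.ms (which caps the time between polls, separately from heartbeat-based liveness) only exists with librdkafka >= 1.0:

```ruby
config = Rdkafka::Config.new(
  :"bootstrap.servers"    => "localhost:9092",
  :"group.id"             => "slow-workers",
  # Max time between two polls before the broker evicts the consumer from the
  # group; raise it above your worst-case per-message processing time.
  :"max.poll.interval.ms" => 600_000, # 10 minutes, illustrative
  # Heartbeat-based liveness check; the default is usually fine.
  :"session.timeout.ms"   => 30_000
)
```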

Handle EOF partition error properly in Consumer#each

When trying this library I got the following error:

/app/vendor/bundle/ruby/2.4.0/gems/rdkafka-0.3.3/lib/rdkafka/consumer.rb:186:in `poll': Broker: No more messages (partition_eof) (Rdkafka::RdkafkaError)
	from /app/vendor/bundle/ruby/2.4.0/gems/rdkafka-0.3.3/lib/rdkafka/consumer.rb:207:in `block in each'
	from /app/vendor/bundle/ruby/2.4.0/gems/rdkafka-0.3.3/lib/rdkafka/consumer.rb:206:in `loop'
	from /app/vendor/bundle/ruby/2.4.0/gems/rdkafka-0.3.3/lib/rdkafka/consumer.rb:206:in `each'

I think the #each method should handle these errors (ignore them, or pass the notifications to the block). Currently #each itself raises an exception when you reach the end of a partition, which is probably not what people expect from this library.
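A sketch of the suggested #each behavior, using a stubbed poll sequence so the control flow is visible (RdkafkaError and each_message here are stand-ins, not the library's actual classes):

```ruby
# Stand-in for Rdkafka::RdkafkaError, which exposes the librdkafka error code.
class RdkafkaError < StandardError
  attr_reader :code

  def initialize(code)
    @code = code
    super(code.to_s)
  end
end

# Suggested behavior for #each: swallow partition_eof, re-raise real errors.
# `events` stands in for successive poll results; errors are raised inline.
def each_message(events)
  results = []
  events.each do |event|
    begin
      raise event if event.is_a?(RdkafkaError)
      results << event
    rescue RdkafkaError => e
      raise unless e.code == :partition_eof # end of partition: keep polling
    end
  end
  results
end
```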

Got a log error when close consumer

Environment

OS type: MacOS
OS version: 10.14.5
rdkafka version: 0.7.0
Kafka docker image: confluentinc/cp-kafka:5.0.0
Zookeeper docker image: zookeeper:3.5
Cluster topology: 1 zookeeper, 1 broker

Current behavior

When I close the consumer I got this error:

E, [2019-11-03T18:23:32.298450 #53262] ERROR -- : rdkafka: [thrd:GroupCoordinator]: 1/1 brokers are down

Expected behavior

No error log when I close the consumer

Reproduction

config = Rdkafka::Config.new
config["enable.auto.commit"] = false
config["group.id"] = "test"
config["session.timeout.ms"] = 10_000
config["enable.partition.eof"] = false
config["bootstrap.servers"] = "localhost:9092"

consumer = config.consumer
consumer.subscribe("topic.a", "topic.b")

10.times { consumer.poll(250) }

consumer.close

I have cloned the repository and run rspec; the same error appears, and it also appears in the Travis log. For example, this build, line 582.

After investigating, I don't understand why I get this error log. Do you have any information about it? Let me know if you need more information.
