takebayashi / fluent-plugin-avro
License: MIT License
From fluentd, I would like to publish messages with both a key and a value, where the key is used to partition the messages. How do I configure the key schema and value schema in the avro formatter?
We are successfully formatting our messages for Kafka using fluent-plugin-kafka and this formatter plugin, but we would like it to send each Avro record to Kafka prefixed with the 5 "magic" bytes that Confluent defines for associating Avro messages with a schema in the Schema Registry.
https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format
Is this possible?
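For reference, the Confluent wire format linked above is a 1-byte magic value (0), followed by a 4-byte big-endian schema ID, followed by the Avro binary body. The following is only a minimal sketch of that framing in plain Ruby, not something this plugin is known to do: the helper names `confluent_frame` and `confluent_unframe` and the schema ID 42 are hypothetical, and the Avro body is hand-encoded for a record with a single string field.

```ruby
# Sketch of the Confluent wire format framing (assumed helper names, not plugin API).
# Layout: 1 magic byte (0) | 4-byte big-endian schema ID | Avro binary body.
MAGIC_BYTE = 0

# Prepend the 5-byte header to an already Avro-encoded body.
def confluent_frame(schema_id, avro_body)
  [MAGIC_BYTE, schema_id].pack("CN") + avro_body.b
end

# Split a framed message back into [schema_id, avro_body].
def confluent_unframe(framed)
  magic, schema_id = framed.unpack("CN")
  raise ArgumentError, "unexpected magic byte #{magic.inspect}" unless magic == MAGIC_BYTE
  [schema_id, framed.byteslice(5..-1)]
end

# Avro binary for {"message": "hi"} under the schema from this issue:
# a string is its zigzag-varint length (2 -> 0x04) followed by the UTF-8 bytes.
avro_body = "\x04hi".b

# 42 is a hypothetical schema ID; a real one comes from the Schema Registry.
framed = confluent_frame(42, avro_body)
schema_id, body = confluent_unframe(framed)
```

A consumer using a Confluent deserializer reads the schema ID from the header and fetches the matching schema, so the ID must be one the registry actually assigned.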
Current version in use is 0.0.2 of fluent-plugin-avro, and td-agent is version 0.14.16.
Here's the block from the config. The `<format>` section contains two lines: `@type avro`, followed by the `schema_json` line:
`<match ipdr.system.**>
  @type kafka_buffered
  brokers 10.185.45.121:9092
  flush_interval 3s
  default_topic test
  get_kafka_client_log
  <format>
    @type avro
    schema_json {"type":"record","name":"ipdr","namespace":"arris","fields":[{"name":"message","type":"string"}]}
  </format>
  output_data_type avro
  compression_codec gzip
  max_send_retries 1
  required_acks -1
</match>`
Here's the full log of the error:
2017-10-25 15:01:57 +0000 [info]: gem 'fluent-plugin-avro' version '0.0.2'
2017-10-25 15:01:57 +0000 [info]: gem 'fluent-plugin-elasticsearch' version '1.10.1'
2017-10-25 15:01:57 +0000 [info]: gem 'fluent-plugin-elasticsearch' version '1.9.5'
2017-10-25 15:01:57 +0000 [info]: gem 'fluent-plugin-formatter_sprintf' version '0.1.0'
2017-10-25 15:01:57 +0000 [info]: gem 'fluent-plugin-grep' version '0.3.4'
2017-10-25 15:01:57 +0000 [info]: gem 'fluent-plugin-kafka' version '0.6.1'
2017-10-25 15:01:57 +0000 [info]: gem 'fluent-plugin-kafka' version '0.5.5'
2017-10-25 15:01:57 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.5'
2017-10-25 15:01:57 +0000 [info]: gem 'fluent-plugin-s3' version '1.0.0.rc3'
2017-10-25 15:01:57 +0000 [info]: gem 'fluent-plugin-td' version '1.0.0.rc1'
2017-10-25 15:01:57 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2'
2017-10-25 15:01:57 +0000 [info]: gem 'fluent-plugin-webhdfs' version '1.1.1'
2017-10-25 15:01:57 +0000 [info]: gem 'fluentd' version '0.14.16'
2017-10-25 15:01:57 +0000 [info]: gem 'fluentd' version '0.12.40'
2017-10-25 15:01:57 +0000 [info]: adding match pattern="system." type="grep"
2017-10-25 15:01:57 +0000 [info]: adding match pattern="ipdr.system." type="kafka_buffered"
2017-10-25 15:01:57 +0000 [info]: #0 brokers has been set directly: ["10.185.45.121:9092"]
2017-10-25 15:01:57 +0000 [error]: #0 unexpected error error_class=NameError error="uninitialized constant Fluent::TextFormatter::Formatter"
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-avro-0.0.2/lib/fluent/plugin/formatter_avro.rb:5:in `<module:TextFormatter>'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-avro-0.0.2/lib/fluent/plugin/formatter_avro.rb:4:in `<module:Fluent>'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-avro-0.0.2/lib/fluent/plugin/formatter_avro.rb:3:in `<top (required)>'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/site_ruby/2.4.0/rubygems/core_ext/kernel_require.rb:55:in `require'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/site_ruby/2.4.0/rubygems/core_ext/kernel_require.rb:55:in `require'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/registry.rb:102:in `block in search'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/registry.rb:99:in `each'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/registry.rb:99:in `search'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/registry.rb:44:in `lookup'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/plugin.rb:146:in `new_impl'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/plugin.rb:128:in `new_formatter'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-kafka-0.6.1/lib/fluent/plugin/out_kafka_buffered.rb:239:in `setup_formatter'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-kafka-0.6.1/lib/fluent/plugin/out_kafka_buffered.rb:153:in `configure'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/plugin.rb:164:in `configure'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/agent.rb:130:in `add_match'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/agent.rb:72:in `block in configure'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/agent.rb:64:in `each'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/agent.rb:64:in `configure'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/root_agent.rb:109:in `configure'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/engine.rb:127:in `configure'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/engine.rb:92:in `run_configure'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/supervisor.rb:733:in `run_configure'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/supervisor.rb:502:in `block in run_worker'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/supervisor.rb:661:in `main_process'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/supervisor.rb:498:in `run_worker'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/lib/fluent/command/fluentd.rb:316:in `<top (required)>'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/site_ruby/2.4.0/rubygems/core_ext/kernel_require.rb:55:in `require'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/site_ruby/2.4.0/rubygems/core_ext/kernel_require.rb:55:in `require'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-0.14.16/bin/fluentd:5:in `<top (required)>'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/bin/fluentd:22:in `load'
2017-10-25 15:01:57 +0000 [error]: #0 /opt/td-agent/embedded/bin/fluentd:22:in `<top (required)>'
2017-10-25 15:01:57 +0000 [error]: #0 /usr/sbin/td-agent:7:in `load'
2017-10-25 15:01:57 +0000 [error]: #0 /usr/sbin/td-agent:7:in `
Currently, this plugin does not write the Avro header to the buffer.
Because of this, I cannot parse the buffer file with avro-tools or similar tools.
Is there a reason for this behavior?
Hi, thanks for your contribution. I read your code but could not find support for a Kafka producer, and it does not seem to support Avro block messages either. I would like to add these features; what should I do? Thanks.
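For context, tools such as avro-tools expect an Avro object container file, which begins with the 4-byte magic `Obj` followed by the byte 1, then file metadata (including the schema) and a sync marker. A bare Avro-encoded record has none of that. The following is a minimal sketch in plain Ruby for checking whether a chunk of bytes starts with that magic; the helper name `avro_container?` is hypothetical and is not part of this plugin.

```ruby
# Sketch: detect whether bytes start with the Avro object container file magic.
# The helper name is an assumption for illustration, not plugin API.
AVRO_CONTAINER_MAGIC = "Obj\x01".b

def avro_container?(bytes)
  bytes.b.byteslice(0, 4) == AVRO_CONTAINER_MAGIC
end

# A bare Avro record for {"message": "hi"} (zigzag length 0x04, then "hi")
# has no header, so a container-file reader would reject it.
bare_record = "\x04hi".b
with_header = AVRO_CONTAINER_MAGIC + "rest of header and data blocks".b

bare_result = avro_container?(bare_record)
header_result = avro_container?(with_header)
```

Running a check like this against a buffer file would show whether it holds headerless records or a full container file.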