ekaf

An advanced but simple to use, Kafka producer written in Erlang

Highlights

  • An Erlang implementation of a producer, as per the Kafka 0.8 wire protocol
  • Produce to a topic synchronously or asynchronously
  • Option to batch messages and customize the concurrency
  • Lazy handling of both metadata requests and connection pool creation
  • Automatically buffers messages when a broker goes down and resends them later, with a configurable maximum downtime buffer size
  • Automatically starts and stops workers based on Kafka broker additions, changes, and downtime
  • Callbacks to instrument into your monitoring system

ekaf also powers kafboy, a web server based on Cowboy that publishes to ekaf via simple HTTP endpoints. As of mid-2014, it handled more than 100 million events per day at Helpshift.

Add a feature request at https://github.com/helpshift/ekaf or check the ekaf web server at https://github.com/helpshift/kafboy

Features

Simple API

The topic is a binary, and the payload can be a list, a binary, a key-value tuple, or a combination of these.

%%------------------------
%% Start
%%------------------------
application:load(ekaf),

%% Mandatory: ekaf needs at least 1 broker to connect to.
%% To eliminate a SPOF, use an IP to a load balancer to a bunch of brokers
application:set_env(ekaf, ekaf_bootstrap_broker, {"localhost", 9091}),


{ok, _} = application:ensure_all_started(ekaf),

Topic = <<"ekaf">>,

%%------------------------
%% Send message to a topic
%%------------------------

%% sync
ekaf:produce_sync(Topic, <<"foo">>).

%% async
ok = ekaf:produce_async(Topic, jsx:encode(PropList) ).

%%------------------------
%% Send many messages in one request
%%------------------------

%% send many messages in 1 packet by sending a list to represent a payload
{{sent, _OnPartition, _ByPartitionWorker}, _Response }  =
    ekaf:produce_sync(
        Topic,
        [<<"foo">>, {<<"key">>, <<"value">>}, <<"back_to_binary">> ]
    ),

%%------------------------
%% Send events in batches
%%------------------------

%% sync
{buffered, _Partition, _BufferSize} =
    ekaf:produce_sync_batched(
        Topic,
        [ekaf_utils:itob(X) || X<- lists:seq(1,1000) ]
    ).

%% async
{buffered, _Partition, _BufferIndex} =
    ekaf:produce_async_batched(
        Topic,
        [<<"foo">>, {<<"key">>, <<"value">>}, <<"back_to_binary">> ]
    ).

%% send entire batch as a list of events
application:set_env(ekaf, ?EKAF_CALLBACK_MASSAGE_BUFFER,
                          {ekaf_callbacks, 
                           encode_messages_as_one_large_json}),


%% route to a partition based on a key or list of tuples ( see below on default, custom logic)
ekaf:produce_async(<<"topic">>, {<<"user1">>,<<"foobar">>}). 
ekaf:produce_sync(<<"topic">>, [{<<"user1">>, <<"foo">>},  {<<"user2">>, <<"bar">>}]).
ekaf:produce_async_batched(...


%%------------------------
%% Other helpers that are used internally
%%------------------------
%% metadata
%% if you don't want to start workers on app start, make sure to
%%               first get metadata before any produce/publish
ekaf:metadata(Topic).

%% pick a worker, and directly communicate with it
ekaf:pick(Topic). %synchronous
ekaf:pick(Topic,Callback). %asynchronous

%% see the tests for a complete API, and `ekaf.erl` and `ekaf_lib` for more

See test/ekaf_tests.erl for more

Quickstart

Here's a quick ekaf demo that shows broker resilience, buffering, and instrumentation with callbacks.

On terminal 1

git clone https://github.com/helpshift/ekaf
cd ekaf
rebar get-deps compile
cd deps/kafkamocker
rebar compile skip_deps=true && erl -pa ebin -pa deps/*/ebin -s kafkamocker_demo
kafka_consumer_loop INCOMING [<<"1">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
                          <<"7">>,<<"8">>,<<"9">>,<<"10">>,<<"11">>,
                          <<"12">>,<<"13">>,<<"14">>,<<"15">>,<<"16">>,
                          <<"17">>,<<"18">>,<<"19">>,<<"20">>]

and on terminal 2

cd ekaf
rebar compile skip_deps=true && erl -pa ebin -pa deps/*/ebin -s ekaf_demo
> request 1
> ekaf.ekaf_callback_downtime_saved => 1
> ekaf.mainbroker_unreachable => 1
> ....
> request 20
> ekaf.ekaf_callback_downtime_saved => 1
> ekaf.mainbroker_unreachable => 1
> ekaf.ekaf_callback_time_down => 59280
> ekaf.ekaf_callback_time_to_connect.broker1.0 => 1
> ekaf.ekaf_callback_time_to_connect.broker1.0 => 1
> ekaf.ekaf_callback_downtime_replayed => 1 during ready
> ekaf.ekaf_callback_worker_up.broker1.0 1
> ekaf.ekaf_callback_worker_up.broker1.0 1
> ekaf.ekaf_callback_flush.broker1.0 20
> ekaf.ekaf_callback_flushed_replied.broker1.0.diff 0

To create your own topics and test them see the Kafka Quickstart at http://kafka.apache.org/08/quickstart.html

$ bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic ekaf

Then change the line in ekaf_demo.erl to point to your kafka server instead of kafkamocker.

Tunable Concurrency

Each worker represents a connection to a broker + topic + partition combination. You can decide how many workers to start for each partition.

%% a single topic with 3 partitions will have 15 workers per node
%% all able to batch writes
{ekaf,[{ekaf_per_partition_workers, 5},
       {ekaf_per_partition_workers_max, 10}]}.

You can also set different concurrency strategies for different topics

{ekaf,[{ekaf_per_partition_workers, [
   {<<"topic1">>, 5},              % lazily start a connection pool of 5 workers
   {<<"topic2">>, 1},              % per partition
   {ekaf_per_partition_workers, 1000}    % for remaining topics
]}]}.
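The per-topic form above suggests a lookup that first tries the topic and then falls back to the entry keyed by the setting's own name. The following is a minimal sketch of such a lookup, not ekaf's actual implementation; the module name `topic_env_sketch` and the function `resolve/3` are hypothetical.

```erlang
-module(topic_env_sketch).
-export([resolve/3]).

%% Setting is either a plain value, or a list of {Topic, Value} pairs
%% with an optional {Key, Default} entry for all remaining topics.
%% (Hypothetical helper, for illustration only.)
resolve(Topic, Key, Setting) when is_list(Setting) ->
    case lists:keyfind(Topic, 1, Setting) of
        {Topic, Value} ->
            Value;
        false ->
            case lists:keyfind(Key, 1, Setting) of
                {Key, Default} -> Default;
                false -> undefined
            end
    end;
resolve(_Topic, _Key, Setting) ->
    %% a plain value applies to every topic
    Setting.
```

For example, `topic_env_sketch:resolve(<<"topic2">>, ekaf_per_partition_workers, Setting)` would return 1 for the list shown above, and any unlisted topic would get 1000.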

Because the connection pool is started lazily, a worker is only spawned when the first produce request is routed to it. More on this below.

Batch writes

You can batch async and sync calls until the buffer for a partition reaches 100 messages, or until 5 seconds of inactivity have passed, whichever comes first.

{ekaf,[
    {ekaf_max_buffer_size, 100},      %% default
    {ekaf_buffer_ttl     , 5000}      %% default
]}.
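The "size or inactivity, whichever comes first" rule can be illustrated with a small pure-function sketch. It is an assumption about the general technique, not a copy of ekaf's worker code; `batch_sketch`, `add/3`, and `on_idle/1` are hypothetical names, and the real worker would drive `on_idle/1` from a timer.

```erlang
-module(batch_sketch).
-export([add/3, on_idle/1]).

%% Buffer holds messages newest-first. When adding a message fills the
%% buffer up to MaxSize, return the whole batch in publish order.
add(Msg, Buffer, MaxSize) when length(Buffer) + 1 >= MaxSize ->
    {flush, lists:reverse([Msg | Buffer]), []};
add(Msg, Buffer, _MaxSize) ->
    {buffered, [Msg | Buffer]}.

%% Called when the inactivity timer (the TTL) fires.
on_idle([]) ->
    nothing_to_flush;
on_idle(Buffer) ->
    {flush, lists:reverse(Buffer), []}.
```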

You can also set different batch sizes for different topics.

{ekaf,[{ekaf_max_buffer_size, [
   {<<"topic1">>, 500},            % topic specific
   {<<"topic2">>, 10000},
   {ekaf_max_buffer_size, 1000}    % for remaining topics
]}]}.

You can also change how long a buffer may sit idle before it is flushed, for example to 1 second. Topic-specific values are again possible.

{ekaf,[
    {ekaf_buffer_ttl, 1000}  % if a buffer still holds messages after 1 sec of inactivity, send them
]}.

Partition choosing strategy

random (faster, since order is not maintained among partition workers)

[Figure: screenshot of the random strategy]

Messages are delivered to Kafka in order within a single partition worker, but each produced message can go to any randomly chosen partition worker. As a result, the final ordering at the Kafka broker cannot be guaranteed.

sticky_round_robin

[Figure: screenshot of the sticky_round_robin strategy]

Attempts to deliver to Kafka in the same order that messages were published, by sending 1000 writes at a time to the same partition worker before switching to another worker. The order in which partition workers are chosen may vary over long durations, but whichever partition worker is picked, its writes will be in order.

strict_round_robin

Every publish goes to a different partition worker. This helps for infrequent events that must not go to the same consumer in quick succession.

e.g. if you want every message to go to a different partition, you may need 1 worker, and use strict_round_robin

e.g. if you have 100 workers per partition and have chosen strict_round_robin, the first 100 events will go to partition 1, the next 100 to partition 2, etc.
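The difference between the two round-robin strategies comes down to how often the chosen worker changes. Below is a minimal sketch, assuming workers are addressed by a zero-based index and that rotation is strictly sequential (the text above notes that the real worker order may vary). The module `rr_sketch` and its functions are hypothetical.

```erlang
-module(rr_sketch).
-export([strict/2, sticky/3]).

%% N is the zero-based count of messages published so far.

%% strict: every publish moves on to the next worker.
strict(N, NumWorkers) when NumWorkers > 0 ->
    N rem NumWorkers.

%% sticky: stay on one worker for StickyCount publishes, then move on.
sticky(N, NumWorkers, StickyCount) when NumWorkers > 0, StickyCount > 0 ->
    (N div StickyCount) rem NumWorkers.
```

With `StickyCount` set to 1000, publishes 0 to 999 land on worker 0 and publish 1000 is the first to land on worker 1.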

custom

If this strategy is selected (for all topics or for specific topics), every message of the tuple form {Key,Bin} is passed to a function that picks the partition based on Key.

ekaf:produce_async_batched(<<"topic">>, {<<"user1">>,<<"foobar">>}).
ekaf:produce_sync(<<"topic">>, [{<<"user1">>, <<"foo">>},  {<<"user2">>, <<"bar">>}]).
% internally if there are 5 partitions then `erlang:phash2(<<"user1">>) rem 5` is done
% to choose a partition. within the partition, workers are again 
% round robin'd even when publishing to the same Key. But you can over-ride the
% default ekaf_callbacks:default_custom_partition_picker/3 implementation
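The hashing step described in the comments above can be tried in isolation. This sketch only mirrors the `erlang:phash2(Key) rem N` expression quoted there; it is not the real `default_custom_partition_picker/3`, and the module name `key_partition_sketch` is hypothetical.

```erlang
-module(key_partition_sketch).
-export([partition_for/2]).

%% Map a binary key to a partition index in 0..NumPartitions-1.
%% The same key always maps to the same partition for a fixed
%% partition count.
partition_for(Key, NumPartitions) when is_binary(Key), NumPartitions > 0 ->
    erlang:phash2(Key) rem NumPartitions.
```

Note that the mapping changes if the number of partitions changes, so keys are only pinned to a partition while the topic layout stays the same.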

NOTE: You can configure the same strategy for all topics, or pick a different one per topic.

{ekaf_partition_strategy, [
 {<<"heavy_job">>, strict_round_robin},
 {<<"other_event">>, sticky_round_robin},
 {<<"user_actions">>, custom}, %route to partition based on message key
 {ekaf_partition_strategy,  random}      %% default
]}

No need for Zookeeper

ekaf does not need a connection to Zookeeper for broker info, etc. This follows the pattern encouraged by the 0.8 protocol.

No linked drivers, no NIFs, minimal deps

Deals with the protocol completely in Erlang. Pattern matching FTW; see the blog post that inspired this project (also see https://coderwall.com/p/1lyfxg). Uses gproc for process registry.

Option to re-use worker pool as statsd worker pool to push metrics

In production, when hundreds of workers push to your favorite statsd client, that client can become a bottleneck. Enabling ?EKAF_PUSH_TO_STATSD_ENABLED at a global or topic level lets each worker keep a reference to its own UDP socket, so that pushing metrics is naturally load balanced. Enabling this option does not start sending metrics automatically; your callback needs to do so, as shown below.

% Set the ekaf app options before ekaf starts (or in your config file)
% to enable the push to statsd option (since 1.5.4), register your callback
application:set_env(ekaf, ?EKAF_PUSH_TO_STATSD_ENABLED, true),
application:set_env(ekaf, ?EKAF_CALLBACK_FLUSH_ATOM,  {ekaf_demo, demo_callback}),

% Then to get the metric ekaf.events.broker1.0 => N do

demo_callback(Event, _From, _StateName,
              #ekaf_fsm{ topic = Topic, partition = PartitionId, last_known_size = BufferLength, leader = Leader} = State,
              _Extra)->

    %% e.g. <<"ekaf.events.broker1.0">>
    Stat = <<Topic/binary,".",  Event/binary, ".broker", (ekaf_utils:itob(Leader))/binary, ".", (ekaf_utils:itob(PartitionId))/binary>>,
    case Event of
        ?EKAF_CALLBACK_FLUSH ->
            %% push the buffer length over this worker's own UDP socket
            ekaf_stats:udp_gauge(State#ekaf_fsm.statsd_socket,
                                 Stat,
                                 BufferLength),
            ok;
        _ ->
            %% ignore every other event
            ok
    end.
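For readers unfamiliar with what a gauge push looks like on the wire, here is a minimal sketch using only OTP's `gen_udp`. It assumes the plain statsd line format `name:value|g`; it does not show how `ekaf_stats:udp_gauge/3` is implemented, and `statsd_sketch` is a hypothetical module.

```erlang
-module(statsd_sketch).
-export([gauge_packet/2, send_gauge/4]).

%% Build a statsd gauge line, e.g. <<"ekaf.events.broker1.0:20|g">>.
gauge_packet(Stat, Value) when is_binary(Stat), is_integer(Value) ->
    <<Stat/binary, ":", (integer_to_binary(Value))/binary, "|g">>.

%% Send the gauge over an already opened UDP socket, such as one
%% returned by gen_udp:open(0, [binary]).
send_gauge(Socket, Host, Port, {Stat, Value}) ->
    gen_udp:send(Socket, Host, Port, gauge_packet(Stat, Value)).
```

Keeping one socket per worker, as the option above does, avoids funnelling every metric through a single process.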

Optimized for using on a cluster

  • Works well when embedded into other OTP/rebar style apps (e.g. tested with kafboy)
  • Only gets metadata for the topic being published to. ekaf does not start workers until a produce is called, so it scales horizontally with ease: a node can be added to a load balancer without holding up valuable broker connections on bootup. Metadata is queried directly from the {ekaf_bootstrap_broker,Broker} during the first produce to a topic, and is then cached for that topic.
  • Extensive use of records for O(1) lookup
  • Binary is the preferred format for Topic, etc., so lists are avoided everywhere except the {BrokerHost,_Port}.
  • All pg2 and gproc names are prefixed with <<"ekaf.",Topic/binary>> for better namespacing

Concurrency when publishing to multiple topics

Each topic has a pg2 process group, but the pool itself is maintained internally to support round-robin, etc. You can pick a random partition worker for a topic via

ekaf:pick(<<"topic_foo">>, fun(Worker) ->
    case Worker of
        {error, try_again} ->
            %% need to get some metadata first for this topic
            %% see how `ekaf:produce_sync/2` does this
            {error, try_again};
        SomeWorkerPid ->
            %% gen_fsm:sync_send_event(SomeWorkerPid, pool_name)
            %% SomeWorkerPid ! info
            SomeWorkerPid
    end
end).

%% pick/1 and pick/2 also exist for synchronously choosing a worker
%% picking a worker based on the data is a new functionality added in 1.6.0

Fault tolerant

Features you gain by using ekaf for pushing to kafka:

  • Handles changes in kafka metadata auto-magically. Metadata is queried on connection losses and at regular intervals
  • If one partition/broker goes down, messages will go to other available partitions/brokers (as configured on kafka)
  • If all brokers/partitions are unavailable, ekaf starts buffering messages in memory and replays them when the broker is back. See ekaf_max_downtime_buffer_size to configure this option. By default it is not set, since Kafka restarts are usually quick and the possibility of all brokers being down is low.
  • All these network conditions are simulated in the tests. You're covered!
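As a config sketch, the downtime buffer mentioned above could be enabled next to the bootstrap broker. The value 10000 is an arbitrary illustration, and treating it as a count of messages is an assumption; check the ekaf source for the exact semantics before relying on it.

```erlang
%% sys.config fragment (sketch). 10000 is an illustrative value only.
{ekaf, [
    {ekaf_bootstrap_broker, {"localhost", 9091}},
    %% assumed: upper bound on what is held in memory while brokers are down
    {ekaf_max_downtime_buffer_size, 10000}
]}.
```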

Instrumentable

Current callbacks include one that fires when the buffer is flushed.

{ekaf,[
    {ekaf_callback_flush, {mystats, callback_flush}}
    %% where the callback is fun mystats:callback_flush/5
]}.

%% mystats.erl
callback_flush(Event, From, _StateName,
    #ekaf_fsm{ topic = Topic, broker = _Broker, partition = PartitionId, last_known_size = BufferLength, cor_id = CorId, leader = Leader},
    _Extra)->
    Stat = <<Topic/binary,".",  Event/binary, ".broker", (ekaf_utils:itob(Leader))/binary, ".", (ekaf_utils:itob(PartitionId))/binary>>,
    case Event of
        ?EKAF_CALLBACK_FLUSH ->
            io:format("~n ~p flush broker~w#~p when size was ~p corid ~p via:~p",
                      [Topic, Leader, PartitionId, BufferLength, CorId, From]);
...

Because the first argument is a binary, it can easily be pushed into statsite/statsd/graphite/grafana.

State Machines

Each worker is a finite state machine powered by OTP's gen_fsm, as opposed to gen_server, which is more of a client-server model. This makes it easy to handle broken connections and to add more features in the future. In fact, every new topic spawns a worker that first starts in a bootstrapping state until metadata is retrieved. This is a blocking call.

Choosing a worker is done by an ekaf_server worker for every topic. It looks at the strategy, decides how often to choose a new worker, and is used internally by ekaf:pick.
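The bootstrapping-then-ready lifecycle can be sketched as a tiny state machine. Note the swap: ekaf uses gen_fsm, while this sketch uses gen_statem, its successor in current OTP releases. The module `worker_fsm_sketch`, its API, and the in-memory list standing in for a socket are all hypothetical.

```erlang
-module(worker_fsm_sketch).
-behaviour(gen_statem).
-export([start_link/0, metadata_ready/1, produce/2]).
-export([init/1, callback_mode/0, bootstrapping/3, ready/3]).

start_link() -> gen_statem:start_link(?MODULE, [], []).
metadata_ready(Pid) -> gen_statem:cast(Pid, metadata_ready).
produce(Pid, Msg) -> gen_statem:call(Pid, {produce, Msg}).

callback_mode() -> state_functions.

%% Start in bootstrapping; Data is the list of accepted messages.
init([]) -> {ok, bootstrapping, []}.

%% Until metadata arrives, refuse to produce.
bootstrapping(cast, metadata_ready, Data) ->
    {next_state, ready, Data};
bootstrapping({call, From}, {produce, _Msg}, Data) ->
    {keep_state, Data, [{reply, From, {error, bootstrapping}}]}.

%% Once ready, accept messages (a real worker would write to a socket).
ready({call, From}, {produce, Msg}, Data) ->
    {keep_state, [Msg | Data], [{reply, From, ok}]};
ready(cast, metadata_ready, Data) ->
    {keep_state, Data}.
```

Modelling "not ready yet" as its own state keeps the produce path free of readiness checks, which is the main appeal of a state machine over a plain server here.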

An example ekaf config

{ekaf,[

    % required.
    {ekaf_bootstrap_broker, {"localhost", 9091} },
    % pass the {BrokerHost,Port} of at least one permanent broker. Ideally this should be
    %       the IP of a load balancer so that any broker can be contacted


    % optional.
    {ekaf_bootstrap_topics, [ <<"topic">> ]},
    % will start workers for this topic when ekaf starts
    % a lazy and more recommended approach is to ignore this config

    % optional
    {ekaf_per_partition_workers,100},
    % how big is the connection pool per partition
    % e.g. if the topic has 3 partitions, then 300 workers will be started


    % optional
    {ekaf_max_buffer_size, [{<<"topic">>,10000},                % for specific topic
                            {ekaf_max_buffer_size,100}]},       % for other topics
    % how many events should the worker wait for before flushing to kafka as a batch


    % optional
    {ekaf_partition_strategy, random},
    % random is the default, and the fastest choice if you are not
    % bothered about the order

    % optional
    {ekaf_callback_flush, {mystats,callback_flush}},
    % can be used for instrumenting how batches are sent & hygiene

    % optional
    {ekaf_callback_custom_partition_picker, {ekaf_callbacks, 
                                             default_custom_partition_picker}} 
    % to always route messages with keys to the same partition

]},

Powering kafboy, a HTTP gateway to ekaf

kafboy ( https://github.com/helpshift/kafboy ) sits on top of ekaf and is powered by Cowboy to send JSON posted to it directly to kafka

POST /async/topic_name
POST /batch/async/topic
POST /safe/batch/async/topic
etc

Benchmarks

Running the test on a 2GB RAM vagrant VM, where the broker was local

Test_Async_Multi_Batched = fun(N) ->Seq = lists:seq(1,N), N1 = now(),  ekaf:produce_async_batched(<<"ekaf">>, [ ekaf_utils:itob(X) || X <- Seq]), N2 = now(), N/(timer:now_diff(N2,N1)/1000000) end.

28> Test_Async_Multi_Batched(1000).
202429.14979757086
29> Test_Async_Multi_Batched(10000).
438981.56277436344
30> Test_Async_Multi_Batched(100000).
440893.7798705536

Test multiple calls to the async batch

Test_Async_Batched = fun(N) ->Seq = lists:seq(1,N), N1 = now(), [ ekaf:produce_async_batched(<<"ekaf">>, ekaf_utils:itob(X)) || X <- Seq], N2 = now(), N/(timer:now_diff(N2,N1)/1000000) end.

24> Test_Async_Batched(1000).
9628.623973347969
25> Test_Async_Batched(10000).
6209.853298425678
26> Test_Async_Batched(100000).
6156.06385217173

Test multiple calls to the async without batch

Test_Async = fun(N) -> Seq = lists:seq(1,N), N1 = now(), [ ekaf:produce_async(<<"ekaf">>, ekaf_utils:itob(X)) || X <- Seq], N2 = now(), N/(timer:now_diff(N2,N1)/1000000) end.

32> Test_Async(1000).
5030.155783924628
33> Test_Async(10000).
2771.468068807792
34> Test_Async(100000).
2427.0491521382896
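The benchmark funs above rely on `now()`, which is deprecated in current OTP releases. A sketch of an equivalent throughput helper based on `erlang:monotonic_time/1` follows; `bench_sketch` and `throughput/2` are hypothetical names, and the numbers above were not produced with it.

```erlang
-module(bench_sketch).
-export([throughput/2]).

%% Call Fun once for each integer in 1..N and return calls per second.
throughput(N, Fun) when is_integer(N), N > 0, is_function(Fun, 1) ->
    T0 = erlang:monotonic_time(microsecond),
    lists:foreach(Fun, lists:seq(1, N)),
    T1 = erlang:monotonic_time(microsecond),
    %% guard against a zero elapsed time on very fast runs
    ElapsedMicros = max(T1 - T0, 1),
    N / (ElapsedMicros / 1000000).
```

With ekaf running, the unbatched async test could then be written as `bench_sketch:throughput(1000, fun(X) -> ekaf:produce_async(<<"ekaf">>, integer_to_binary(X)) end)`.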

Tests

ekaf comes embedded with kafkamocker - an erlang/otp app that any app can embed to simulate a kafka broker. This means that an actual consumer has verified the receipt of the published messages in our eunit tests.

ekaf works well with rebar.

$ rebar get-deps clean compile eunit

==> ekaf (eunit)
test/ekaf_tests.erl:87:<0.174.0>: t_reading_topic_specific_envs ( ) = ok
test/ekaf_tests.erl:89:<0.181.0>: t_pick_from_new_pool ( ) = ok
test/ekaf_tests.erl:91:<0.195.0>: t_request_metadata ( ) = ok
test/ekaf_tests.erl:93:<0.199.0>: t_request_worker_state ( ) = ok
test/ekaf_tests.erl:96:<0.203.0>: t_produce_sync_to_topic ( ) = ok
test/ekaf_tests.erl:98:<0.209.0>: t_produce_sync_multi_to_topic ( ) = ok
test/ekaf_tests.erl:100:<0.215.0>: t_produce_sync_in_batch_to_topic ( ) = ok
test/ekaf_tests.erl:102:<0.222.0>: t_produce_sync_multi_in_batch_to_topic ( ) = ok
test/ekaf_tests.erl:105:<0.229.0>: t_produce_async_to_topic ( ) = ok
test/ekaf_tests.erl:107:<0.236.0>: t_produce_async_multi_to_topic ( ) = ok
test/ekaf_tests.erl:109:<0.243.0>: t_produce_async_in_batch_to_topic ( ) = ok
test/ekaf_tests.erl:111:<0.251.0>: t_produce_async_multi_in_batch_to_topic ( ) = ok
test/ekaf_tests.erl:114:<0.259.0>: t_max_messages_to_save_during_kafka_downtime ( ) = ok
test/ekaf_tests.erl:116:<0.277.0>: t_restart_kafka_broker ( ) = ok
test/ekaf_tests.erl:118:<0.290.0>: t_change_kafka_config ( ) = ok
test/ekaf_tests.erl:120:<0.324.0>: t_massage_buffer_encode_messages_as_one_large_message ( ) = ok
All 32 tests passed.
Cover analysis: /Users/bosky/testbed/ekaf/.eunit/index.html

Code Coverage:
ekaf                   : 51%
ekaf_callbacks         : 87%
ekaf_demo              :  0%
ekaf_fsm               : 53%
ekaf_lib               : 62%
ekaf_picker            : 68%
ekaf_protocol          : 70%
ekaf_protocol_metadata : 78%
ekaf_protocol_produce  : 68%
ekaf_server            : 43%
ekaf_server_lib        : 64%
ekaf_socket            : 57%
ekaf_sup               : 33%
ekaf_utils             : 14%

Total                  : 50%

License

Copyright 2015, Helpshift, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Goals for v2.0.0

ekaf's People

Contributors

bosky101, kzemek, manuelolmos, mousavian, wanlitian

ekaf's Issues

Forcefully disconnect from old brokers, if not present in new metadata

When doing something like a migration of Kafka clusters, ekaf does not need to be restarted.

A DNS change of ekaf_bootstrap_broker suffices: ekaf picks it up automatically and starts sending to the new brokers without any downtime. However, if the older brokers are still up, it continues sending to them.

e.g.
suppose metadata1 comprises broker1 and broker2,
and metadata2 comprises broker2 and broker3.

ekaf currently does not stop the workers of broker1, which it should, since broker1 is not in metadata2 even though the connection is still up.

On the other hand, this is interesting because ekaf is able to send to 2 Kafka clusters. Still, I am marking this as a bug that should be fixed.
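The fix requested here amounts to a set difference between the old and new broker lists. A minimal sketch of that step, with brokers represented as plain terms and a hypothetical module name; how ekaf would then stop the matching workers is not shown.

```erlang
-module(broker_diff_sketch).
-export([stale/2]).

%% Brokers present in the old metadata but absent from the new metadata.
%% Workers connected to these brokers are the candidates for shutdown.
stale(OldBrokers, NewBrokers) ->
    [B || B <- OldBrokers, not lists:member(B, NewBrokers)].
```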

~B

Random partition selection despite having custom selected

Hello there,

I am having issues with messages for different keys being written to the same partition (it should definitely not write to that partition, judging by the ChoosenPartition value I log), and with messages for the same key being written to different partitions. I checked that I have
'kafka_bridge.ekaf_partition_strategy = custom'
set in the config.

What do you suggest I do to find out whether the problem is somewhere in my code, or to detect where the ekaf library might not be working as intended?

Thank you for your time, have a good day

Feature request: connection to multiple brokers dynamically

Hi.
I am looking for a feature that would allow connecting to a broker specified at runtime.
Currently, the broker has to be pre-configured. In my use case, the broker would be defined at runtime. I will be publishing messages to potentially hundreds of brokers.

Would something like that be reasonable/useful?

Thanks.

suggestion

It would be better to include the project name when registering process names with gproc, like this:

gproc:reg({n,l,{ProjectName, TopicName}}).

How does `ekaf_max_buffer_size` work?

I changed this parameter to 2, but after emitting 2 messages I don't see them in the Kafka topic.
However, ekaf_buffer_ttl does work: when I change the ttl from 2 seconds to 10 seconds, I see the difference in when messages are flushed. I checked using application:get_env that both parameters are set.

Thanks.

When a topic does not exist, ekaf:pick/1 returns an error, while ekaf:pick/2 returns ok and creates the topic

1> application:set_env(ekaf, ekaf_bootstrap_broker, {"localhost", 9092}).
ok
2> application:ensure_all_started(ekaf). 
{ok,[gproc,kafkamocker,ekaf]}
3> ekaf:produce_sync(<<"dsf">>, <<"DSF">>).  %% topic dsf exists 
{{sent,1,<0.198.0>},
 {produce_response,1,0,
                   [{topic,<<"dsf">>,0,[{partition,0,0,0,[],[],0,[]}]}]}}
4> ekaf:produce_sync(<<"Dsf">>, <<"DSF">>).  %% topic Dsf does not exist
{error,bootstrapping}
5> ekaf:pick(<<"Dsf">>).           %% this call does not succeed
{error,bootstrapping}
6> ekaf:pick(<<"Dsf">>, fun(W) -> W end). %% this succeeds
{ok,<0.264.0>}
7> ekaf:produce_sync(<<"Dsf">>, <<"DSF">>). %% and then this succeeds
{{sent,0,<0.324.0>},
 {produce_response,1,0,
                   [{topic,<<"Dsf">>,0,[{partition,0,0,0,[],[],0,[]}]}]}}
8> 

Do you use ekaf? Help spread the word at Kafka Summit

ekaf has now crossed 70 stars, and averages over 100 clones every week.

In the spirit of the open source community, please drop a comment if your company uses ekaf in production. Any additional information you can provide will be greatly appreciated.

This will particularly be useful when anyone gives a talk about ekaf and erlang/kafka, and in turn getting more active participation and interest in using and evolving ekaf.

At Helpshift, we push around 300 million events every day to our events topic that contains 4-5 partitions. Kafka (4-5 brokers) has been solid as a rock and the central glue across 100's of repos/micro-services written across erlang, golang and clojure. It has also helped that ekaf can (& has) provided the offline backup & replay functionality. The metrics are something that has particularly delighted our ops team - info about distribution among partitions, buffer size when flushed, broker/partition intermittent downtime are all possible by adding callbacks that push to statsite/graphite.

We use https://github.com/lpgauth/statsderl by @lpgauth, a prolific erlang contributor, to push metrics. ekaf's dependencies include @uwiger's https://github.com/uwiger/gproc. Our tests use @ninenine's https://github.com/ninenines/ranch to mock a kafka broker.

Hat-tip & thanks to all those in the community who have raised issues, cloned or favourited the ekaf repo, including, in no particular order, @WanliTian (ekaf's most recent contributor from Baidu) @qrilka @mitchellwrosen @vglukhovskiy @RomanShestakov @jccampagne @belltoy @chengat1314 @cybergrind @dbdn @ericliang @gaynetdinov @harekumar @jwilberding @kachayev @layerhq @linearregression @mubarak @sagelywizard @joestein @a13x @sdanzan @smarin @wrw @xwiz @ysantos @zhangxinrun @xpd54 @ates @flyjack @wdshin @sipims @thatJoeGuy16 @ameensol @mjs2600 @shabankarumba @Lavrin @vamsimokari

And last but not least, many thanks to @ghoseb (CTO at Helpshift) & my colleagues @helpshift for allowing ekaf to be written in erlang, reviewing, QA'ing, debugging, and deploying ekaf.
Feel free to share how you use ekaf or submit a talk of your own.
I am considering submitting a talk on how ekaf is used by the community, and perhaps its internals.

Kafka Summit is happening in April 2016 in San Francisco
More at http://kafka-summit.org

Hope to see you there. And thank you for the continued encouragement and support.

Some questions for custom partition strategy

I found that the custom partition strategy is ineffective, and sometimes it is not a strict custom policy. I think people always want a strict hash policy when they use it.
The code is here:
https://github.com/helpshift/ekaf/blob/master/src/ekaf_server.erl#L360
I think it's better to return an error code than the wrong process.

In my situation there are many partitions. Asking processes to determine a specified partition is really inefficient. Why not just use gproc to register a name for that process, like this:
gproc:reg({n,l,{Topic, Partition}}).

I think there is no need for multiple processes per partition when using the custom partition strategy. But if you want that, you can do it like this:
gproc:reg({p, l, {Topic, Partition}})
Pids = gproc:lookup_pids({p,l,{Topic,Partition}})

Unexpected behavior when calling process is actively receiving messages.

This is done with every call to kafka server processes - should probably be moved to inside the server process itself.

https://github.com/helpshift/ekaf/blob/master/src/ekaf_lib.erl#L137-L151

The receive blocks calling process until message is received - if the calling process is actively receiving messages unexpected behavior will occur (dropped messages, sending data to the wrong topic, etc...).

This would affect any producer function.
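A common way to keep a blocking receive from consuming unrelated messages is to tag each request with a unique reference and match only on that reference. The sketch below shows the general pattern under that assumption; it is not taken from ekaf_lib.erl, and `tagged_call_sketch` and its message shapes are hypothetical.

```erlang
-module(tagged_call_sketch).
-export([call/3, echo_server/0]).

%% Send Request to Pid and wait only for the reply carrying our own Ref.
%% Any other message stays in the caller's mailbox untouched.
call(Pid, Request, Timeout) ->
    Ref = erlang:monitor(process, Pid),
    Pid ! {request, self(), Ref, Request},
    receive
        {reply, Ref, Reply} ->
            erlang:demonitor(Ref, [flush]),
            {ok, Reply};
        {'DOWN', Ref, process, Pid, Reason} ->
            {error, Reason}
    after Timeout ->
        erlang:demonitor(Ref, [flush]),
        {error, timeout}
    end.

%% Toy server that replies with whatever it was sent.
echo_server() ->
    receive
        {request, From, Ref, Request} ->
            From ! {reply, Ref, Request},
            echo_server()
    end.
```

This is essentially what `gen_server:call/3` does internally, which is one reason moving the receive behind an OTP call is often the simpler fix.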

Partitioning strategy working with Kafka 0.10.0?

I tried to use different built-in partitioning strategies by setting "ekaf_partition_strategy" as an ekaf application environment variable in my app, but no strategy worked as expected. I also tried to set them via the config file, but that didn't help either. The end result is an inconsistent distribution of messages across partitions without any pattern. Most of the time ekaf does not write to all partitions but just to some of them.

fetch api?

Hi,

Are you working on implementing the fetch api? Not even the complex consumer behavior, just the kafka fetch protocol.

INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

Hi,

I noticed that once the driver is connected to the Kafka server (0.8.2), the server produces this disconnection log every 5 seconds. From what I have gathered online, this seems to come from the metadata API call that retrieves per-topic information from the Kafka cluster. What I want to know is: is this normal? Is there no option to make the connection persistent rather than establishing and closing the socket every 5 seconds per topic?

Byron

Error for produced messages are ignored

I haven't found any error handling logic for "produce" responses, and the current design allows, e.g., messages to be sent to a stale partition leader.
This makes sending messages with ekaf unreliable.

Kafka plug-in make error

OS: centos7
Erlang/OTP: 21.3
EMQ: 3.1.0

make[1]: Entering directory '/root/emqtt/emqx-rel/deps/emqx_plugin_kafka'
make[2]: Entering directory '/root/emqtt/emqx-rel/deps/ekaf'
make[3]: Entering directory '/root/emqtt/emqx-rel/deps/kafkamocker'
make[3]: Leaving directory '/root/emqtt/emqx-rel/deps/kafkamocker'
make[2]: Leaving directory '/root/emqtt/emqx-rel/deps/ekaf'
ERLC emqx_plugin_kafka.erl
src/emqx_plugin_kafka.erl:50: record emqx_client undefined
src/emqx_plugin_kafka.erl:51: variable 'ClientId' is unbound
src/emqx_plugin_kafka.erl:55: record emqx_client undefined
src/emqx_plugin_kafka.erl:56: variable 'ClientId' is unbound
src/emqx_plugin_kafka.erl:85: record emqx_message undefined
src/emqx_plugin_kafka.erl:175: record emqx_message undefined
src/emqx_plugin_kafka.erl:176: record emqx_message undefined
src/emqx_plugin_kafka.erl:177: record emqx_message undefined
src/emqx_plugin_kafka.erl:178: record emqx_message undefined
src/emqx_plugin_kafka.erl:179: record emqx_message undefined
src/emqx_plugin_kafka.erl:180: record emqx_message undefined
src/emqx_plugin_kafka.erl:181: record emqx_message undefined
src/emqx_plugin_kafka.erl:182: record emqx_message undefined
src/emqx_plugin_kafka.erl:175: Warning: variable 'Id' is unused
src/emqx_plugin_kafka.erl:182: Warning: variable 'Timestamp' is unused
src/emqx_plugin_kafka.erl:212: Warning: variable 'Username' is unused
src/emqx_plugin_kafka.erl:214: Warning: variable 'ClientId' is unused
src/emqx_plugin_kafka.erl:222: Warning: function ekaf_set_topic/1 is unused
make[2]: *** [ebin/emqx_plugin_kafka.app] Error 1
make[1]: *** [app] Error 2
make[1]: Leaving directory '/root/emqtt/emqx-rel/deps/emqx_plugin_kafka'
make: *** [deps] Error 2

Reconnect issues after broker restart

In our environment, it appears that when brokers go down and come back up, the ekaf client keeps trying to create more connections. We have seen around 16K ekaf connections to brokers (TCP connection state ESTABLISHED), and we had to restart the app that embeds the ekaf client; the number of ekaf connections went down to around 100 after the restart.

Are we missing any config(s) to reconnect the ekaf client after a broker restart, or is this expected behavior?
Please let us know. Thanks.

ekaf tcp_closed

I got a weird error when connecting my Erlang application to a Kafka cluster. Please look at the errors below.

ekaf_server:472 <0.901.0>:dont know how to handle {tcp_closed,#Port<0.344719>} during ready
ekaf_server:472 <0.901.0>:dont know how to handle {tcp_closed,#Port<0.345988>} during ready
ekaf_server:472 <0.901.0>:dont know how to handle {tcp_closed,#Port<0.343701>} during ready
ekaf_server:472 <0.901.0>:dont know how to handle {tcp_closed,#Port<0.346711>} during ready
ekaf_server:472 <0.901.0>:dont know how to handle {tcp_closed,#Port<0.345919>} during ready
ekaf_server:472 <0.901.0>:dont know how to handle {tcp_closed,#Port<0.343695>} during ready
ekaf_server:472 <0.901.0>:dont know how to handle {tcp_closed,#Port<0.343845>} during ready
ekaf_server:472 <0.901.0>:dont know how to handle {tcp_closed,#Port<0.343866>} during ready
ekaf_server:472 <0.901.0>:dont know how to handle {tcp_closed,#Port<0.343674>} during ready
ekaf_server:472 <0.901.0>:dont know how to handle {tcp_closed,#Port<0.344163>} during ready

Can you tell me what the connection timeout between ekaf and the Kafka cluster is, and whether ekaf reconnects automatically when a timeout occurs?

`ekaf_picker:pick_sync` does not consider custom strategy

ekaf_lib:common_sync/4 uses ekaf:pick(Topic) -> pg2:get_closest_pid(Topic) which does not take custom strategies into account. This makes ekaf:produce_sync* functions work differently for a binary key vs a {binkey, binval} tuple:

158> ekaf:produce_sync_batched(<<"hai">>,<<"value">>). 
{buffered,0,1}
159> ekaf:produce_sync_batched(<<"hai">>,{<<>>,<<"value">>}).
>>>>>>>>>>>>>>>>>>> {<<"hai">>,
                     {<<>>,<<"value">>},
                     [<0.4644.1>,<0.5064.1>,<0.8221.1>]}
[{buffered,0,1}]

The async variants already work as expected in all cases. The partitioner might want to do its own default thing for all ekaf:produce_* calls.

Reproduce by setting ekaf_callback_custom_partition_picker to:

partition_picker(Topic, Data, State) ->
    io:format(">>>>>>>>>>>>>>>>>>> ~p~n", [{Topic, Data, State}]),
    {partition, 0}.

No way to limit message queue when Kafka is down?

I see max_buffer_size but it has a different meaning than just maximum buffer size (rather maximum number of async messages to buffer).
We had a Kafka outage that resulted in our server crashing with OOM because messages just kept accumulating in memory.
Should we have some workaround in ekaf for this problem?
I.e. some "total_buffer_size" limiting the number of messages kept in memory (ignoring other messages if they appear after hitting that limit).
Normally Kafka should be very reliable, but it is used only for logging in our system, so it makes sense to keep working even if logging has some problems (though a warning about that should of course be issued).
Any opinion on this?
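
One possible caller-side workaround, sketched under assumptions: before handing a message to a worker process, check how many messages are already waiting in its mailbox and drop the new one once a limit is reached. The module name `bounded_produce`, the function `maybe_produce/3`, and the limit are hypothetical and not part of ekaf; only `erlang:process_info/2` is a standard call, and obtaining the worker pid is left to the caller.

```erlang
-module(bounded_produce).
-export([maybe_produce/3]).

%% Hypothetical helper, NOT part of ekaf.
%% WorkerPid   - local pid of the process that will receive the message
%% MaxQueueLen - maximum number of messages allowed to wait in its mailbox
%% ProduceFun  - zero-arity fun that performs the actual produce call
maybe_produce(WorkerPid, MaxQueueLen, ProduceFun)
  when is_pid(WorkerPid), is_integer(MaxQueueLen), is_function(ProduceFun, 0) ->
    case erlang:process_info(WorkerPid, message_queue_len) of
        {message_queue_len, Len} when Len < MaxQueueLen ->
            %% Mailbox is below the limit: go ahead and produce.
            ProduceFun();
        {message_queue_len, _Len} ->
            %% Limit reached: shed load instead of growing memory.
            {dropped, queue_full};
        undefined ->
            %% process_info/2 returns undefined for a dead process.
            {dropped, worker_down}
    end.
```

A caller would wrap its produce call, for example `bounded_produce:maybe_produce(Pid, 100000, fun() -> ekaf:produce_async(Topic, Msg) end)`, and log a warning whenever `{dropped, _}` comes back. The check is only approximate, since the mailbox can grow between the check and the send.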

Incorrect instructions in ekaf_demo?

Following https://github.com/helpshift/ekaf/blob/master/src/ekaf_demo.erl#L6 I get:

kafkamocker $  rebar compile skip_deps=true && erl -pa ebin -pa deps/*/ebin -s kafkamocker_demo
==> kafkamocker (compile)
Erlang/OTP 17 [erts-6.2] [source] [64-bit] [smp:4:4] [async-threads:10] [kernel-poll:false]

{"init terminating in do_boot",{undef,[{kafkamocker_demo,start,[],[]},{init,start_it,1,[]},{init,start_em,1,[]}]}}

Crash dump was written to: erl_crash.dump
init terminating in do_boot ()

Kafka lose data

When testing under concurrency, we found that Kafka would lose data.

Messages never sent to kafka cluster after broker restart

I'm having issues where, after a one-by-one restart of all brokers in a cluster, messages go missing without any errors (apart from a few messages saying the broker is down during the restart). The first time I noticed this, around 30% of the messages never arrived at the Kafka cluster. After restarting the application in which I use ekaf, all messages arrive properly.

This is reproducible. I tried restarting the Kafka brokers again, and this time 50% of the messages went missing. I think this is somehow related to issue #9, though I'm not entirely sure.

@bosky101 any thoughts?

application:set_env results in undef error

Hi

I am trying to set the env variable as described in the README. Here is the code for reference:
application:load(ekaf).
application:set_env(ekaf, {ekaf_bootstrap_broker,{Server,Port}}).

But doing that results in undef

error: undef
[{application,set_env,
[ekaf,{ekaf_bootstrap_broker,{"localhost",<<"9092">>}}],
[]},

Can anyone please suggest what changes are required to get it working?
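
A likely fix, offered as a sketch rather than a confirmed answer: the trace shows `application:set_env` being called with two arguments (`ekaf` and a tuple), whereas the README calls it with three (application, parameter, value). The port in the trace is also the binary `<<"9092">>`, while the README passes an integer, so converting it is probably needed as well; that part is an assumption based on the README example.

```erlang
%% Run in an Erlang shell; inside a function, separate with commas instead.
application:load(ekaf).

%% Assumption: the port arrived as a binary, so convert it to an integer first.
Port = binary_to_integer(<<"9092">>).

%% Three arguments: application, parameter name, value (as in the README).
ok = application:set_env(ekaf, ekaf_bootstrap_broker, {"localhost", Port}).

{ok, _} = application:ensure_all_started(ekaf).
```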

produce_async vs produce_sync partition strategy

I'm trying to use this library in Elixir and while experimenting with the produce_async and produce_sync functions I found out that

Enum.map(1..10000, fn _ -> :ekaf.produce_sync("partitioning", "test") end)

did randomly distribute the messages while

Enum.map(1..10000, fn _ -> :ekaf.produce_async("partitioning", "test") end)

does not (all messages seem to end up in the same partition).

Am I missing some configuration? Or is this expected behaviour?

produce_async_batched

Hello.

I'm a bit confused by the documentation on how to produce messages asynchronously and in batches.

The README says the following

%% sync
{buffered, _Partition, _BufferSize} =
    ekaf:produce_async_batched(
        Topic,
        [ekaf_utils:itob(X) || X<- lists:seq(1,1000) ]
    ).

%% async
{buffered, _Partition, _BufferIndex} =
    ekaf:produce_async_batched(
        Topic,
        [<<"foo">>, {<<"key">>, <<"value">>}, <<"back_to_binary">> ]
    ).

It seems a bit odd to me that both the sync and async ways of sending messages in batches use the same function, produce_async_batched. Is that intended, or is it a typo?

Also, in the example that sends messages asynchronously in batches, I don't quite understand the values that are used. What is <<"foo">>? As I understand it, <<"key">> is the routing key for the message, which is represented here as <<"value">>, right? What is <<"back_to_binary">>? Also, if I open ekaf_tests.erl, I see that produce_async_batched is called as follows:

Sent = [ <<(ekaf_utils:itob(X))/binary,".async multi batch">> || X<- lists:seq(9,19)],
Response  = ekaf:produce_async_batched(?TEST_TOPIC, Sent ),

or

Sent = <<"20.async in batch">>,
Response  = ekaf:produce_async_batched(?TEST_TOPIC, Sent),

As far as I can see, the examples from the test file contain no <<"foo">>, <<"key">>, <<"value">> or <<"back_to_binary">>; there are just a topic name and a message (or a list of messages), that's it.
Could you please clarify how I should send messages asynchronously in batches?

Thank you so much!

Why only 1 bootstrap broker?

As far as I understand, ekaf_bootstrap_broker assumes only 1 host:port pair. Why is that?
Other client libs (at least the ones for Java and kafka-python) use a list of brokers.
Is this feature planned for later releases, or is there some other reason to go with only 1 broker?

Compression

Any updates on the payload compression please?
I see it in Goals for v 2.0.0...

Thanks

ekaf app does not restart when 6 Erlang nodes restart in turn

I have 6 nodes online, and I want to upgrade Kafka from 8 to 10.2. So I changed the configuration for ekaf, but ekaf did not produce to the new Kafka broker. I then added logging to the generic_init function in the ekaf_server module to get the topic and broker info, but I got nothing, so I know the ekaf application did not restart. But why?

ekaf is not honoring broker's partition change

In our production environment, it appears that a broker's partition change is not being honored by ekaf, and we are seeing:

Produce request with correlation id 2628 from client ekaf on partition [layer-insights,0] failed due to Leader not local for partition [layer-insights,0] on broker 2 (kafka.server.KafkaApis)

From kafka protocol:

The client does not need to keep polling to see if the cluster has changed; it can fetch metadata once when it is instantiated cache that metadata until it receives an error indicating that the metadata is out of date. This error can come in two forms: (1) a socket error indicating the client cannot communicate with a particular broker, (2) an error code in the response to a request indicating that this broker no longer hosts the partition for which data was requested.

It seems that (2) (the invalid partition error code) is not handled in ekaf.
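
The rule quoted above can be sketched as a small classifier that decides when cached metadata should be refreshed. This is an illustration only, not ekaf's code: the module name `metadata_policy`, the function `next_step/1`, and the tagged-tuple inputs are hypothetical. The numeric codes are the ones the Kafka protocol assigns to UnknownTopicOrPartition (3), LeaderNotAvailable (5), and NotLeaderForPartition (6).

```erlang
-module(metadata_policy).
-export([next_step/1]).

%% Hypothetical helper, NOT part of ekaf.
%% Form (1): a socket error means the broker cannot be reached.
next_step({socket_error, _Reason}) -> refresh_metadata;
%% Error code 0 means the request succeeded.
next_step({error_code, 0}) -> ok;
%% Form (2): the broker no longer hosts (or leads) the partition.
next_step({error_code, 3}) -> refresh_metadata;  % UnknownTopicOrPartition
next_step({error_code, 5}) -> refresh_metadata;  % LeaderNotAvailable
next_step({error_code, 6}) -> refresh_metadata;  % NotLeaderForPartition
%% Anything else is surfaced to the caller unchanged in meaning.
next_step({error_code, _Other}) -> report_error.
```

With such a policy, the "Leader not local for partition" failure above would map to `refresh_metadata`, after which the request could be retried against the new leader.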

kafka close connection

I use Kafka to transfer messages, with just one broker.
The exception is:
close connection due to error handing producer request with correlation id 2
I want to know whether my config is wrong and how to change it.

Bad behaviour when partitioning messages by key.

Config

application:set_env(ekaf, ekaf_partition_strategy, [{ekaf_partition_strategy, custom}])

All messages are produced with a key.

What happens:

  1. ekaf_server.erl:439 handle_info({worker, up, WorkerUp, WorkerUpStateName, WorkerUpState, _}, StateName, #ekaf_server { topic = Topic, messages = OfflineMessages } = State)
  2. ekaf_server_lib.erl:118 send_messages(StateName, #ekaf_server{ topic = Topic } = State, Messages)
  3. ekaf.erl:66 produce_async_batched(Topic, Data)
  4. ekaf_lib.erl:79 common_async(Event, Topic, [{Key,Data}|Rest])

The code waits for a reply from the TopicWorker, but the current process is the topic worker. The code would deadlock if we used gen_fsm:sync_send_all_state_event as in #50. Here instead, we send {pick, {Key,Data}, self()} to self(), immediately receive it in clause _E, and the message is discarded.

Async produce Message accumulation

Writing to Kafka asynchronously at 30k msg/s:

([email protected])3> erlang:process_info(pid(0, 1767, 0)).
[{current_function,{erlang,spawn,3}},
 {initial_call,{proc_lib,init_p,5}},
 {status,running},
 {message_queue_len,13758967},
 {messages,[{tcp,#Port<0.3122>,
                 <<0,27,132,33,0,0,0,1,0,15,109,101,115,115,97,103,101,95,
                   ...>>},
            {tcp,#Port<0.3122>,
                 <<0,27,151,198,0,0,0,1,0,15,109,101,115,115,97,103,101,
                   ...>>},
            {'$gen_event',{produce_async,<<"{\"client_id\":\"pub_1a06b4cd44374f3a947\",\"username\":\"undefined"...>>}},
            {'$gen_event',{produce_async,<<"{\"client_id\":\"pub_f562fb58af2c4ce1aef\",\"username\":\"undef"...>>}},
            {'$gen_event',{produce_async,<<"{\"client_id\":\"pub_f3375a2c3fa74a419f0\",\"username\":\"u"...>>}},
            {'$gen_event',{produce_async,<<"{\"client_id\":\"pub_8952b84cc06046ed9a0\",\"username"...>>}},
            {'$gen_event',{produce_async,<<"{\"client_id\":\"pub_0a41e3193db64fbea0a\",\"user"...>>}},
            {tcp,#Port<0.3122>,<<0,27,143,58,0,0,0,1,0,15,109,...>>},
            {tcp,#Port<0.3122>,<<0,27,194,227,0,0,0,1,0,15,...>>},
            {tcp,#Port<0.3122>,<<0,27,204,77,0,0,0,1,0,...>>},
            {tcp,#Port<0.3122>,<<0,27,72,71,0,0,0,1,...>>},
            {tcp,#Port<0.3122>,<<0,27,204,83,0,0,0,...>>},
            {tcp,#Port<0.3122>,<<0,27,193,96,0,0,...>>},
            {tcp,#Port<0.3122>,<<0,27,132,34,0,...>>},
            {tcp,#Port<0.3122>,<<0,26,247,60,...>>},
            {tcp,#Port<0.3122>,<<0,27,129,...>>},
            {tcp,#Port<0.3122>,<<0,27,...>>},
            {tcp,#Port<0.3122>,<<0,...>>},
            {tcp,#Port<0.3122>,<<...>>},
            {tcp,#Port<0.3122>,...},
            {tcp,...},
            {...}|...]},
 {links,[<0.1709.0>,#Port<0.3122>]},
 {dictionary,[{'$initial_call',{ekaf_fsm,init,1}},
              {'$ancestors',[ekaf_sup,<0.1708.0>]},
              {rand_seed,{#{bits => 58,jump => #Fun<rand.8.77346176>,
                            next => #Fun<rand.5.77346176>,type => exrop,
                            uniform => #Fun<rand.6.77346176>,
                            uniform_n => #Fun<rand.7.77346176>,weak_low_bits => 1},
                          [228663813211387744|65157769469894326]}}]},
 {trap_exit,false},
 {error_handler,error_handler},
 {priority,normal},
 {group_leader,<0.1707.0>},
 {total_heap_size,431772712},
 {heap_size,137319567},
 {stack_size,16},
 {reductions,345736510},
 {garbage_collection,[{max_heap_size,#{error_logger => true,kill => true,size => 0}},
                      {min_bin_vheap_size,46422},
                      {min_heap_size,233},
                      {fullsweep_after,1000},
                      {minor_gcs,1}]},
 {suspending,[]}]
