
kafka's Introduction


no-kafka

no-kafka is an Apache Kafka 0.9 client for Node.js with support for the new unified consumer API.

It supports synchronous and asynchronous Gzip and Snappy compression, producer batching and configurable retries, and offers several predefined group assignment strategies and a producer partitioner option.

All methods return a promise.

Please check the CHANGELOG for backward-incompatible changes in version 3.x

Using

  • create a test topic:

kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic kafka-test-topic --partitions 3 --replication-factor 1

  • install no-kafka:

npm install no-kafka

Producer

Example:

var Kafka = require('no-kafka');
var producer = new Kafka.Producer();

return producer.init().then(function(){
  return producer.send({
      topic: 'kafka-test-topic',
      partition: 0,
      message: {
          value: 'Hello!'
      }
  });
})
.then(function (result) {
  /*
  [ { topic: 'kafka-test-topic', partition: 0, offset: 353 } ]
  */
});

Send, and retry with a progressive 100-300ms delay if the request fails:

return producer.send(messages, {
  retries: {
    attempts: 2,
    delay: {
      min: 100,
      max: 300
    }
  }
});

Batching (grouping) produce requests

Accumulate messages into a single batch until their total size is >= 1024 bytes or a 100ms timeout expires (these options override the Producer constructor options):

producer.send(messages, {
  batch: {
    size: 1024,
    maxWait: 100
  }
});
producer.send(messages, {
  batch: {
    size: 1024,
    maxWait: 100
  }
});

Please note that if you pass different options to the send() method, the messages will be grouped into separate batches:

// will be sent in batch 1
producer.send(messages, {
  batch: {
    size: 1024,
    maxWait: 100
  },
  codec: Kafka.COMPRESSION_GZIP
});
// will be sent in batch 2
producer.send(messages, {
  batch: {
    size: 1024,
    maxWait: 100
  },
  codec: Kafka.COMPRESSION_SNAPPY
});

Keyed Messages

Send a message with the key:

producer.send({
    topic: 'kafka-test-topic',
    partition: 0,
    message: {
        key: 'some-key',
        value: 'Hello!'
    }
});

Custom Partitioner

Example: override the default partitioner with a custom partitioner that only uses a portion of the key.

var util  = require('util');
var Kafka = require('no-kafka');

var Producer           = Kafka.Producer;
var DefaultPartitioner = Kafka.DefaultPartitioner;

function MyPartitioner() {
    DefaultPartitioner.apply(this, arguments);
}

util.inherits(MyPartitioner, DefaultPartitioner);

MyPartitioner.prototype.getKey = function getKey(message) {
    return message.key.split('-')[0];
};

var producer = new Producer({
    partitioner : new MyPartitioner()
});

return producer.init().then(function(){
  return producer.send({
      topic: 'kafka-test-topic',
      message: {
          key   : 'namespace-key',
          value : 'Hello!'
      }
  });
});

Producer options:

  • requiredAcks - require acknowledgments for the produce request. If it is 0 the server will not send any response. If it is 1 (default), the server will wait until the data is written to the local log before sending a response. If it is -1 the server will block until the message is committed by all in-sync replicas before sending a response. For any number > 1 the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas).
  • timeout - timeout in ms for produce request
  • clientId - ID of this client, defaults to 'no-kafka-client'
  • connectionString - comma-delimited list of initial brokers, defaults to '127.0.0.1:9092'
  • reconnectionDelay - controls the optionally progressive delay between reconnection attempts in case of network error:
    • min - minimum delay, used as increment value for next attempts, defaults to 1000ms
    • max - maximum delay value, defaults to 1000ms
  • partitioner - Class instance used to determine the topic partition for a message. If a message already specifies a partition, the partitioner won't be used. The partitioner must inherit from Kafka.DefaultPartitioner. The partition method receives 3 arguments: the topic name, an array of topic partitions, and the message (useful for partitioning by key, etc.). partition can be sync or async (return a Promise).
  • retries - controls the number of attempts and the delay between them when a produce request fails
    • attempts - number of total attempts to send the message, defaults to 3
    • delay - controls the delay between retries; the delay is progressive and incremented with each attempt in min-value steps, up to but not exceeding the max value
      • min - minimum delay, used as increment value for next attempts, defaults to 1000ms
      • max - maximum delay value, defaults to 3000ms
  • codec - compression codec, one of Kafka.COMPRESSION_NONE, Kafka.COMPRESSION_SNAPPY, Kafka.COMPRESSION_GZIP
  • batch - control batching (grouping) of requests
    • size - group messages together into single batch until their total size exceeds this value, defaults to 16384 bytes. Set to 0 to disable batching.
    • maxWait - send grouped messages after this amount of milliseconds expire even if their total size doesn't exceed batch.size yet, defaults to 10ms. Set to 0 to disable batching.
  • asyncCompression - boolean, use asynchronous compression instead of synchronous, defaults to false
  • connectionTimeout - timeout for establishing connection to Kafka in milliseconds, defaults to 3000ms
  • socketTimeout - timeout for Kafka connection socket in milliseconds, defaults to 0 (disabled)
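
For reference, a minimal sketch of a Producer constructed with several of the options above (the values are illustrative only, not recommendations):

var Kafka = require('no-kafka');

var producer = new Kafka.Producer({
    connectionString: '10.0.1.1:9092,10.0.1.2:9092', // illustrative broker list
    clientId: 'my-producer',                         // illustrative client ID
    requiredAcks: 1,
    codec: Kafka.COMPRESSION_GZIP,
    batch: {
        size: 16384,
        maxWait: 10
    },
    retries: {
        attempts: 3,
        delay: {
            min: 1000,
            max: 3000
        }
    }
});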

SimpleConsumer

Manually specify topic, partition and offset when subscribing. Suitable for simple use cases.

Example:

var consumer = new Kafka.SimpleConsumer();

// data handler function can return a Promise
var dataHandler = function (messageSet, topic, partition) {
    messageSet.forEach(function (m) {
        console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
    });
};

return consumer.init().then(function () {
    // Subscribe to partitions 0 and 1 of the topic:
    return consumer.subscribe('kafka-test-topic', [0, 1], dataHandler);
});

Subscribe (or change subscription) to specific offset and limit maximum received MessageSet size:

consumer.subscribe('kafka-test-topic', 0, {offset: 20, maxBytes: 30}, dataHandler)

Subscribe to the latest or earliest offsets in the topic/partition:

consumer.subscribe('kafka-test-topic', 0, {time: Kafka.LATEST_OFFSET}, dataHandler)
consumer.subscribe('kafka-test-topic', 0, {time: Kafka.EARLIEST_OFFSET}, dataHandler)

Subscribe to all partitions in a topic:

consumer.subscribe('kafka-test-topic', dataHandler)

Commit offset(s) (V0, Kafka saves these commits to Zookeeper)

consumer.commitOffset([
  {
      topic: 'kafka-test-topic',
      partition: 0,
      offset: 1
  },
  {
      topic: 'kafka-test-topic',
      partition: 1,
      offset: 2
  }
])

Fetch committed offset(s)

consumer.fetchOffset([
  {
      topic: 'kafka-test-topic',
      partition: 0
  },
  {
      topic: 'kafka-test-topic',
      partition: 1
  }
]).then(function (result) {
/*
[ { topic: 'kafka-test-topic',
    partition: 1,
    offset: 2,
    metadata: null,
    error: null },
  { topic: 'kafka-test-topic',
    partition: 0,
    offset: 1,
    metadata: null,
    error: null } ]
*/
});

SimpleConsumer options

  • groupId - group ID for committing and fetching offsets. Defaults to 'no-kafka-group-v0'
  • maxWaitTime - maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the fetch request is issued, defaults to 100ms
  • idleTimeout - timeout between fetch calls, defaults to 1000ms
  • minBytes - minimum number of bytes to wait from Kafka before returning the fetch call, defaults to 1 byte
  • maxBytes - maximum size of messages in a fetch response, defaults to 1MB
  • clientId - ID of this client, defaults to 'no-kafka-client'
  • connectionString - comma-delimited list of initial brokers, defaults to '127.0.0.1:9092'
  • reconnectionDelay - controls the optionally progressive delay between reconnection attempts in case of network error:
    • min - minimum delay, used as increment value for next attempts, defaults to 1000ms
    • max - maximum delay value, defaults to 1000ms
  • recoveryOffset - recovery position (time) which will be used to recover the subscription in case of an OffsetOutOfRange error, defaults to Kafka.LATEST_OFFSET
  • asyncCompression - boolean, use asynchronous decompression instead of synchronous, defaults to false
  • handlerConcurrency - specify concurrency level for the consumer handler function, defaults to 10
  • connectionTimeout - timeout for establishing connection to Kafka in milliseconds, defaults to 3000ms
  • socketTimeout - timeout for Kafka connection socket in milliseconds, defaults to 0 (disabled)
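
A minimal sketch of a SimpleConsumer constructed with several of the options above (values are illustrative only):

var Kafka = require('no-kafka');

var consumer = new Kafka.SimpleConsumer({
    groupId: 'my-simple-group',            // illustrative group ID
    connectionString: '127.0.0.1:9092',
    maxWaitTime: 100,
    idleTimeout: 1000,
    maxBytes: 1024 * 1024,
    recoveryOffset: Kafka.EARLIEST_OFFSET  // recover at earliest instead of the default latest
});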

GroupConsumer (new unified consumer API)

Specify an assignment strategy (or use one of no-kafka's built-in consistent or round-robin assignment strategies) and subscribe by specifying only topics. The elected group leader will automatically assign partitions between all group members.

Example:

var Promise = require('bluebird');
var consumer = new Kafka.GroupConsumer();

var dataHandler = function (messageSet, topic, partition) {
    return Promise.each(messageSet, function (m){
        console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
        // commit offset
        return consumer.commitOffset({topic: topic, partition: partition, offset: m.offset, metadata: 'optional'});
    });
};

var strategies = [{
    subscriptions: ['kafka-test-topic'],
    handler: dataHandler
}];

consumer.init(strategies); // all done, now wait for messages in dataHandler

Assignment strategies

no-kafka provides three built-in strategies:

  • Kafka.WeightedRoundRobinAssignmentStrategy weighted round robin assignment (based on wrr-pool).
  • Kafka.ConsistentAssignmentStrategy which is based on a consistent hash ring and so provides consistent assignment across consumers in a group based on supplied metadata.id and metadata.weight options.
  • Kafka.DefaultAssignmentStrategy simple round robin assignment strategy (default).

Using Kafka.WeightedRoundRobinAssignmentStrategy:

var strategies = {
    subscriptions: ['kafka-test-topic'],
    metadata: {
        weight: 4
    },
    strategy: new Kafka.WeightedRoundRobinAssignmentStrategy(),
    handler: dataHandler
};
// consumer.init(strategies)....

Using Kafka.ConsistentAssignmentStrategy:

var strategies = {
    subscriptions: ['kafka-test-topic'],
    metadata: {
        id: process.argv[2] || 'consumer_1',
        weight: 50
    },
    strategy: new Kafka.ConsistentAssignmentStrategy(),
    handler: dataHandler
};
// consumer.init(strategies)....

Note that each consumer in a group should have its own and consistent metadata.id.

You can also write your own assignment strategy by inheriting from Kafka.DefaultAssignmentStrategy and overriding its assignment method, as in the sketch below.
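
A rough sketch of such a subclass follows; the exact arguments passed to assignment are an assumption here (the sketch simply delegates to the parent), so check the Kafka.DefaultAssignmentStrategy source for the real signature:

var util  = require('util');
var Kafka = require('no-kafka');

function MyAssignmentStrategy() {
    Kafka.DefaultAssignmentStrategy.apply(this, arguments);
}

util.inherits(MyAssignmentStrategy, Kafka.DefaultAssignmentStrategy);

// Assumed hook: inspect the group subscriptions here, then delegate to the
// default round-robin assignment in this sketch.
MyAssignmentStrategy.prototype.assignment = function () {
    return Kafka.DefaultAssignmentStrategy.prototype.assignment.apply(this, arguments);
};

var strategies = {
    subscriptions: ['kafka-test-topic'],
    strategy: new MyAssignmentStrategy(),
    handler: dataHandler
};
// consumer.init(strategies)....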

GroupConsumer options

  • groupId - group ID for committing and fetching offsets. Defaults to 'no-kafka-group-v0.9'
  • maxWaitTime - maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the fetch request is issued, defaults to 100ms
  • idleTimeout - timeout between fetch calls, defaults to 1000ms
  • minBytes - minimum number of bytes to wait from Kafka before returning the fetch call, defaults to 1 byte
  • maxBytes - maximum size of messages in a fetch response
  • clientId - ID of this client, defaults to 'no-kafka-client'
  • connectionString - comma-delimited list of initial brokers, defaults to '127.0.0.1:9092'
  • reconnectionDelay - controls the optionally progressive delay between reconnection attempts in case of network error:
    • min - minimum delay, used as increment value for next attempts, defaults to 1000ms
    • max - maximum delay value, defaults to 1000ms
  • sessionTimeout - session timeout in ms, min 6000, max 30000, defaults to 15000
  • heartbeatTimeout - delay between heartbeat requests in ms, defaults to 1000
  • retentionTime - offset retention time in ms, defaults to 1 day (24 * 3600 * 1000)
  • startingOffset - starting position (time) when there is no committed offset, defaults to Kafka.LATEST_OFFSET
  • recoveryOffset - recovery position (time) which will be used to recover the subscription in case of an OffsetOutOfRange error, defaults to Kafka.LATEST_OFFSET
  • asyncCompression - boolean, use asynchronous decompression instead of synchronous, defaults to false
  • handlerConcurrency - specify concurrency level for the consumer handler function, defaults to 10
  • connectionTimeout - timeout for establishing connection to Kafka in milliseconds, defaults to 3000ms
  • socketTimeout - timeout for Kafka connection socket in milliseconds, defaults to 0 (disabled)
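
A minimal sketch of a GroupConsumer constructed with several of the options above (values are illustrative only):

var Kafka = require('no-kafka');

var consumer = new Kafka.GroupConsumer({
    groupId: 'my-group',                   // illustrative group ID
    sessionTimeout: 15000,
    heartbeatTimeout: 1000,
    startingOffset: Kafka.EARLIEST_OFFSET, // start from the beginning when no offset has been committed
    recoveryOffset: Kafka.LATEST_OFFSET
});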

GroupAdmin (consumer groups API)

Offers methods:

  • listGroups - list existing consumer groups
  • describeGroup - describe existing group by its id
  • fetchConsumerLag - fetches consumer lag for topics/partitions

listGroups, describeGroup:

var admin = new Kafka.GroupAdmin();

return admin.init().then(function(){
    return admin.listGroups().then(function(groups){
        // [ { groupId: 'no-kafka-admin-test-group', protocolType: 'consumer' } ]
        return admin.describeGroup('no-kafka-admin-test-group').then(function(group){
            /*
            { error: null,
              groupId: 'no-kafka-admin-test-group',
              state: 'Stable',
              protocolType: 'consumer',
              protocol: 'DefaultAssignmentStrategy',
              members:
               [ { memberId: 'group-consumer-82646843-b4b8-4e91-94c9-b4708c8b05e8',
                   clientId: 'group-consumer',
                   clientHost: '/192.168.1.4',
                   version: 0,
                   subscriptions: [ 'kafka-test-topic'],
                   metadata: <Buffer 63 6f 6e 73 75 6d 65 72 2d 6d 65 74 61 64 61 74 61>,
                   memberAssignment:
                    { _blength: 44,
                      version: 0,
                      partitionAssignment:
                       [ { topic: 'kafka-test-topic',
                           partitions: [ 0, 1, 2 ] },
                          ],
                      metadata: null } },
                  ] }
             */
        })
    });
});

fetchConsumerLag:

var admin = new Kafka.GroupAdmin();

return admin.init().then(function(){
    return admin.fetchConsumerLag('no-kafka-admin-test-group', [{
        topicName: 'kafka-test-topic',
        partitions: [0, 1, 2]
    }]).then(function (consumerLag) {
        /*
        [ { topic: 'kafka-test-topic',
            partition: 0,
            offset: 11300,
            highwaterMark: 11318,
            consumerLag: 18 },
          { topic: 'kafka-test-topic',
            partition: 1,
            offset: 10380,
            highwaterMark: 10380,
            consumerLag: 0 },
          { topic: 'kafka-test-topic',
            partition: 2,
            offset: -1,
            highwaterMark: 10435,
            consumerLag: null } ]
         */
    });
});

Note that the group consumer has to commit offsets first in order for consumerLag to be available; otherwise the offset will be reported as -1.

Compression

no-kafka supports both SNAPPY and Gzip compression. To use SNAPPY you must install the snappy NPM module in your project.

Enable compression in Producer:

var Kafka = require('no-kafka');

var producer = new Kafka.Producer({
    clientId: 'producer',
    codec: Kafka.COMPRESSION_SNAPPY // Kafka.COMPRESSION_NONE, Kafka.COMPRESSION_SNAPPY, Kafka.COMPRESSION_GZIP
});

Alternatively, just send some messages with a specified compression codec (this overrides the codec set in the constructor):

return producer.send({
    topic: 'kafka-test-topic',
    partition: 0,
    message: { value: 'p00' }
}, { codec: Kafka.COMPRESSION_SNAPPY })

By default no-kafka will use asynchronous compression and decompression. Disable async compression/decompression (and use sync) with the asyncCompression option (synchronous Gzip is not available in node < 0.11):

Producer:

var producer = new Kafka.Producer({
    clientId: 'producer',
    asyncCompression: false, // use sync compression/decompression
    codec: Kafka.COMPRESSION_SNAPPY
});

Consumer:

var consumer = new Kafka.SimpleConsumer({
    idleTimeout: 100,
    clientId: 'simple-consumer',
    asyncCompression: true
});

Connection

Initial Brokers

no-kafka connects to the hosts specified in the connectionString constructor option. If it is omitted, the KAFKA_URL environment variable is used, falling back to the default kafka://127.0.0.1:9092. For better availability always specify several initial brokers: 10.0.1.1:9092,10.0.1.2:9092,10.0.1.3:9092. The kafka:// prefix is optional.
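
For example, a minimal sketch using such a broker list (the addresses are placeholders):

var Kafka = require('no-kafka');

// connectionString is optional; when omitted, KAFKA_URL or the default kafka://127.0.0.1:9092 is used
var producer = new Kafka.Producer({
    connectionString: 'kafka://10.0.1.1:9092,10.0.1.2:9092,10.0.1.3:9092'
});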

Disconnect / Timeout Handling

All network errors are handled by the library: the producer will retry sending failed messages the configured number of times, and the simple consumer and group consumer will try to reconnect to the failed host, update metadata as needed, and so on.

SSL

To connect to a Kafka SSL endpoint, specify SSL certificate and key options either to load the cert/key from files or to provide the certificate/key directly as strings:

Loading certificate and key from file:

var producer = new Kafka.Producer({
  connectionString: 'kafka://127.0.0.1:9093', // should match `listeners` SSL option in Kafka config
  ssl: {
    cert: '/path/to/client.crt',
    key: '/path/to/client.key'
  }
});

Specifying certificate and key directly as strings:

var producer = new Kafka.Producer({
  connectionString: 'kafka://127.0.0.1:9093', // should match `listeners` SSL option in Kafka config
  ssl: {
    cert: '-----BEGIN CERTIFICATE-----\nMIIChTCCAe4C...............',
    key: '-----BEGIN RSA PRIVATE KEY-----\nMIIEowIBA.......'
  }
});

Other Node.js SSL options are available such as rejectUnauthorized, secureProtocol, ciphers, etc. See Node.js tls.createServer method documentation for more details.
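
For illustration, a hedged sketch passing some of these TLS options alongside the certificate and key (the values are placeholders, not recommendations):

var producer = new Kafka.Producer({
  connectionString: 'kafka://127.0.0.1:9093',
  ssl: {
    cert: '/path/to/client.crt',
    key: '/path/to/client.key',
    rejectUnauthorized: true,        // verify the broker certificate chain
    secureProtocol: 'TLSv1_2_method' // illustrative Node.js TLS protocol value
  }
});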

It is also possible to use KAFKA_CLIENT_CERT and KAFKA_CLIENT_CERT_KEY environment variables to specify SSL certificate and key:

KAFKA_URL=kafka://127.0.0.1:9093 KAFKA_CLIENT_CERT=./test/ssl/client.crt KAFKA_CLIENT_CERT_KEY=./test/ssl/client.key node producer.js

Or as text strings:

KAFKA_URL=kafka://127.0.0.1:9093 KAFKA_CLIENT_CERT=`cat ./test/ssl/client.crt` KAFKA_CLIENT_CERT_KEY=`cat ./test/ssl/client.key` node producer.js

Using a self signed certificate:

Kafka.Producer({
  connectionString: 'kafka://127.0.0.1:9093', // should match `listeners` SSL option in Kafka config
  ssl: {
    ca: '/path/to/my-cert.crt' // or fs.readFileSync('my-cert.crt')
  }
});

It is also possible to use KAFKA_CLIENT_CA environment variable to specify a self signed SSL certificate:

KAFKA_URL=kafka://127.0.0.1:9093 KAFKA_CLIENT_CA=./test/ssl/my-cert.crt node producer.js

Remapping Broker Addresses

Sometimes the advertised listener addresses for a Kafka cluster may not be correct from the client's point of view, such as when a Kafka farm is behind NAT or other network infrastructure. In this scenario it is possible to pass a brokerRedirection option to the Producer, SimpleConsumer or GroupConsumer.

The value of brokerRedirection can be either:

  • A function returning an object with host (string) and port (integer) properties, such as:

    brokerRedirection: function (host, port) {
        return {
            host: host + '.somesuffix.com', // Fully qualify
            port: port + 100,               // Port NAT
        }
    }
    
  • A simple map of connection strings to new connection strings, such as:

    brokerRedirection: {
        'some-host:9092': 'actual-host:9092',
        'kafka://another-host:9092': 'another-host:9093',
        'third-host:9092': 'kafka://third-host:9000'
    }
    

A common scenario for this kind of remapping is when a Kafka cluster exists within a Docker application, and the internally advertised names needed for container to container communication do not correspond to the actual external ports or addresses when connecting externally via other tools.
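
For example, a minimal sketch wiring the map form into a consumer constructor (host names are placeholders):

var consumer = new Kafka.SimpleConsumer({
    connectionString: 'kafka://some-host:9092',  // the internally advertised address
    brokerRedirection: {
        'some-host:9092': 'actual-host:9092'     // what the client should actually connect to
    }
});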

Reconnection delay

In case of a network error that prevents further operations, no-kafka will try to reconnect to the Kafka brokers in an endless loop with an optionally progressive delay, which can be configured with the reconnectionDelay option.
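
For example, a minimal sketch of a progressive reconnection delay (values are illustrative; both min and max default to 1000ms):

var producer = new Kafka.Producer({
    reconnectionDelay: {
        min: 1000, // first retry after 1s; also used as the increment
        max: 5000  // later retries are capped at 5s
    }
});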

Logging

You can differentiate messages from several producer/consumer instances by providing a unique clientId in the options:

var consumer1 = new Kafka.GroupConsumer({
    clientId: 'group-consumer-1'
});
var consumer2 = new Kafka.GroupConsumer({
    clientId: 'group-consumer-2'
});

=>

2016-01-12T07:41:57.884Z INFO group-consumer-1 ....
2016-01-12T07:41:57.884Z INFO group-consumer-2 ....

Change the logging level:

var consumer = new Kafka.GroupConsumer({
    clientId: 'group-consumer',
    logger: {
        logLevel: 1 // 0 - nothing, 1 - just errors, 2 - +warnings, 3 - +info, 4 - +debug, 5 - +trace
    }
});

Send log messages to Logstash server(s) via UDP:

var consumer = new Kafka.GroupConsumer({
    clientId: 'group-consumer',
    logger: {
        logstash: {
            enabled: true,
            connectionString: '10.0.1.1:9999,10.0.1.2:9999',
            app: 'myApp-kafka-consumer'
        }
    }
});

You can overwrite the function that outputs messages to stdout/stderr:

var consumer = new Kafka.GroupConsumer({
    clientId: 'group-consumer',
    logger: {
        logFunction: console.log
    }
});

Topic Creation

There is no Kafka API call to create a topic. Kafka supports auto-creation of topics when their metadata is first requested (the auto.create.topics.enable broker option), but the topic is created with all default parameters, which is useless. There is no way to be notified when the topic has been created, so the library would need to poll the server at some interval, and there is also no way to be notified of any error for this operation. For these reasons, having no guarantees, no-kafka won't provide a topic creation method until there is a specific Kafka API call to create/manage topics.

License: MIT


kafka's Issues

Implement batching of produce requests

This issue is to collect ideas and details of further implementation.

Basically, what I propose is using two (configurable) characteristics which would control batching:

  1. batchSize Number. Max size of the batch buffer. Once the current buffer hits this size, we send the batch to the broker. Default: 16384 (16kb). In an environment where the amount of produce requests is unpredictable this value should be lowered.
  2. batchLinger Number. Amount of time in ms to wait before sending the batch to the broker. If batchSize is not hit and the timeout expires, the batch is sent. Default: 0, meaning the timeout is disabled.

Considerations:

  • Maybe we want to disable batching by default? Currently I'm sticking to Stream-like behavior (the highWaterMark option). As pointed out above, there are environments where one can't predict the amount of produce requests, but in such cases batchLinger can be set to 100.
  • Move options to batch object?

Batch commiting

I see you updated the docs to suggest not using Promise.map on a messageSet in the Group Consumer handler (1a63120), but the commitOffset method signature still allows for an array of commits.

/**
 * Commit (save) processed offsets to Kafka
 *
 * @param  {Object|Array} commits [{topic, partition, offset, metadata}]
 * @return {Promise}
 */
GroupConsumer.prototype.commitOffset = function (commits) {
    var self = this;
    return self.client.offsetCommitRequestV2(self.options.groupId, self.memberId, self.generationId,
        self._prepareOffsetRequest('commit', commits));
};

Is it not safe, or otherwise undesirable, to batch commit the entire messageSet in the handler function for the same reason you suggest not doing concurrent commits of individual messages?

Queue Shift on .end() causes null dereference

There's a small glitch when closing a connection shortly after it opens where Connection._receive ends up calling shift().resolve() on an undefined queue entry.

The error stack looks like:

stream.js:74
throw er; // Unhandled stream error in pipe.
^
TypeError: Cannot read property 'resolve' of undefined
at Connection._receive (d:\Github\kafka-poc\node_modules\no-kafka\lib\connection.js:167:11)
at emitOne (events.js:77:13)
at Socket.emit (events.js:169:7)
at readableAddChunk (_stream_readable.js:146:16)
at Socket.Readable.push (_stream_readable.js:110:10)
at TCP.onread (net.js:523:20)

Use of 'strategy' on GroupConsumer

I'm working on porting my application from kafka-node to no-kafka. So far everything has been much easier to use. Thanks!

I'm a little uncertain what the strategy property of the GroupConsumer init is for. I've looked through the tests and code a little bit, and it looks like it is just used as a key when storing multiple consumer strategies and handlers, but all of the tests use 'TestStrategy'. It would be a nice addition to the docs to explain what this is for.

MIT License

Shall we clearly state in LICENSE & README that this module is licensed under MIT? The only reference I find now is in package.json.

Collaboration

Hello @oleksiyk!

I'm moving the discussion from SOHU-Co/kafka-node#309 here. I've come to think that the author of kafka-node has no time or wish to re-organize his project for the good of everyone. I also see a kind of regression in the code and quality of the module after 0.3.0 was released. Your module seems very promising, so we'd like to help you with a roadmap, splitting tasks, test coverage, review, etc. Are you interested? 😉

/cc @ismriv @paddy3883

Too many times of `Rejoining group` lead to `Maximum call stack size exceeded`

At first it confused me: whenever I opened my laptop, the error below was always in the console

RangeError: Maximum call stack size exceeded

But just now, when I rejoined the wifi network, the error appeared again.
After testing a couple of times, I am sure that rejoining causes the error.

Here is some console output.

2016-03-04T08:53:59.064Z INFO group-name-56d94c8a737bb0fb808ade52 Joined group group-name generationId 3 as group-name-56d94c8a737bb0fb808ade52-2fc5cefc-c9b4-41d6-91e0-438075a26395
2016-03-04T08:53:59.382Z ERROR group-name-56d94c8a737bb0fb808ade55 Sending heartbeat failed:  KafkaError: UnknownMemberId: Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.
{ [KafkaError: Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.]
  name: 'KafkaError',
  code: 'UnknownMemberId',
  message: 'Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.' }
    at Object.exports.byCode (/path/to/program/node_modules/no-kafka/lib/errors.js:84:12)
    at _Reader.Protocol.define.read (/path/to/program/node_modules/no-kafka/lib/protocol/common.js:87:23)
    at _Reader._read (/path/to/program/node_modules/bin-protocol/lib/reader.js:41:18)
    at apply (/path/to/program/node_modules/bin-protocol/node_modules/lodash/lodash.js:419:27)
    at _Reader.wrapper [as ErrorCode] (/path/to/program/node_modules/bin-protocol/node_modules/lodash/lodash.js:4499:16)
    at _Reader.Protocol.define.read (/path/to/program/node_modules/no-kafka/lib/protocol/group_membership.js:275:14)
    at _Reader._read (/path/to/program/node_modules/bin-protocol/lib/reader.js:41:18)
    at apply (/path/to/program/node_modules/bin-protocol/node_modules/lodash/lodash.js:418:27)
    at _Reader.wrapper [as HeartbeatResponse] (/path/to/program/node_modules/bin-protocol/node_modules/lodash/lodash.js:4499:16)
    at /path/to/program/node_modules/no-kafka/lib/client.js:541:61
From previous event:
    at /path/to/program/node_modules/no-kafka/lib/client.js:540:40
    at processImmediate [as _immediateCallback] (timers.js:383:17)
From previous event:
    at Client.heartbeatRequest (/path/to/program/node_modules/no-kafka/lib/client.js:531:48)
    at GroupConsumer._heartbeat (/path/to/program/node_modules/no-kafka/lib/group_consumer.js:201:24)
    at /path/to/program/node_modules/no-kafka/lib/group_consumer.js:209:25
    at Timer.listOnTimeout (timers.js:92:15)
[at apply (/path/to/program/node_modules/no-kafka/node_modules/lodash/lodash.js:419:27)]
2016-03-04T08:54:00.060Z INFO group-name-56d94c8a737bb0fb808ade52 Rejoining group on RebalanceInProgress
/path/to/program/node_modules/bluebird/js/release/promise.js:533
Promise.prototype._settlePromise = function(promise, handler, receiver, value) {
                                           ^

RangeError: Maximum call stack size exceeded
    at Promise._settlePromise (/path/to/program/node_modules/bluebird/js/release/promise.js:533:44)
    at Promise._settlePromise0 (/path/to/program/node_modules/bluebird/js/release/promise.js:605:10)
    at Promise._settlePromises (/path/to/program/node_modules/bluebird/js/release/promise.js:684:18)
    at Promise._fulfill (/path/to/program/node_modules/bluebird/js/release/promise.js:629:18)
    at Promise._resolveCallback (/path/to/program/node_modules/bluebird/js/release/promise.js:424:57)
    at Promise._settlePromiseFromHandler (/path/to/program/node_modules/bluebird/js/release/promise.js:515:17)

I don't know whether it is related to the newest changelog of [email protected], which says something about stack overflow.

Auto create topic

Is there any plan or possibility that no-kafka could support auto.create.topic like in the old days?

producer.send function options

Thank you for the awesome module supporting so many neat features!

I'm struggling with its API though. It's not clear from the README what the producer.send() function options are.

  1. The first argument is a message, although the message can itself have a message property, so it's unclear what "message" refers to: the first argument or the property of the first argument.
  2. Sometimes it says that it's not message but messages. I assume, but am not sure, that the first argument can be an array.
  3. And most important: the possible properties of the second options argument are not listed anywhere. All I see is the Producer options. Could you update the README with the .send function options please?

Thank you so much

group consumers getting the same messages

I was trying this out - running the group consumer example in the README.md, so the default round-robin strategy. Running the consumer in two different processes - the console messages seem reasonable, joining and electing leaders. But messages pushed through tend to show up in both consumers - is this expected?

Low producer request timeout default?

I'm wondering why the default producer timeout is set to 100ms.

This timeout is passed to the client and then on to the ProduceRequest call. According to the Kafka Protocol guide and the documentation page:

The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include the network latency of the request.

Returning an error to the client before all brokers ACK may lead developers to manually retry the write and cause data duplication.

The default Kafka setting is 30 seconds. Why set it so low in this driver?

Producer retry logic does not preserve order (during a leader failure/election)

Steps to reproduce

  1. start 2 kafka brokers (ports 9092, 9093)

  2. create mytopic with replication-factor of 2

     ./bin/kafka-topics.sh --zookeeper localhost --topic mytopic --create --partitions 1 --replication-factor 2
  3. start a consumer in a loop

     var consumer = new Kafka.SimpleConsumer({connectionString: 'localhost:9092, localhost:9093'})
     consumer.init().then(function () {
       consumer.subscribe('mytopic', [], function (messageSet, topic, partition) {
         messageSet.forEach(function (m) {
           console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
         });
       });
     });
  4. configure no-kafka producer with max attempts and delay long enough to survive rebalancing

     var producer = new NoKafka.Producer({
       connectionString: 'localhost:9092, localhost:9093',
       clientId: 'producer',
       timeout: 100,
       retries: {
           attempts: 10,
           delay: 1000
       }
     });
  5. run no-kafka producer in loop

     var counter = 0;
     setInterval(function() {
       producer.send({
           topic: 'mytopic',
           message: { value: 'message #' + counter },
           partition: 0
       })
       .catch(function(err) { console.log('promise rejection', err); })
       .then(function(err) { console.log('promise fulfillment', err.error); });
     }, 1000)
  6. kill -9 whichever broker is the leader for mytopic:0

  7. Observe the messages being sent in reverse order when the new leader is elected


Topic Creation/Management API's

Request for some APIs for topic introspection and management, such as:

  • Creation
  • Partition management
  • Compaction
  • Viewing state of broker farm.

Or do you feel that no-kafka is more a consumption/production solution? Interested in what the roadmap might be.

Error Handling / Reconnect

Thanks for the awesome library!

Question applies for both Publisher and Subscriber. What would be the best way to:

  1. Handle Errors (connection drop, network failure)
  2. Reconnect on Error (or check connection every xxx period)

.end() causes exceptions to be raised to console during shutdown

When calling .end(), there's lots of error logging that ends up in the console, such as:

2016-01-07T23:27:11.935Z ERROR Error: Kafka server has closed connection
[Error: Kafka server has closed connection]
at d:\Github\kafka-poc\node_modules\no-kafka\lib\connection.js:81:30
at Array.forEach (native)
at Connection._disconnect (d:\Github\kafka-poc\node_modules\no-kafka\lib\connection.js:80:16)
... blah blah

I was thinking perhaps for the .end() method we might pass a 'skipRejections' flag to _disconnect and make it not bother calling the explicit rejections for promises. Ordinarily on a disconnect we'd want to maintain state, but it seems redundant if we are doing a shutdown since there is nothing we should want users to care about on the instance after an .end(). Something like:

// events call directly to make sure internal state is maintained
Connection.prototype._disconnect = function (err, skipRejections) {
    if (!this.connected) {
        return;
    }

    this.socket.end();
    this.connected = false;

    if (!skipRejections) {
      this.queue.forEach(function (t) {
          t.reject(err ? err : new Error('Kafka server has closed connection'));
      });
    }

    this.queue = [];
};

Then:

Connection.prototype.close = function () {
    // console.log('Closing connection', this.host + ':' + this.port);
    this.auto_connect = false;
    this._disconnect(null, true);
};

Producer memory leak when connection to Kafka is lost

If a producer suddenly loses connection to all brokers:

  1. There's a memory leak - the producer will continue to allocate a _try closure for each call to producer.send
  2. Meanwhile, the client is in a loop for updateMetadata (but doesn't throw any errors AKA "reject any promises")

Ideally (and maybe this is already possible), we could subscribe (or catch) to those failures after a circuit breaker pattern has been tripped and then stop calling producer.send

OR... the producer.send would reject a promise after the circuit breaker has been tripped (might be the simplest and most intuitive).

If we don't have a mechanism for subscribing to connection failures, our no-kafka processes are just going to hit the VM memory limit until the broker(s) is/are available again.

Manually close GroupConsumer

Is there any way to force a consumer to close connection?
Something like what kafka-node does.
The use case is to be able to capture SIGINT/SIGTERM and close the GroupConsumer before bailing.

Thanks!

Promise.map() without "return"

Version: 2.0.1
Use-case: using SimpleCustomer to receive events
Platform: Windows
Kafka: 0.9.0

Stack-trace:
Warning: a promise was created in a Promise.map handler but was not returned from it
at [...]\node_modules\no-kafka\lib\base_consumer.js:69:23
From previous event:
at d:\dev\attractions-reboot.git\attractions-service\node_modules\no-kafka\lib\base_consumer.js:59:28
at processImmediate as _immediateCallback

It seems like base_consumer.js:69 should be prefixed with a "return"

Stream interface?

Producer / Consumers could benefit from a stream interface to increase interop with any NodeJS stream.

Thoughts?

Accept Delegate for Logger Activity

In some cases it's desirable to have a custom/central logging component for an application that allows collection of messages from the external/third-party libraries (such as no-kafka). Suggestion here is to make the constructors of the no-kafka library accept an additional parameter on:

  • GroupConsumer
  • SimpleConsumer
  • Producer
    etc

This parameter takes the form of an object with .log/.warn/.debug methods - and then defaults back to nice-simple-logger when no value for this property is passed. This allows consuming code to pass in custom logging handlers, such as:

const csmr = new GroupConsumer({
    logger: {
          warn: function (msg) {
                  // lorem ipsum     
          }
    }
});

The object should be validated to ensure that all required operations are present as typeof === 'function' and then any missing members should be defaulted to passing through to NSL, allowing only specific types of events to be collected.

Handling long leader election and/or broker outages for the PRODUCER

Steps to reproduce

  1. start 2 kafka brokers (ports 9092, 9093)

  2. create mytopic with replication-factor of 2

     ./bin/kafka-topics.sh --zookeeper localhost --topic mytopic --create --partitions 1 --replication-factor 2
  3. configure no-kafka producer with max attempts of 1

     var producer = new NoKafka.Producer({
       connectionString: 'localhost:9092, localhost:9093',
       clientId: 'producer',
       timeout: 50,
       retries: {
           attempts: 1,
           delay: 50
       }
     });
  4. run no-kafka producer in loop

     setInterval(function() {
       producer.send({
           topic: 'mytopic',
           message: { value: 'value' },
           partition: 0
       })
       .catch(function(err) { console.log('promise rejection', err); })
       .then(function(err) { console.log('promise fulfillment', err.error); });
     }, 1000)
  5. kill -9 whichever broker is the leader for mytopic:0

  6. witness the producer loop get into a dead/inert failed state

Explanation

The simple consumer's fetch loop ensures that metadata is updated despite arbitrarily long broker outages.

The producer, however, gets into an inert state after a long leader election or broker outage (i.e. an outage lasting longer than the default of 3 attempts with a 1000ms delay), in which case the promise is fulfilled with a NoKafkaConnectionError (rather than rejected) and repeated calls to producer.send continue to fail (because the metadata is not updated).

If the original broker comes back online, the error, expectedly changes to: NotLeaderForPartition (but still as a promise fulfillment rather than promise rejection)

Do you have any recommendations on how best to handle this? I can think of a few:

  • solutions by changing the caller
    1. merely increase the attempts for the producer to something arbitrarily large
    2. update no-kafka to reject the promise allowing the caller to decide what to do (reconnect or fail permanently) (or... was it intentional to return the error via the promise fulfillment?) in either case, reconnect or fetch metadata on failure at caller's discretion
  • solutions by changing no-kafka
    1. update no-kafka to apply a throttled metadata update on ALL producer.send's
    2. update no-kafka to apply a throttled metadata update on ALL ERROR'D producer.send's (seemingly most sensible)
    3. update no-kafka to apply a circuit breaker like pattern on ALL ERROR'D producer.sends

Producer sometimes throws (rejects promise) `UnknownTopicOrPartition`

When a Kafka leader is kill -9'd, the producer has inconsistent behavior.

  • most of the time the producer.send() promise is fulfilled and no error is returned
  • some of the time the producer.send() promise is rejected with UnknownTopicOrPartition. It looks like it might be coming from here
    return self.updateMetadata().then(_try);
    (the metadata had just finished updating but was set to the now kill -9'd broker)

Assuming my observations are correct, I would further assume that Producer.prototype._send needs some kind of wrapper/retry around _prepareProduceRequest - but maybe this design was intentional.

No clean way to close client/consumer?

Hey,

When writing unit tests, there's no obviously documented way to close the consumers. This is fine for how the tests are written for the library itself, but if you're running them as part of a gulp job then the tests will never let the node process exit, because there's callbacks hanging around.

-Steve

Consumer commits offset regardless of handler failures

s.handler(p.messageSet, p.topic, p.partition, p.highwaterMarkOffset)
    .catch(function (err) {
        self.client.warn('Handler for', p.topic + ':' + p.partition, 'failed with', err);
    })
    .finally(function () {
        s.paused = false;
        s.offset = _.last(p.messageSet).offset + 1; // advance offset position
    });

It seems like the finally handler there is incrementing offsets regardless of handler success. This means that if the handler chooses not to commit offsets to kafka due to failures in the handler itself, the consumer will still continue to consume messages, leading to potential divergence in persisted offsets and processed offsets.
What is the recommended way of not advancing the offset on failure?

Moreover, if a batch of payloads fails some of the way through, a handler can commit the last successful offset. In this scenario, shouldn't the consumer continue from the offset after this last successful one until the handler commits a greater offset?

Reuse same Client instance

Would it be possible to implement the solution to reuse the Client instance between Consumer and Producers?

For example, right now I'm looking at the topicMetadata and need that before I can make one of my Consumers, so I thought it'd be nice to simply make a new Client, wait for it to init, and grab the metadata, and then reuse the same client for my new Consumer.

No initial hosts to connect

The connection string now needs to be kafka://host:port, otherwise it fails to parse the host and port:

lib/client.js:

self.initialBrokers = self.options.connectionString.split(',').map(function (hostStr) {
    var parsed = url.parse(hostStr);
    var config = {
        host: parsed.hostname,
        port: parsed.port
    };

    return config.host && config.port ? new Connection(config) : undefined;
});

Keyed Messages - Expects partition?

AFAIK, when sending keyed messages, they should automatically be distributed to a partition based on the key, so I guess we don't need to specify the partition explicitly?

However, when sending keyed messages via no-kafka it expects a partition, and I get the error Missing or invalid partition.

Is this expected?

Support for Keyed Messages in Producer

The current Producer doesn't support Keyed Messages. We use log compaction in a number of topics, and this would prevent us from fully moving to the new driver. Is this feature in the roadmap?

Great improvements in the last weeks @oleksiyk!

crash on producer creation

When creating a new producer without options like in the example, my API crashes with this error:

[...]/node_modules/no-kafka/lib/producer.js:28
    this.partitioner = options.partitioner;
                              ^

TypeError: Cannot read property 'partitioner' of undefined
    at new Producer ([...]/node_modules/no-kafka/lib/producer.js:28:31)
    at ...

GroupConsumer does not support starting group at offset 0

Hello,

I've noticed when adding new GroupConsumers to a topic for watching, they start watching from the current position of the topic and not the start position. This means only new data from the time the consumer is added is reported on, which is often not the requirement (i.e. Adding a new consumer to read a log from start to finish, and then watch the tail).

Is there a flag or option to make the initial offset be 0 for a new consumer?

-Steve

When connection disappears the `retries` option is not taken into account

When the connection disappears, or if there never was one in the first place, a NoKafkaConnectionError is thrown.
ERROR no-kafka-client Metadata request failed: NoKafkaConnectionError [127.0.0.1:9092]: Error: connect ECONNREFUSED

The retries.delay and retries.attempts are not used in that case. The default 1000 value is used instead.
https://github.com/oleksiyk/kafka/blob/master/lib/client.js#L155

I can submit a PR which uses the delay provided in the Producer options. Otherwise, please fix. Thanks!

logFunction implementation

I've tried a few things, and can't seem to find anywhere this is implemented with the simple logger.

Would it be possible to use something like winston, or bunyan, and simply just give it the logger object, rather than a logger function?

Simple producer throws error - Trying to access buffer beyond range

I am trying a simple Producer example with no-kafka version 2.4.1. The entire code is:

var Kafka = require('no-kafka');
var producer = new Kafka.Producer()

return producer.init().then(function(){
  return producer.send({
      topic: 'kafka-test-topic',
      partition: 0,
      message: {
          value: 'Hello!'
      }
  });
})
.then(function (result) {
  console.log('the kafka s post send result is ', result)
});

And it throws exception as,

Unhandled rejection RangeError: Trying to access beyond buffer length
    at checkOffset (buffer.js:582:11)
    at Buffer.readInt32BE (buffer.js:667:5)
    at _Reader.define.read (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bin-protocol/lib/index.js:101:47)
    at _Reader._read (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/lodash/lodash.js:427:27)
    at _Reader.wrapper (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/lodash/lodash.js:4675:16)
    at _Reader.Protocol.define.read (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/lib/protocol/produce.js:72:14)
    at _Reader._read (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/lodash/lodash.js:426:27)
    at _Reader.wrapper [as ProduceResponse] (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/lodash/lodash.js:4675:16)
    at /home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/lib/client.js:294:67
    at tryCatcher (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/util.js:16:23)
    at Promise._settlePromiseFromHandler (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/promise.js:503:31)
    at Promise._settlePromise (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/promise.js:560:18)
    at Promise._settlePromise0 (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/promise.js:605:10)
    at Promise._settlePromises (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/promise.js:684:18)
    at Async._drainQueue (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/async.js:126:16)
    at Async._drainQueues (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/async.js:136:10)
    at Async.drainQueues (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/async.js:16:14)
    at process._tickCallback (node.js:419:13)

Am I missing something here?

Possible race when connecting with Kafka/ZK (while Kafka/ZK are booting)

If I have a docker-compose configuration that has 3 containers I can reproduce a connection issue where nokafkaclient can't subscribe to anything:

services:
  zookeeper:
    image: "zookeeper:latest"
    ports:
      - "2181:2181"
  kafka:
    image: "kafka:latest"
    depends_on:
      - "zookeeper"
    ports:
      - "9092:9092"
  nokafkaclient:
    depends_on:
      - "zookeeper"
      - "kafka"

(the "depends_on" feature of docker-compose doesn't introduce delays or "waits", all it does is control order)

If docker-compose up nokafkaclient is run, the simple consumer's init() promise resolves successfully, yet the subscribe() promise returns an error:

This request is for a topic or partition that does not exist on this broker.

  • despite the existence of that topic
  • and even with the logLevel at 5, there's no debug output at all
  • no amount of time passing auto resolves the issue

restarting the nokafkaclient will resolve it.

If using the simple consumer, should the metadata be manually refreshed at certain times, events or errors?

Group consumer question

Hi,

I'm playing around with this lib but I'm not sure how the group consumers are supposed to work. Is it just grouping a few partitions based on the selected strategy?

How can I round robin one producer over multiple consumers? Must I increment the partition after every message on the producer side?

Thank you!

Regards,
Riaan

Updating offset because of OffsetOutOfRange after every handler call

Hi Oleksiyk

I've been using no-kafka for couple of months and by far this is the best npm package for kafka.
I've recently switched to 2.2.1 and noticed one small issue with base_consumer (used as SimpleConsumer).

Each time dataHandler is invoked I receive a warning "Updating offset because of OffsetOutOfRange"
It's probably nothing, but I checked that since 2.2.1 I don't have to add +1 to the offset when subscribing to a kafka partition.
I noticed that +1 is still used in finally() handler in base_consumer::_fetch.

If this is not an issue then sorry for trouble.

Thanks for the great work
Tomasz

Broker throws SchemaException: Error reading field 'topics'

Hi @oleksiyk, I've tried to run a producer + group consumer, but I'm running into some problems with the consumer. Using the same setup, the simple consumer works and I'm able to receive messages.

The error in question is:

[2016-02-03 23:04:34,155] ERROR Closing socket for /172.17.42.1 because of error (kafka.network.Processor)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
    at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
    at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
    at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
    at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
    at kafka.network.Processor.read(SocketServer.scala:450)
    at kafka.network.Processor.run(SocketServer.scala:340)
    at java.lang.Thread.run(Thread.java:745)

What I've done to setup the environment is:

$ docker run -d --name zookeeper jplock/zookeeper:3.4.6
$ docker run -d --name kafka --link zookeeper:zookeeper ches/kafka

$ ZK_IP=$(docker inspect --format '{{ .NetworkSettings.IPAddress }}' zookeeper)
$ KAFKA_IP=$(docker inspect --format '{{ .NetworkSettings.IPAddress }}' kafka)

$ docker run --rm ches/kafka kafka-topics.sh --create --topic test --replication-factor 1 --partitions 1 --zookeeper $ZK_IP:2181
Created topic "test".

My producer is:

var Kafka = require('no-kafka');
var producer = new Kafka.Producer({
  connectionString: process.env.KAFKA_IP + ':9092'
});

return producer.init().then(function(){
  return producer.send({
      topic: 'test',
      partition: 0,
      message: {
          value: 'Hello!'
      }
  });
}).then(console.log).then(process.exit);

And my group consumer:

var Kafka = require('no-kafka');
var consumer = new Kafka.GroupConsumer({
  connectionString: process.env.KAFKA_IP + ':9092'
});
var strategies = {
    strategy: 'TestStrategy',
    subscriptions: ['test']
};

consumer.on('data', function (messageSet, topic, partition) {
    messageSet.forEach(function (m) {
        console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
        // process each message and commit its offset
        consumer.commitOffset({topic: topic, partition: partition, offset: m.offset, metadata: 'optional'});
    });
});

return consumer.init(strategies).then(function() {
  // all done, now wait for messages in event listener
});

Running the consumer throws the following error: NoKafkaConnectionError (172.17.0.45:9092): Kafka server [172.17.0.45:9092] has closed connection, which is due to the above error taken from the Kafka error log file.

Any help or ideas would be appreciated!
