node-sinek

kafka client (consumer + producer), polite out of the box

make it about them, not about you

  • Simon Sinek

info

  • promise-based API
  • the core builds on the kafka-node module (check it out for options & tweaking)
  • uses ConsumerGroup(s), which means your Kafka needs to be > 0.9.x (ideally 0.10.2+)

offers

  • provides incoming message flow control for consumers
  • provides a drain-once mechanism for consumers
  • provides an easy API for producers
  • documentation is still WIP; check out /test/int/Sinek.test.js

install

npm install --save sinek

test

//requires a local kafka broker + zookeeper @ localhost:2181
npm test
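
If you do not have a broker running locally, Docker is a quick way to get one. As a sketch (the spotify/kafka image is an assumption here; any image bundling Kafka + ZooKeeper works), this exposes both services on the expected ports:

docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 spotify/kafka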

Usage

const {Kafka, Drainer, Publisher, PartitionDrainer} = require("sinek");

producer (Publisher)

const kafkaClient = new Kafka(ZK_CON_STR, LOGGER);
kafkaClient.becomeProducer([TEST_TOPIC], CLIENT_NAME, OPTIONS);

kafkaClient.on("ready", () => {
    producer = new Publisher(kafkaClient, PARTITION_COUNT); //partition count should match the default partition count of your broker's topics, e.g. 30
    
    producer.send(topic, messages, partitionKey, partition, compressionType)
    producer.batch(topic, [])
    
    producer.appendBuffer(topic, identifier, object, compressionType)
    producer.flushBuffer(topic)
    
    //easy api that uses a KeyedPartitioner type and identifies the
    //target partition for the object's identifier by itself;
    //it also brings your payload (object) into perfect shape for
    //a nicely consumable topic
    //call producer.flushBuffer(topic) to batch-send the payloads
    producer.bufferPublishMessage(topic, identifier, object, version, compressionType)
    producer.bufferUnpublishMessage(topic, identifier, object, version, compressionType)
    producer.bufferUpdateMessage(topic, identifier, object, version, compressionType)
});

kafkaClient.on("error", err => console.log("producer error: " + err));

consumer (Drainer)

const kafkaClient = new Kafka(ZK_CON_STR, LOGGER);
kafkaClient.becomeConsumer([TEST_TOPIC], GROUP_ID, OPTIONS);

kafkaClient.on("ready", () => {
    consumer = new Drainer(kafkaClient, 1); //1 = thread/worker/parallel count
    
    consumer.drain((message, done) => {
        console.log(message);
        done();
    });
    
    consumer.stopDrain();
    
    consumer.drainOnce((message, done) => {
        console.log(message);
        done();
    }, DRAIN_THRESHOLD, DRAIN_TIMEOUT).then(r => {
        console.log("drain done: " + r);
    }).catch(e => {
        console.log("drain timeout: " + e);
    });
    
    consumer.resetConsumer([TEST_TOPIC]).then(_ => {});
});

kafkaClient.on("error", err => console.log("consumer error: " + err));

consumer (PartitionDrainer) [faster: runs a queue per topic partition]

const kafkaClient = new Kafka(ZK_CON_STR, LOGGER);
kafkaClient.becomeConsumer([TEST_TOPIC], GROUP_ID, OPTIONS);

kafkaClient.on("ready", () => {
    consumer = new PartitionDrainer(kafkaClient, 1); //1 = thread/worker/parallel count per partition
    
    //drain requires a topic-name and returns a promise 
    consumer.drain(TEST_TOPIC, (message, done) => {
        console.log(message);
        done();
    }).then(() => { /* drain is running */ }).catch(e => console.log(e));
    
    consumer.stopDrain();
    
    //drainOnce requires a topic-name
    consumer.drainOnce(TEST_TOPIC, (message, done) => {
        console.log(message);
        done();
    }, DRAIN_THRESHOLD, DRAIN_TIMEOUT).then(r => {
        console.log("drain done: " + r);
    }).catch(e => {
        console.log("drain timeout: " + e);
    });
    
    consumer.resetConsumer([TEST_TOPIC]).then(_ => {});
});

kafkaClient.on("error", err => console.log("consumer error: " + err));

hints

  • interesting options for tweaking consumers
const OPTIONS = {
    sessionTimeout: 12500,
    protocol: ["roundrobin"],
    fromOffset: "latest", //earliest
    fetchMaxBytes: 1024 * 100,
    fetchMinBytes: 1,
    fetchMaxWaitMs: 100,
    autoCommit: true,
    autoCommitIntervalMs: 5000
};
  • the remove- and create-topic APIs require a specific broker configuration (e.g. delete.topic.enable=true); without it, these calls will simply do nothing
drainer.removeTopics([]).then(..)
publisher.createTopics([]).then(..)
  • using the .getStats() functions on Drainer, Publisher or PartitionDrainer you can get some valuable insights into what's currently going on in your client

  • when using "Drainer" to consume and write upserts into a database that require ACID functionality and a build-up of models/message-payloads you must set the AsyncLimit of new Drainer(.., 1) to "1" or you will have trouble with data integrity

  • if your data is spread entity-wise across partitions, you can use the "PartitionDrainer" to drain multiple partitions at the same time

  • the "Publisher" offers a simple API to create such (keyed) partitioned topics

  • it is probably a good idea to spawn a Consumer per Topic
