confluentinc / confluent-kafka-javascript

Confluent's Apache Kafka JavaScript client

Home Page: https://www.npmjs.com/package/@confluentinc/kafka-javascript

License: MIT License

Makefile 0.29% JavaScript 70.49% Shell 0.38% Python 0.55% C++ 28.27% PowerShell 0.01%
confluent consumer javascript kafka kafka-client librdkafka producer

confluent-kafka-javascript's Introduction

Confluent's JavaScript Client for Apache Kafka™

confluent-kafka-javascript is Confluent's JavaScript client for Apache Kafka and the Confluent Platform. This is an early-access library. The goal is to provide a highly performant, reliable, and easy-to-use JavaScript client that is based on node-rdkafka yet also API-compatible with KafkaJS, providing flexibility to users and streamlining migrations from other clients.

This library leverages the work and concepts from two popular Apache Kafka JavaScript clients: node-rdkafka and KafkaJS. The core is heavily based on the node-rdkafka library, which uses our own librdkafka library for core client functionality. On top of that, we provide a promisified API and a more idiomatic interface, similar to the one in KafkaJS, making it easy for developers to migrate and adopt this client depending on the patterns and interface they prefer. This library currently uses librdkafka built from the master branch.

This library is currently in early access and is not meant for production use.

This library is in active development, pre-1.0.0, and it is likely to have many breaking changes.

For this early-access release, we aim to get feedback from JavaScript developers within the Apache Kafka community to help meet your needs. Some areas of feedback we are looking for include:

  • Usability of the API compared to other clients
  • Migration experience from node-rdkafka and KafkaJS
  • Overall quality and reliability

We invite you to raise issues to highlight any feedback you may have.

During early access, only basic produce and consume functionality, along with the ability to create and delete topics, is supported. All other admin client functionality is coming in future releases. See INTRODUCTION.md for more details on what is supported.
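
For example, topic creation and deletion can be exercised through the KafkaJS-style admin surface. The sketch below is a hedged illustration: the admin() factory and the createTopics/deleteTopics option shapes are assumed to mirror KafkaJS, and the broker and topic names are placeholders.

import { KafkaJS } from "@confluentinc/kafka-javascript";

async function adminExample() {
    const kafka = new KafkaJS.Kafka({ kafkaJS: { brokers: ["<fill>"] } });

    // Assumed KafkaJS-style admin client: create a topic, then delete it again.
    const admin = kafka.admin();
    await admin.connect();

    await admin.createTopics({ topics: [{ topic: "example-topic", numPartitions: 1 }] });
    await admin.deleteTopics({ topics: ["example-topic"] });

    await admin.disconnect();
}

adminExample();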

To use Schema Registry, use the existing kafkajs/confluent-schema-registry library, which is compatible with this client. For a simple Schema Registry example, see sr.js. DISCLAIMER: Although it is compatible with confluent-kafka-javascript, Confluent does not own or maintain kafkajs/confluent-schema-registry, and its use and functionality should be considered "as is".
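
A minimal Avro round trip with that library might look like the hedged sketch below. The registry URL, subject, topic, and schema are placeholders, and the SchemaRegistry/register/encode API shown is that of the @kafkajs/confluent-schema-registry package, not something provided by this client.

import { KafkaJS } from "@confluentinc/kafka-javascript";
import { SchemaRegistry, SchemaType } from "@kafkajs/confluent-schema-registry";

async function schemaRegistryExample() {
    // Placeholder registry URL; point this at your Schema Registry instance.
    const registry = new SchemaRegistry({ host: "http://localhost:8081" });
    const kafka = new KafkaJS.Kafka({ kafkaJS: { brokers: ["<fill>"] } });

    // Register a trivial Avro schema and remember its id.
    const { id } = await registry.register(
        {
            type: SchemaType.AVRO,
            schema: JSON.stringify({
                type: "record",
                name: "Greeting",
                namespace: "example",
                fields: [{ name: "message", type: "string" }],
            }),
        },
        { subject: "greetings-value" } // placeholder subject name
    );

    const producer = kafka.producer();
    await producer.connect();
    await producer.send({
        topic: "greetings", // placeholder topic
        // Consumers would symmetrically call registry.decode(message.value).
        messages: [{ value: await registry.encode(id, { message: "hello" }) }],
    });
    await producer.disconnect();
}

schemaRegistryExample();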

Requirements

The following configurations are supported for this early access preview:

  • Any supported version of Node.js (the two LTS versions, 18 and 20, and the latest versions, 21 and 22).
  • Linux (x64 and arm64) - both glibc and musl/alpine.
  • macOS - arm64/m1.
  • Windows - x64 (experimentally available in EA).

Installation on any of these platforms is meant to be seamless, without any C/C++ compilation required.

If your system configuration is not among the supported ones, a supported version of Python must be available on the system for the installation process. This is required by the node-gyp build tool.

$ npm install @confluentinc/kafka-javascript

Yarn and pnpm support is experimental.

Getting Started

Below is a simple produce example for users migrating from KafkaJS.

// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS;

async function producerStart() {
    const kafka = new Kafka({
        kafkaJS: {
            brokers: ['<fill>'],
            ssl: true,
            sasl: {
                mechanism: 'plain',
                username: '<fill>',
                password: '<fill>',
            },
        }
    });

    const producer = kafka.producer();

    await producer.connect();

    console.log("Connected successfully");

    const res = []
    for (let i = 0; i < 50; i++) {
        res.push(producer.send({
            topic: 'test-topic',
            messages: [
                { value: 'v222', partition: 0 },
                { value: 'v11', partition: 0, key: 'x' },
            ]
        }));
    }
    await Promise.all(res);

    await producer.disconnect();

    console.log("Disconnected successfully");
}

producerStart();

  1. If you're migrating from kafkajs, you can use the migration guide.
  2. If you're migrating from node-rdkafka, you can use the migration guide.
  3. If you're starting afresh, you can use the quickstart guide.

An in-depth reference may be found at INTRODUCTION.md.

Contributing

Bug reports and early-access feedback are appreciated in the form of GitHub issues. For guidelines on contributing, please see CONTRIBUTING.md.

confluent-kafka-javascript's People

Contributors

0x7f, alexander-alvarez, ankon, atamon, battlecow, cjlarose, codeburke, confluentjenkins, dchesterton, edoardocomar, emasab, fabianschmitthenner, garywilber, geoffreyhervet, iradul, jaaprood, jpdstan, macabu, mccaig, milindl, mimaison, nhaq-confluent, pchelolo, pthm, rusty0412, sam-github, sgenoud, tvainika, webmakersteve, yunnysunny


confluent-kafka-javascript's Issues

Confluent connections interfere with KafkaJS connections when SSL is involved

This is an odd one.

We're attempting to phase-in use of the Confluent library to our app for new functionality. This app has existing functionality using KafkaJS, and the existing functionality is in production. Given the "early access" nature of the Confluent library, it makes sense to adopt it for the new functionality, but retain KafkaJS for the existing functionality for now.

The new functionality was added and integration tests showed everything working, but as soon as we deployed to an environment using external Kafka cluster(s), the KafkaJS connections immediately started failing with:

Connection error: Client network socket disconnected before secure TLS connection was established {"broker":"XXXX.westus2.azure.confluent.cloud:9092","clientId":"kafkajs"}: Error: Client network socket disconnected before secure TLS connection was established

I narrowed it down to starting multiple consumers for both KafkaJS and Confluent concurrently... with SSL connections.

I created a test that replicates the scenario by creating consumers for 5 topics each with KafkaJS and Confluent concurrently on a Confluent Cloud cluster. There are also tests that run just 5 KafkaJS consumers or just 5 Confluent consumers to prove there is no issue when run independently.

Notes:

  • Topics must already exist.
  • This connects both Confluent and KafkaJS to the same cluster; but I see the same behavior when Confluent and KafkaJS are used to connect to different clusters.
  • 5 seems to be a magic number, in that I can reproduce the issue consistently with 5. If I run the test with fewer than 5, results are inconsistent (sometimes there is no issue). Our app has tens of consumers on both sides.
  • Used XXXX in place of identifying details/credentials.
  • Tests should be run with an extended timeout (I was using 60s).
import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";
import {Logger} from "@confluentinc/kafka-javascript/types/kafkajs.js";
import {fail} from "assert";
import {Consumer, ConsumerGroupJoinEvent, Kafka, logLevel} from "kafkajs";

const KAFKA_JS_TOPICS: string[] = [
  "test-kafkajs-topic",
  "test-kafkajs-topic-2",
  "test-kafkajs-topic-3",
  "test-kafkajs-topic-4",
  "test-kafkajs-topic-5"
];

const CONFLUENT_TOPICS: string[] = [
  "test-confluent-topic",
  "test-confluent-topic-2",
  "test-confluent-topic-3",
  "test-confluent-topic-4",
  "test-confluent-topic-5"
];

describe("Supports KafkaJS and Confluent consumers", async () => {
  let confluentConsumers: Confluent.Consumer[] = [];
  let kafkaJSConsumers: Consumer[] = [];

  afterEach(async () => {
    const promises: Promise<void>[] = [];
    for (const consumer of kafkaJSConsumers) {
      promises.push(consumer.disconnect());
    }
    for (const consumer of confluentConsumers) {
      promises.push(consumer.disconnect());
    }
    await Promise.all(promises);
    confluentConsumers = [];
    kafkaJSConsumers = [];
  });

  it("Handles concurrent startup of multiple KafkaJS consumers", async () => {
    await doTest(KAFKA_JS_TOPICS, []);
  });

  it("Handles concurrent startup of multiple Confluent consumers", async () => {
    await doTest([], CONFLUENT_TOPICS);
  });

  it("Handles concurrent startup of multiple KafkaJS and Confluent consumers", async () => {
    await doTest(KAFKA_JS_TOPICS, CONFLUENT_TOPICS);
  });

  async function doTest(kafkaJSTopics: string[], confluentTopics: string[]) {
    const kafkaJSKafka = new Kafka({
      brokers: ["XXXX.westus2.azure.confluent.cloud:9092"],
      ssl: true,
      sasl: {
        mechanism: "plain",
        username: "XXXX",
        password: "XXXX"
      },
      logLevel: logLevel.INFO,
      logCreator: kafkaLevel => {
        return entry => {
          const {timestamp, logger, message, stack, ...others} = entry.log;
          console.log(`[KafkaJS:${entry.namespace}] ${message} ${JSON.stringify(others)}${stack ? `: ${stack}` : ""}`);
        };
      }
    });

    const confluentKafka = new Confluent.Kafka({
      kafkaJS: {
        brokers: ["XXXX.westus2.azure.confluent.cloud:9092"],
        ssl: true,
        sasl: {
          mechanism: "plain",
          username: "XXXX",
          password: "XXXX"
        },
        logLevel: Confluent.logLevel.INFO,
        logger: new ConfluentLogger()
      }
    });

    kafkaJSConsumers = [];
    let kafkaJSConnected: number = 0;
    setImmediate(async () => {
      for (const topic of kafkaJSTopics) {
        const kafkaJSConsumer = kafkaJSKafka.consumer({groupId: `${topic}-group`});
        kafkaJSConsumer.on(kafkaJSConsumer.events.GROUP_JOIN, (event: ConsumerGroupJoinEvent) => {
          kafkaJSConnected++;
        });
        await kafkaJSConsumer.connect();
        await kafkaJSConsumer.subscribe({topic});
        await kafkaJSConsumer.run({
          eachMessage: async ({message}) => {}
        });
        kafkaJSConsumers.push(kafkaJSConsumer);
      }
    });

    confluentConsumers = [];
    let confluentConnected: number = 0;
    setImmediate(async () => {
      for (const topic of confluentTopics) {
        const confluentConsumer = confluentKafka.consumer({kafkaJS: {groupId: `${topic}-group`}});
        await confluentConsumer.connect();
        confluentConnected++;
        await confluentConsumer.subscribe({topic});
        await confluentConsumer.run({
          eachMessage: async ({message}) => {}
        });
        confluentConsumers.push(confluentConsumer);
      }
    });

    await until(async () => confluentTopics.length == confluentConnected);
    for (const consumer of confluentConsumers) {
      await until(async () => consumer.assignment().length > 0);
    }
    await until(async () => kafkaJSTopics.length == kafkaJSConnected);
  }
});

class ConfluentLogger implements Logger {
  private logLevel: Confluent.logLevel;

  constructor() {
    this.logLevel = Confluent.logLevel.INFO;
  }

  setLogLevel(logLevel: Confluent.logLevel) {
    this.logLevel = logLevel;
  }

  info = (message: string, extra?: object) => this.doLog(Confluent.logLevel.INFO, message, extra);
  error = (message: string, extra?: object) => this.doLog(Confluent.logLevel.ERROR, message, extra);
  warn = (message: string, extra?: object) => this.doLog(Confluent.logLevel.WARN, message, extra);
  debug = (message: string, extra?: object) => this.doLog(Confluent.logLevel.DEBUG, message, extra);

  namespace() {
    return this;
  }

  private doLog(level: Confluent.logLevel, message: string, extra?: object) {
    if (this.logLevel >= level) {
      console.log(`[ConfluentKafka] ${message}${extra ? ` ${JSON.stringify(extra)}` : ""}`);
    }
  }
}

async function until(condition: () => Promise<boolean>) {
  const timeout = 30000;
  const finish = Date.now() + timeout;
  while (Date.now() <= finish) {
    const result = await condition();
    if (result) return;
    await new Promise(resolve => setTimeout(resolve, 500));
  }
  fail(`Failed within ${timeout}ms`);
}

The test for both ultimately fails to connect all the consumers and on the KafkaJS side produces many occurrences of this error (which is not present when running KafkaJS only):

[KafkaJS:Connection] Connection error: Client network socket disconnected before secure TLS connection was established {"broker":"XXXX.westus2.azure.confluent.cloud:9092","clientId":"kafkajs"}: Error: Client network socket disconnected before secure TLS connection was established
    at connResetException (node:internal/errors:787:14)
    at TLSSocket.onConnectEnd (node:_tls_wrap:1727:19)
    at TLSSocket.emit (node:events:530:35)
    at endReadableNT (node:internal/streams/readable:1696:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)

It would be helpful to understand what is conflicting here and if it can be prevented on the Confluent side or if there is a way to work around it.

I have confirmed that if I start the KafkaJS consumers before the Confluent consumers, the KafkaJS connections succeed. This is not viable in a real-world scenario however, because if later on the connection is dropped and the consumer tries to reconnect it will encounter this same issue.

[Enhancement] Roadmap or more info on path to production ready

Hi there!

Looking into this library for potential use at my company - it would be great if there was some kind of roadmap, rough timeline, or even MVP scope for this project, so we can evaluate if we'll be able to use it in the near term, or use another library initially and migrate here longer term.

I understand giving estimates on projects like this is a bit of a nightmare, but even something like "maybe this year, next year H1, next year H2" would be a massive help!

Thanks 🚀

Request: TypeScript example of KafkaJS

I'm not understanding how to construct an instance of KafkaJS.Kafka since KafkaJS is exported as a type and not a namespace. Are there any examples of this?
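
For reference, the import style used in the issue reproductions elsewhere on this page treats KafkaJS as a runtime export and reaches the class as KafkaJS.Kafka. A minimal TypeScript sketch along those lines (the broker and group id are placeholders):

import { KafkaJS } from "@confluentinc/kafka-javascript";
// Equivalently, as used elsewhere on this page:
// import { KafkaJS as Confluent } from "@confluentinc/kafka-javascript";

// The instance type is left to inference, which sidesteps whether KafkaJS.Kafka is
// also usable as a type name in the current type definitions.
const kafka = new KafkaJS.Kafka({
  kafkaJS: {
    brokers: ["localhost:9092"],
  },
});

const consumer = kafka.consumer({ kafkaJS: { groupId: "example-group" } });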

Why is binding better than native implementation?

In our project, we plan to use Kafka for cross-service data exchange, but we are put off by the available choice of clients. Many clients look abandoned, including previously popular ones (for example, kafkajs or node-kafka). The only remaining options are node-rdkafka and confluent-kafka-javascript, both based on librdkafka. Unlike RabbitMQ or NATS, Kafka currently has no popular, actively maintained Node.js client that is free of third-party native dependencies. Using bindings adds complexity when running applications on different platforms, and it also requires studying the librdkafka documentation, which can be difficult for Node.js developers without experience in other programming languages.

Is there a chance that an official client based on NodeJS will be introduced in the future?

Unable to construct viable Docker image using `node:20-alpine`

Our build uses an ubuntu-latest Github runner to build a Docker image.

Our Dockerfile follows the example provided in this repo.

FROM node:20-alpine
COPY ./dist /app/
WORKDIR /app
RUN apk --no-cache add \
  bash \
  g++ \
  ca-certificates \
  lz4-dev \
  musl-dev \
  cyrus-sasl-dev \
  openssl-dev \
  make \
  python3 \
  gcompat # added to provide missing ld-linux-x86-64.so.2
RUN apk add --no-cache --virtual .build-deps gcc zlib-dev libc-dev bsd-compat-headers py-setuptools bash
RUN npm install --omit=dev

EXPOSE 4000
CMD [ "node", "app.js" ]

The deployed pod is hosted in AKS, and both the runners and host nodes are amd64 arch.

Without the @confluentinc/kafka-javascript dependency in the package.json, the application will start without issue on the container.

With the @confluentinc/kafka-javascript dependency in the package.json (and no reference from the application), the application will immediately fail with:

Segmentation fault (core dumped)

While troubleshooting, we discovered that if we reinstalled the package on the running container, the application would then startup normally.

Our initial thought was that the wrong flavor of librdkafka was being downloaded.

By adding the following to the Dockerfile, I was able to capture the node-pre-gyp output:

WORKDIR /app/node_modules/@confluentinc/kafka-javascript
RUN npx node-pre-gyp install --update-binary
WORKDIR /app
#12 0.816 node-pre-gyp info using node-pre-gyp@<version>
#12 0.816 node-pre-gyp info using node@<version> | linux | x64
#12 0.906 node-pre-gyp http GET https://github.com/confluentinc/confluent-kafka-javascript/releases/download/v0.1.15-devel/confluent-kafka-javascript-v0.1.15-devel-node-v115-linux-musl-x64.tar.gz

Again, launching this container results in the segmentation fault on startup.

Starting the container, and running the following:

cd node_modules/\@confluentinc/kafka-javascript/
npx node-pre-gyp install --update-binary
cd /app

...seemingly performs the same operation we saw during the Docker image construction:

node-pre-gyp info using node-pre-gyp@<version>
node-pre-gyp info using node@<version> | linux | x64
http GET https://github.com/confluentinc/confluent-kafka-javascript/releases/download/v0.1.15-devel/confluent-kafka-javascript-v0.1.15-devel-node-v115-linux-musl-x64.tar.gz

...yet after this operation is performed, the application starts without issue.

Please help us to understand what is going on here, and how we can solve this problem.

Segfault while closing the consumer while consume loop is running

Reproduction code:


const RdKafka = require('@confluentinc/kafka-javascript');

function runConsumer() {
    const consumer = new RdKafka.KafkaConsumer({
        'group.id': 'test-group' + Math.random(),
        'bootstrap.servers': 'localhost:9092',
    }, {
        'auto.offset.reset': 'earliest',
    });

    consumer.connect();

    consumer.on('ready', () => {
        console.log("Consumer is ready");
        consumer.subscribe(['test-topic']);
        consumer.consume(); // consume loop
    });

    consumer.on('data', (data) => {
        console.log("Received data");
        console.log(data);
        consumer.disconnect();
    });

    consumer.on('event.error', (err) => {
        console.error(err);
    });
}

runConsumer();

Cause: NodeKafka::Workers::KafkaConsumerConsumeLoop::HandleMessageCallback is called after KafkaConsumerConsumeLoop::Close; the callback has been cleared by that time, so callback->Call causes a segfault.

Headers (outbound and inbound) do not comply with KafkaJS type definitions

The header implementation for both producers and consumers does not comply with the type definitions offered up in kafkajs.d.ts (which are unmodified from the KafkaJS originals).

Below is a comparison between KafkaJS and Confluent.

KafkaJS

import {Kafka} from "kafkajs";

const topic = "test-kafkajs-topic";
let receivedCount = 0;

const kafka = new Kafka({brokers: ["localhost:9092"]});

const consumer = kafka.consumer({groupId:`${topic}-group`});
await consumer.connect();
await consumer.subscribe({topic});
await consumer.run({
  eachMessage: async ({message}) => {
    log.info(JSON.stringify(message.headers, null, 2));
    receivedCount++;
  }
});

const producer = kafka.producer();
await producer.connect();
await producer.send({
  topic,
  messages: [{value: "one", headers: {header1: "alpha", header2: "beta"}}]
});

await until(async () => receivedCount == 1);

await producer.disconnect();
await consumer.disconnect();
{
  "header1": {
    "type": "Buffer",
    "data": [
      97,
      108,
      112,
      104,
      97
    ]
  },
  "header2": {
    "type": "Buffer",
    "data": [
      98,
      101,
      116,
      97
    ]
  }
}

Confluent

import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";

const topic = "test-confluent-topic";
let receivedCount = 0;

const kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});

const consumer = kafka.consumer({kafkaJS: {groupId: `${topic}-group`}});
await consumer.connect();
await consumer.subscribe({topic});
await consumer.run({
  eachMessage: async ({message}) => {
    log.info(JSON.stringify(message.headers, null, 2));
    receivedCount++;
  }
});

await until(async () => consumer.assignment().length > 0);

const producer = kafka.producer({"linger.ms": 0});
await producer.connect();
await producer.send({
  topic,
  messages: [{value: "one", headers: {header1: "alpha", header2: "beta"}}]
});

await until(async () => receivedCount == 1);

await producer.disconnect();
await consumer.disconnect();
{
  "0": {
    "key": {
      "type": "Buffer",
      "data": [
        104,
        101,
        97,
        100,
        101,
        114,
        49
      ]
    }
  },
  "1": {
    "key": {
      "type": "Buffer",
      "data": [
        104,
        101,
        97,
        100,
        101,
        114,
        50
      ]
    }
  }
}

Two (maybe three) notable issues:

  1. The headers header1=alpha and header2=beta were sent to Kafka as key=header1 and key=header2
  2. When that message was received, the headers object does not match the IHeaders type definition:
export interface IHeaders {
  [key: string]: Buffer | string | (Buffer | string)[] | undefined
}
  3. If I had actually sent key=header1 and key=header2, KafkaJS compatibility would dictate a string key of "key" and a string[] value of ["header1","header2"]

Feature request: Support for DescribeClientQuotas/AlterClientQuotas

Currently, we are evaluating Kafka client libraries for JavaScript and were delighted to learn that with confluent-kafka-javascript an offering supported by Confluent is in the works.

For our use case, support for the Admin API functions

  • DescribeClientQuotas (API Key 48)
  • AlterClientQuotas (API Key 49)

would be required.

We realise that confluent-kafka-javascript depends on librdkafka to provide the missing functions. Can you share any insights into plans/timelines if and when librdkafka might be enhanced to support these API keys?

Cheers, Achim

Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE

Hello, I noticed that COOPERATIVE rebalance (incremental assign/unassign bindings) was implemented in confluent-kafka-javascript library, which is currently not supported by node-rdkafka. I made a small POC to see how it works in Node.js, but encountered a problem where the following error occurs during rebalance:

Consumer [1|poc_test]: rebalance.error Error: Local: Erroneous state
        at KafkaConsumer.assign (/Users/s.franchuk/github/confluent-kafka-javascript/lib/kafka-consumer.js:266:16)
        at KafkaConsumer.conf.rebalance_cb (/Users/s.franchuk/github/confluent-kafka-javascript/lib/kafka-consumer.js:65:16)
[2024-04-01T13:05:27.192Z]  WARN: poc-confluent-kafka/36080 on s-franchuk:
    Consumer [1|poc_test]: event.log {
      severity: 4,
      fac: 'ASSIGN',
      message: '[thrd:main]: Group "poc_test": application *assign() call failed: Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE'
    }

Debugging showed that the KafkaConsumer::IncrementalAssign method in kafka-consumer.cc is indeed called, and the consumer method consumer->incremental_assign(partitions) is also called with the correct arguments. What happens next in the C++ code of librdkafka is hard for me to say, but rko->rko_u.assign.method does not return RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN or RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN here:
https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_cgrp.c#L4842

Kafka consumer:

const { CODES, KafkaConsumer } = require('@confluentinc/kafka-javascript');

this._consumer = new KafkaConsumer({
      'bootstrap.servers': KAFKA_BROKERS,
      'client.id': clientId,
      'group.id': groupId,
      'auto.offset.reset': 'latest',
      'session.timeout.ms': 10000,
      'heartbeat.interval.ms': 100,
      'enable.auto.commit': false,
      'partition.assignment.strategy': 'cooperative-sticky',
      'fetch.wait.max.ms': 100,
      rebalance_cb: true,
      // debug: 'consumer,topic',
    });

Rebalance callback:

this._consumer.on('rebalance', (err, assignments) => {
      const partitions = assignments.map((assignment) => assignment.partition);
      const type = RebalanceEventType[err.code] || 'ERROR';

      logger.info(`${this._who()}: rebalance happened - ${type} | [${partitions.join(',')}]`);

      switch (err.code) {
        case CODES.ERRORS.ERR__ASSIGN_PARTITIONS:
          this._consumer.incrementalAssign(assignments);
          break;
        case CODES.ERRORS.ERR__REVOKE_PARTITIONS:
          this._consumer.incrementalUnassign(assignments);
          break;
        default:
          logger.error(`${this._who()}: rebalance error`, err);
      }
});

this._consumer.on('rebalance.error', (err) => {
      logger.error(`${this._who()}: rebalance.error`, err);
});

For testing, I used library version 0.1.11-devel from npm and a manual build of the project from the dev_early_access_development_branch branch.

What could be the reason for this problem?

Environment Information

  • OS: Mac
  • Node Version: 18.17.0
  • NPM Version: 9.6.7
  • confluent-kafka-javascript version: 0.1.11-devel / dev_early_access_development_branch
  • Docker Image: confluentinc/cp-server:7.2.1

Producer property warnings on consumer creation

const kafka = new Kafka({
  kafkaJS: {
    brokers: ["localhost:9092"]
  }
});
const consumer = kafka.consumer({
  kafkaJS: {groupId: GROUP_ID}
});
await consumer.connect();

Results in the following log entries, even though I have clearly not specified retry.backoff.ms or retry.backoff.max.ms.

{
  message: '[thrd:app]: Configuration property retry.backoff.ms is a producer property and will be ignored by this consumer instance',
  fac: 'CONFWARN',
  timestamp: 1715289786815
}
{
  message: '[thrd:app]: Configuration property retry.backoff.max.ms is a producer property and will be ignored by this consumer instance',
  fac: 'CONFWARN',
  timestamp: 1715289786816
}

Help with SSL mapping

Since SSL mapping is not provided as part of the KafkaJS config migration, I have a question on how to migrate a KafkaJS SSL configuration.

We have 3rd party SSL-based connections that are configured today in KafkaJS as such:

brokers:
  - kafka-dev1.some-domain.local:9096
ssl:
  ca: ${secret.kafka.ca}
  key: ${secret.kafka.key}
  cert: ${secret.kafka.cert}
  passphrase: ${secret.kafka.passphrase}
  checkServerIdentity: false  # disables hostname verification

...where ${secret.kafka.ca}, ${secret.kafka.key}, and ${secret.kafka.cert} are the string contents of .pem files (and the .pem files are not accessible at runtime).

I see a very wide range of ssl options in GlobalConfig, some prefixed with ssl_ and others with ssl.

I'm not sure how I should be mapping these to incorporate the passphrase and disable hostname verification (one possible mapping is sketched after the options below).

ssl_ca: ${secret.kafka.ca}
ssl_key: ${secret.kafka.key}
ssl_certificate: ${secret.kafka.cert}
???

-or-

ssl.ca.pem: ${secret.kafka.ca}
ssl.key.pem: ${secret.kafka.key}
ssl.key.password: ${secret.kafka.passphrase}
ssl.certificate.pem: ${secret.kafka.cert}
???

-or-
other?

Thanks!
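
One possible mapping, sketched under the assumption that the dotted librdkafka-style property names from GlobalConfig apply and that ssl.endpoint.identification.algorithm: "none" is the analogue of checkServerIdentity: false; this is not verified against this client:

// The ${secret.kafka.*} placeholders stand for the same PEM strings referenced above.
const sslConfig = {
  "bootstrap.servers": "kafka-dev1.some-domain.local:9096",
  "security.protocol": "ssl",
  "ssl.ca.pem": "<contents of ${secret.kafka.ca}>",
  "ssl.key.pem": "<contents of ${secret.kafka.key}>",
  "ssl.key.password": "<${secret.kafka.passphrase}>",
  "ssl.certificate.pem": "<contents of ${secret.kafka.cert}>",
  // Assumed analogue of checkServerIdentity: false (disables hostname verification).
  "ssl.endpoint.identification.algorithm": "none",
};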

Understanding the roadmap

I wanted to get a better understanding of what are the roadmap items that are left before we enter Beta or GA.
It would be good to have a bit of visibility into the project, beyond the issues that are filed.

Thanks

Errors are not sent back to the caller

I'm using the KafkaJS variant with producer.send, and I was observing logs like this that were not sent back to the caller.

{
  timestamp: 1717074105728,
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out 2 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests',
}

Confluent vs KafkaJS performance feedback

I ran a test sending and receiving 1000 messages individually (no batching) using the KafkaJS library, and then ran the same test using the Confluent library (following the migration instructions).

KafkaJS: 455ms
Confluent: 501951ms

That's not a typo. In this case, the Confluent test took roughly 1000x as long to complete.

I'm presuming there is some tuning that can be done via configuration; but this was an "out of the box" conversion, and my attempts at "tuning" the configuration did not yield any noticeable differences.

Notes

  • Topic already exists and is empty at start of test.
  • Run on an M3 Mac, with local docker-hosted Kafka (confluentinc/cp-kafka:7.6.0)

KafkaJS

import {Kafka} from "kafkajs";

const topic = "test-kafkajs-topic";
const total = 1000;
const start = Date.now();
let sentCount = 0;
let receivedCount = 0;

const kafka = new Kafka({brokers: ["localhost:9092"]});

const consumer = kafka.consumer({groupId: `${topic}-group`});
await consumer.connect();
await consumer.subscribe({topic});
await consumer.run({
  eachMessage: async ({message}) => {
    receivedCount++;
    if (receivedCount % 100 === 0) {
      log.info(`Rec'd ${String(receivedCount).padStart(4, " ")} : ${Date.now() - start}ms`);
    }
  }
});

const producer = kafka.producer();
await producer.connect();
for (let i = 0; i < total; i++) {
  await producer.send({
    topic,
    messages: [{value: "one"}]
  });
  if (++sentCount % 100 === 0) {
    log.info(`Sent  ${String(sentCount).padStart(4, " ")} : ${Date.now() - start}ms`);
  }
}

await until(async () => receivedCount == total, {timeout: 5000});

await producer.disconnect();
await consumer.disconnect();
Sent   100 : 133ms
Rec'd  100 : 133ms
Sent   200 : 163ms
Rec'd  200 : 163ms
Sent   300 : 193ms
Rec'd  300 : 193ms
Sent   400 : 229ms
Rec'd  400 : 229ms
Sent   500 : 271ms
Rec'd  500 : 271ms
Sent   600 : 331ms
Rec'd  600 : 332ms
Sent   700 : 371ms
Rec'd  700 : 371ms
Sent   800 : 398ms
Rec'd  800 : 399ms
Sent   900 : 427ms
Rec'd  900 : 428ms
Sent  1000 : 454ms
Rec'd 1000 : 455ms

Confluent

import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";

const topic = "test-confluent-topic";
const total = 1000;
const start = Date.now();
let sentCount = 0;
let receivedCount = 0;

const kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});

const consumer = kafka.consumer({kafkaJS: {groupId: `${topic}-group`}});
await consumer.connect();
await consumer.subscribe({topic});
await consumer.run({
  eachMessage: async ({message}) => {
    receivedCount++;
    if (receivedCount % 100 === 0) {
      log.info(`Rec'd ${String(receivedCount).padStart(4, " ")} : ${Date.now() - start}ms`);
    }
  }
});

const producer = kafka.producer();
await producer.connect();
for (let i = 0; i < total; i++) {
  await producer.send({
    topic,
    messages: [{value: "one"}]
  });
  if (++sentCount % 100 === 0) {
    log.info(`Sent  ${String(sentCount).padStart(4, " ")} : ${Date.now() - start}ms`);
  }
}

await until(async () => receivedCount == total, {timeout: 5000});

await producer.disconnect();
await consumer.disconnect();
Sent   100 : 50630ms
Rec'd  100 : 63159ms
Sent   200 : 100720ms
Rec'd  200 : 127273ms
Sent   300 : 150805ms
Rec'd  300 : 191382ms
Sent   400 : 200890ms
Sent   500 : 250985ms
Rec'd  400 : 255503ms
Rec'd  500 : 255504ms
Sent   600 : 301079ms
Sent   700 : 351164ms
Rec'd  600 : 383739ms
Rec'd  700 : 383740ms
Sent   800 : 401253ms
Sent   900 : 451346ms
Sent  1000 : 501434ms
Rec'd  800 : 501949ms
Rec'd  900 : 501950ms
Rec'd 1000 : 501951ms
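
For comparison, the README example earlier on this page queues producer.send() promises and awaits them together instead of awaiting each send in turn. As a hedged tuning sketch (not a measured fix), the send loop above could be rewritten the same way so the client is free to batch:

// Sketch only: queue all sends, then await them together, as in the README example.
const pending: Promise<unknown>[] = [];
for (let i = 0; i < total; i++) {
  pending.push(
    producer.send({
      topic,
      messages: [{ value: "one" }],
    })
  );
  if (++sentCount % 100 === 0) {
    log.info(`Sent  ${String(sentCount).padStart(4, " ")} : ${Date.now() - start}ms`);
  }
}
await Promise.all(pending);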

How to use logging callback?

With KafkaJS, we used the logCreator config field when creating the Kafka instance. That has been removed in this library.

I see the underlying rdkafka supports a log_cb logging callback, but I'm unable to determine how to make use of it and was unable to find any examples on the internet.

The type definition for the callback is any (not terribly helpful). I have tried various incarnations, but all of them have the same result.

const kafka = new Confluent.Kafka({
  kafkaJS: {
    brokers: ["localhost:9092"]
  },
  log_cb: () => console.log("log_cb")
});

Results in:

Error: Invalid callback type
    at Client.connect (/Users/peloquina/src/agilysys-inc/stay/backplane-base/node_modules/@confluentinc/kafka-javascript/lib/client.js:253:16)
    at /Users/peloquina/src/agilysys-inc/stay/backplane-base/node_modules/@confluentinc/kafka-javascript/lib/kafkajs/_consumer.js:802:28
    at new Promise (<anonymous>)
    at Consumer.connect (/Users/peloquina/src/agilysys-inc/stay/backplane-base/node_modules/@confluentinc/kafka-javascript/lib/kafkajs/_consumer.js:800:12)
    at Context.<anonymous> (itest/confluentKafkaAccessor.test.ts:149:20)

Please help.
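
A hedged sketch based on the first issue on this page, which passes a KafkaJS-style logger object through the kafkaJS.logger field (mirroring its ConfluentLogger class) rather than a raw librdkafka log_cb; whether this fully replaces logCreator is an assumption:

import { KafkaJS as Confluent } from "@confluentinc/kafka-javascript";

const kafka = new Confluent.Kafka({
  kafkaJS: {
    brokers: ["localhost:9092"],
    logLevel: Confluent.logLevel.INFO,
    // Minimal logger object satisfying the Logger shape used by ConfluentLogger above.
    logger: {
      info: (message: string, extra?: object) => console.log(`[info] ${message}`, extra ?? ""),
      warn: (message: string, extra?: object) => console.log(`[warn] ${message}`, extra ?? ""),
      error: (message: string, extra?: object) => console.log(`[error] ${message}`, extra ?? ""),
      debug: (message: string, extra?: object) => console.log(`[debug] ${message}`, extra ?? ""),
      namespace() { return this; },
      setLogLevel: (_level: Confluent.logLevel) => {},
    },
  },
});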

Facing issues with client connection

Environment Information

  • OS [e.g. Mac, Arch, Windows 10]: macOS Sonoma 14.3.1
  • Node Version [e.g. 8.2.1]: Node.js v20.11.0
  • NPM Version [e.g. 5.4.2]: 10.2.4
  • C++ Toolchain [e.g. Visual Studio, llvm, g++]: gcc --version
    Apple clang version 15.0.0 (clang-1500.1.0.2.5)
    Target: arm64-apple-darwin23.3.0
    Thread model: posix
    InstalledDir: /Library/Developer/CommandLineTools/usr/bin
  • confluent-kafka-javascript version [e.g. 2.3.3]: "^0.1.10-devel"

Steps to Reproduce
Installed the package using npm: npm install @confluentinc/kafka-javascript
Used the original example provided on the welcome page for producing a message.

confluent-kafka-javascript Configuration Settings

const  { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS
const kafka = new Client({
    KafkaJS: {
      brokers: ["xxxxxxxxxx"],
      ssl: true,
      sasl: {
        mechanism: "plain",
        username: "xxxxxxx",
        password:
          "yyyyyyyyyyyyy",
      },
    },
  });

My team informed me about the early-access version, and I am trying to see how the package works. I was not able to start the client at all; it gives me the following errors.

const kafka = new Client({
                ^

ReferenceError: Client is not defined
    at producerStart (/Users/sai/Documents/Development/messaging/confluent.js:9:17)
    at Object.<anonymous> (/Users/sai/Documents/Development/messaging/confluent.js:47:1)
    at Module._compile (node:internal/modules/cjs/loader:1376:14)
    at Module._extensions..js (node:internal/modules/cjs/loader:1435:10)
    at Module.load (node:internal/modules/cjs/loader:1207:32)
    at Module._load (node:internal/modules/cjs/loader:1023:12)
    at Function.executeUserEntryPoint [as runMain] (node:internal/modules/run_main:135:12)
    at node:internal/main/run_main_module:28:49

I did some extra checks to see whether the package is working as expected, and I was able to see the features and librdkafka version.

const details = require("@confluentinc/kafka-javascript").features;

Result:

[
  'gzip',             'snappy',
  'ssl',              'sasl',
  'regex',            'lz4',
  'sasl_plain',       'sasl_scram',
  'plugins',          'zstd',
  'sasl_oauthbearer', 'http',
  'oidc'
]
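
For what it's worth, the snippet above never defines Client, which is what the ReferenceError points at; the README's getting-started example earlier on this page constructs the client as new Kafka(...) with a lowercase kafkaJS config key. A corrected sketch along those lines (credentials remain placeholders):

import { KafkaJS } from "@confluentinc/kafka-javascript";
const { Kafka } = KafkaJS;

// Kafka (not Client) is the constructor, and the config key is lowercase kafkaJS.
const kafka = new Kafka({
  kafkaJS: {
    brokers: ["xxxxxxxxxx"],
    ssl: true,
    sasl: {
      mechanism: "plain",
      username: "xxxxxxx",
      password: "yyyyyyyyyyyyy",
    },
  },
});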

Mismatches between `types/kafkajs.d.ts` and documentation

While working on a conversion from KafkaJS, I've encountered the following:

  • Per MIGRATION.md, acks, compression, and timeout are not set on a per-send basis, yet the ProducerRecord interface still contains acks, compression, and timeout, leading one to believe these can be set per message.
  • According to MIGRATION.md, the retry option from the existing KafkaJS config is supported, yet it is missing from the KafkaConfig interface.
  • No mention is made of batch message handling in MIGRATION.md, and the ConsumerRunConfig type only includes eachMessage?: EachMessageHandler and not eachBatch?: EachBatchHandler, yet EachBatchHandler and its supporting types are still present.
  • compression in the producer config (and in kafkajs.d.ts) says to use the CompressionTypes enum. Doing so results in an error message like: Error: Invalid value "3" for configuration property "compression.codec". To get this working, I had to use the native values from compression.codec: 'none' | 'gzip' | 'snappy' | 'lz4' | 'zstd' and, in code, as unknown as CompressionTypes to avoid compile-time errors (see the sketch after this list).
  • the consumer on event methods are present in kafkajs.d.ts and are not mentioned in MIGRATION.md, but using them throws a "not implemented" error. Is the goal to provide an implementation for these?
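
On the compression point above, here is a hedged sketch of the described workaround. The types/kafkajs.js import path follows the one used elsewhere on this page; whether compression belongs in the kafkaJS producer config object exactly as shown is an assumption based on this report.

import { KafkaJS as Confluent } from "@confluentinc/kafka-javascript";
import { CompressionTypes } from "@confluentinc/kafka-javascript/types/kafkajs.js";

const kafka = new Confluent.Kafka({ kafkaJS: { brokers: ["localhost:9092"] } });

// Workaround from this report: pass the native compression.codec string value and cast,
// since the CompressionTypes enum values are rejected at runtime.
const producer = kafka.producer({
  kafkaJS: {
    compression: "gzip" as unknown as CompressionTypes,
  },
});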
