nestjs-kafka's Introduction

NestJS + KafkaJS

Integration of KafkaJS with NestJS to build event-driven microservices.

Setup

Import and add the KafkaModule to the imports array of the module for which you would like to use Kafka.

Synchronous Module Initialization

Register the KafkaModule synchronously with the register() method:

@Module({
  imports: [
    KafkaModule.register([
      {
        name: 'HERO_SERVICE',
        options: {
          client: {
            clientId: 'hero',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'hero-consumer'
          }
        }
      },
    ]),
  ]
  ...
})

Asynchronous Module Initialization

Register the KafkaModule asynchronously with the registerAsync() method:

import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({
  imports: [
    ConfigModule.forRoot(),
    KafkaModule.registerAsync(['HERO_SERVICE'], {
      // The factory receives the injected ConfigService as an argument; there is no `this` here.
      useFactory: async (configService: ConfigService) => {
        const broker = configService.get('broker');
        return [
          {
            name: 'HERO_SERVICE',
            options: {
              client: {
                clientId: 'hero',
                brokers: [broker],
              },
              consumer: {
                groupId: 'hero-consumer'
              }
            }
          }
        ];
      },
      inject: [ConfigService]
    })
  ]
  ...
})

Full settings can be found in the linked documentation:

Config                 Options
client                 https://kafka.js.org/docs/configuration
consumer               https://kafka.js.org/docs/consuming#options
producer               https://kafka.js.org/docs/producing#options
serializer
deserializer
consumeFromBeginning   true/false

Subscribing

Subscribing to a topic to accept messages.

export class Consumer {
  constructor(
    @Inject('HERO_SERVICE') private client: KafkaService
  ) {}

  onModuleInit(): void {
    this.client.subscribeToResponseOf('hero.kill.dragon', this)
  }

  @SubscribeTo('hero.kill.dragon')
  async getWorld(data: any, key: any, offset: number, timestamp: number, partition: number, headers: IHeaders): Promise<void> {
    ...
  }

}

Producing

Send messages back to Kafka.

const TOPIC_NAME = 'hero.kill.dragon';

export class Producer {
  constructor(
    @Inject('HERO_SERVICE') private client: KafkaService
  ) {}

  async post(message: string = 'Hello world'): Promise<RecordMetadata[]> {
    const result = await this.client.send({
      topic: TOPIC_NAME,
      messages: [
        {
          key: '1',
          value: message
        }
      ]
    });

    return result;
  }

}

Schema Registry support

By default, messages are converted to JSON objects where possible. If you're using Avro, you can add the SchemaRegistry deserializer to convert the messages. This uses the KafkaJS Schema-registry module.

In your module.ts:

@Module({
  imports: [
    KafkaModule.register([
      {
        name: 'HERO_SERVICE',
        options: {
          client: {
            clientId: 'hero',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'hero-consumer'
          }
        },
        deserializer: new KafkaAvroResponseDeserializer({
          host: 'http://localhost:8081'
        }),
        serializer: new KafkaAvroRequestSerializer({
          config: {
            host: 'http://localhost:8081/'
          },
          schemas: [
            {
              topic: 'test.topic',
              key: join(__dirname, 'key-schema.avsc'),
              value: join(__dirname, 'value-schema.avsc')
            }
          ],
        }),
      },
    ]),
  ]
  ...
})

See the e2e test for example.

TODO

  • Tests

PRs Welcome ❤️

nestjs-kafka's People

Contributors

agboon, aguerram, dependabot[bot], krampenschiesser, matcampos, matheusisquierdo, rob3000, shamshiel

nestjs-kafka's Issues

Feature Request: Provide async module initialization

First of all, thank you for providing this package. I tried to use it today and it seems to be missing a critical feature.

I need to load the username, password and broker list on application start up dynamically. Other NestJS packages provide a "forRootAsync" or "registerAsync" method for this. I can use this method to inject some services and load config values.

Example from the HttpModule

HttpModule.registerAsync({
  useFactory: async (someService: SomeService) => {},
  inject: [SomeService]
}),

Sorry if this is already possible but I didn't find any method that would provide this.
(Is there another way to achieve this that I'm not aware of?)

Connection error: connect EACCES

{"level":"ERROR","timestamp":"2022-07-19T09:52:20.557Z","logger":"kafkajs","message":"[Connection] Connection error: connect EACCES fe80::b03d:38e1:27b3:9e22:9092","broker":"W10-lTkFoL0JX1t.radisys.com:9092","clientId":"ScannerService","stack":"Error: connect EACCES fe80::b03d:38e1:27b3:9e22:9092\n at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1187:16)"}
{"level":"ERROR","timestamp":"2022-07-19T09:52:20.561Z","logger":"kafkajs","message":"[Connection] Connection error: connect EACCES fe80::b03d:38e1:27b3:9e22:9092","broker":"W10-lTkFoL0JX1t.radisys.com:9092","clientId":"ScannerService","stack":"Error: connect EACCES fe80::b03d:38e1:27b3:9e22:9092\n at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1187:16)"}

I can't work out what the issue is here.
I am running Kafka on my local machine with KAFKA_BROKERS="127.0.0.1:9092".

Alternative kafka clients and performance

Hello Robert, thanks for the great project, I am using this in my application.
I've been thinking about using Node.js kafka-streams as the underlying Kafka client for the connection between NestJS and Kafka.
Right now I see we are using kafkajs as our Kafka client, but I could also imagine using kafka-streams with all the NestJS decorators and methods.
What do you think about that, and what would the trade-offs and performance benefits be?
As I see it, you are also a collaborator on the nodefluent kafka-streams project, so maybe there is a reason why you chose kafkajs instead of kafka-streams as the underlying connection to Kafka.
Thanks

Add option to not automatically connect producer and admin

Hi, currently producer and admin are automatically connected in onModuleInit.

await this.producer.connect()

This causes a problem however when Kafka is unreachable. After the configured number of retries, KafkaJS will throw KafkaJSNumberOfRetriesExceeded which is not caught and Nest will not start. Increasing retries does not help because Nest is waiting for the module to be initialized which will never happen until the producer connects successfully.

From the KafkaJS documentation:

If the max number of retries is exceeded the retrier will throw KafkaJSNumberOfRetriesExceeded and interrupt. Producers will bubble up the error to the user code; Consumers will wait the retry time attached to the exception (it will be based on the number of attempts) and perform a full restart.

I propose adding an option to not connect the producer and admin in onModuleInit, instead make it possible for users to connect manually.
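
A minimal sketch of what such an opt-out could look like in application code. It assumes a hypothetical ConnectableProducer interface and a hypothetical LazyProducer wrapper; neither is part of this library or of KafkaJS. The idea is to swallow the failure at startup and try again on the first send.

```typescript
// Hypothetical minimal producer shape; not the library's or KafkaJS's real type.
interface ConnectableProducer {
  connect(): Promise<void>;
  send(record: { topic: string; messages: { value: string }[] }): Promise<void>;
}

// Wraps a producer so that a failed initial connect does not reject,
// and the connection is attempted again on the first send.
class LazyProducer {
  private connected = false;

  constructor(private readonly producer: ConnectableProducer) {}

  // Intended for startup: reports the failure instead of throwing.
  async tryConnect(): Promise<boolean> {
    try {
      await this.producer.connect();
      this.connected = true;
    } catch (_err) {
      this.connected = false;
    }
    return this.connected;
  }

  // Connects on demand; here connection errors do propagate to the caller.
  async send(record: { topic: string; messages: { value: string }[] }): Promise<void> {
    if (!this.connected) {
      await this.producer.connect();
      this.connected = true;
    }
    await this.producer.send(record);
  }
}
```

With this shape, module initialization can call tryConnect() and continue even when Kafka is unreachable, while send() still surfaces the error to the code that actually needs the broker.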

Solution to Invalid Topic Name on Producer

Hi! 👋

Firstly, thanks for your work on this project! 🙂

Today I used patch-package to patch @rob3000/[email protected] for the project I'm working on.

Here is the diff that solved my problem:

diff --git a/node_modules/@rob3000/nestjs-kafka/dist/kafka.service.js b/node_modules/@rob3000/nestjs-kafka/dist/kafka.service.js
index 9bd0b5b..0a13615 100644
--- a/node_modules/@rob3000/nestjs-kafka/dist/kafka.service.js
+++ b/node_modules/@rob3000/nestjs-kafka/dist/kafka.service.js
@@ -97,8 +97,8 @@ let KafkaService = KafkaService_1 = class KafkaService {
             this.logger.error('There is no producer, unable to send message.');
             return;
         }
-        const serializedPacket = await this.serializer.serialize(message);
-        return await this.producer.send(serializedPacket);
+        await Promise.all(message.messages.map(x => x.value = JSON.stringify(x.value)))
+        return await this.producer.send(message);
     }
     getGroupIdSuffix(groupId) {
         return groupId + '-client';

This issue body was partially generated by patch-package.

Subjects with custom suffix -value and -key, allowing in lowercase

Custom suffix

[Nest] 3080 - 04/11/2020 19:02:01 [KafkaAvroRequestSerializer] Object: { "name": "ResponseError", "status": 404, "unauthorized": false, "url": "get http://localhost:8081/subjects/company-Key/versions/latest" }

Handle KafkaJSNumberOfRetriesExceeded

Hi,

I have a problem with the KafkaJS retry system. I need to wait for two retries and then send the message to a dead-letter queue of my own.
The problem is that when a Kafka message causes an error, the consumer tries to consume the same message forever. After KafkaJSNumberOfRetriesExceeded, the consumer restarts and keeps consuming the failed message.
This is the log trace when I configure 2 retries:

[KafkaService] Error for message my.topic: QueryFailedError: Some database error
[KafkaService] ERROR [Runner] Error when calling eachMessage {"timestamp":"2021-05-07T07:09:40.915Z","logger":"kafkajs","topic":"my.topic","partition":0,"offset":"358","stack":"QueryFailedError: Some database error"}
[KafkaService] Error for message my.topic: QueryFailedError: Some database error
[KafkaService] ERROR [Runner] Error when calling eachMessage {"timestamp":"2021-05-07T07:09:45.869Z","logger":"kafkajs","topic":"my.topic","partition":0,"offset":"358","stack":"QueryFailedError: Some database error"}
[KafkaService] Error for message my.topic: QueryFailedError: Some database error
[KafkaService] ERROR [Runner] Error when calling eachMessage {"timestamp":"2021-05-07T07:09:51.644Z","logger":"kafkajs","topic":"my.topic","partition":0,"offset":"358","stack":"QueryFailedError: Some database error"}
[KafkaService] ERROR [Consumer] Crash: KafkaJSNumberOfRetriesExceeded: Some database error
[KafkaService] INFO [Consumer] Stopped {"timestamp":"2021-05-07T07:09:56.722Z","logger":"kafkajs","groupId":"my-consumer"}
[KafkaService] ERROR [Consumer] Restarting the consumer in 1302ms {"timestamp":"2021-05-07T07:09:56.724Z","logger":"kafkajs","retryCount":2,"retryTime":1302,"groupId":"my-consumer"}
[KafkaService] INFO [Consumer] Starting {"timestamp":"2021-05-07T07:09:58.029Z","logger":"kafkajs","groupId":"my-consumer"}
[KafkaService] INFO [ConsumerGroup] Consumer has joined the group {"timestamp":"2021-05-07T07:09:59.299Z","logger":"kafkajs","groupId":"my-consumer","memberId":"my-cfd8cbdc-aabb-4469-94f0-d4109f74cf76","leaderId":"my-cfd8cbdc-aabb-4469-94f0-d4109f74cf76","isLeader":true,"memberAssignment":{"my-topic":[0]},"groupProtocol":"RoundRobinAssigner","duration":882}
[KafkaService] Error for message my.topic: QueryFailedError: Some database error
[KafkaService] ERROR [Runner] Error when calling eachMessage {"timestamp":"2021-05-07T07:09:51.644Z","logger":"kafkajs","topic":"my.topic","partition":0,"offset":"358","stack":"QueryFailedError: Some database error"}
[KafkaService] Error for message my.topic: QueryFailedError: Some database error
[KafkaService] ERROR [Runner] Error when calling eachMessage {"timestamp":"2021-05-07T07:10:05.204Z","logger":"kafkajs","topic":"my.topic","partition":0,"offset":"358","stack":"QueryFailedError: Some database error"}
[KafkaService] Error for message my.topic: QueryFailedError: Some database error
[KafkaService] ERROR [Runner] Error when calling eachMessage {"timestamp":"2021-05-07T07:19:37.384Z","logger":"kafkajs","topic":"my.topic","partition":0,"offset":"358","stack":"QueryFailedError: Some database error"}
[KafkaService] ERROR [Consumer] Crash: KafkaJSNumberOfRetriesExceeded: Some database error
[KafkaService] INFO [Consumer] Stopped {"timestamp":"2021-05-07T07:09:56.722Z","logger":"kafkajs","groupId":"my-consumer"}
[KafkaService] ERROR [Consumer] Restarting the consumer in 1302ms {"timestamp":"2021-05-07T07:09:56.724Z","logger":"kafkajs","retryCount":2,"retryTime":1302,"groupId":"my-consumer"}
[KafkaService] INFO [Consumer] Starting {"timestamp":"2021-05-07T07:09:58.029Z","logger":"kafkajs","groupId":"my-consumer"}
[KafkaService] INFO [ConsumerGroup] Consumer has joined the group {"timestamp":"2021-05-07T07:09:59.299Z","logger":"kafkajs","groupId":"my-consumer","memberId":"my-cfd8cbdc-aabb-4469-94f0-d4109f74cf76","leaderId":"my-cfd8cbdc-aabb-4469-94f0-d4109f74cf76","isLeader":true,"memberAssignment":{"my-topic":[0]},"groupProtocol":"RoundRobinAssigner","duration":882}

The loop is repeated forever until I stop my Nestjs project. I need to catch the KafkaJSNumberOfRetriesExceeded error to save the failed message in a deadletter queue (or database table) and continue consuming next messages.

How can I do it?
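
One possible approach is to handle the retries inside the handler itself instead of relying on the consumer crash. The sketch below is not part of this library: Handler, DeadLetterSink and withDeadLetter are hypothetical names, and it assumes the consumer moves on to the next message once the handler resolves without throwing.

```typescript
// Hypothetical handler and dead-letter sink types, for illustration only.
type Handler<T> = (data: T) => Promise<void>;
type DeadLetterSink<T> = (data: T, error: unknown) => Promise<void>;

// Returns a handler that tries `handler` up to `maxAttempts` times and,
// if every attempt fails, hands the message to `deadLetter` and resolves.
// Resolving (rather than rethrowing) avoids the crash-and-restart loop.
function withDeadLetter<T>(
  handler: Handler<T>,
  deadLetter: DeadLetterSink<T>,
  maxAttempts: number,
): Handler<T> {
  return async (data: T): Promise<void> => {
    let lastError: unknown;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        await handler(data);
        return; // success, nothing to dead-letter
      } catch (err) {
        lastError = err;
      }
    }
    await deadLetter(data, lastError);
  };
}
```

The body of a @SubscribeTo method could delegate to a handler wrapped this way, with deadLetter writing to a separate topic or a database table. If deadLetter itself throws, the error still propagates, so the message is not silently lost.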

nestjs microservice kafka

Hello,

I like your library, but I wanted to know how it differs from the NestJS microservice Kafka transport.

Support for commitOffsets on npm

I noticed that the version published on npm is quite outdated compared to the version here on GitHub. I would really like to use your library, but I need the commitOffsets method, which invokes the commitOffsets method of the consumer's private instance. Do you know when this will be available on npm?

Confused by the first registration of schema

Sorry, my understanding may be wrong.
I found that we only have getSchemaIds in KafkaAvroRequestSerializer.
Does that mean we don't support registering these schemas from the NestJS app?

Support for logical types

In the schema registry module of kafkajs, it's possible to add logical types while initialising the schema registry (ref). The typings for SchemaRegistryAPIClientArgs uses only the basic config.

Proposal: add the further configuration options to KafkaAvroResponseDeserializer.

Cheers!

Can't use another logger

Hi!

I'm trying to use NestJS-Pino as default logger for this lib, but I can't do it.

I got this:

{"level":30,"time":1639590337257,"pid":22520,"hostname":"SHS9DEP","msg":"\u001b[90m[info] \u001b[39m\u001b[31mMikroORM failed to connect to database "}
[Nest] 22520   - 12/15/2021, 6:45:38 PM   [KafkaService] INFO [Consumer] Starting {"timestamp":"2021-12-15T17:45:38.006Z","logger":"kafkajs","groupId":"kafka_test"}
{"level":30,"time":1639590338032,"pid":22520,"hostname":"SHS9DEP","context":"NestFactory","msg":"Starting Nest application..."}
{"level":30,"time":1639590338032,"pid":22520,"hostname":"SHS9DEP","context":"InstanceLoader","msg":"OrmModule dependencies initialized"}
{"level":30,"time":1639590338032,"pid":22520,"hostname":"SHS9DEP","context":"InstanceLoader","msg":"MikroOrmModule dependencies initialized"}
{"level":30,"time":1639590338032,"pid":22520,"hostname":"SHS9DEP","context":"InstanceLoader","msg":"ConfigHostModule dependencies initialized"}

As you can see, all entries use NestJS-Pino except the Kafka entry.

Here is the configuration I use:

@Module({
	imports: [
		KafkaModule.registerAsync(['KAFKA_SERVICE'], {
			imports: [
				LoggerModule.forRoot(),
			],
			inject: [],
			useFactory: (logger: Logger) => {
				return [
					{
						name: 'KAFKA_SERVICE',
						options: {
							client: {
								clientId: 'kafka_test',
								brokers: ['hostname:9092'],
								logCreator: KafkaLogger.bind(logger), // I tried another forms, but nothing
								retry: {
									retries: 2,
									initialRetryTime: 30,
								},
							},
							consumer: {
								groupId: 'kafka_group_test',
								allowAutoTopicCreation: true,
							},
						},
					},
				];
			}
		})
	],
	providers: [
		KafkaService
	],
	exports: [
		KafkaService
	]
})
export class KafkaModule { }

Thanks!

Multiple clients on same broker lead to weird behaviour

If I register two clients KAFKA_CLIENT_1 and KAFKA_CLIENT_2 on the same broker I receive every message twice because the @SubscribeTo will be registered to both clients regardless of what I configured.

The problem is the static/global SUBSCRIBER_MAP because it is the same for every KafkaService. So if I subscribe to the topic hero.kill.dragon with KAFKA_CLIENT_1, the current implementation will also subscribe the topic hero.kill.dragon to KAFKA_CLIENT_2. This happens in onModuleInit() in the KafkaService.

I think to solve this problem every KafkaService needs its own SUBSCRIBER_MAP and SUBSCRIBER_OBJECT_MAP.

I'm pretty sure this will also lead to weird behaviour with different brokers.
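
The difference between a shared and a per-instance registry can be illustrated in isolation. This is a simplified sketch with hypothetical class names, not the library's actual KafkaService code.

```typescript
type Callback = (data: unknown) => void;

// Shared (static) registry: every instance sees every topic,
// which mirrors the behaviour described in this issue.
class SharedRegistryClient {
  private static readonly subscribers = new Map<string, Callback>();

  subscribe(topic: string, cb: Callback): void {
    SharedRegistryClient.subscribers.set(topic, cb);
  }

  topics(): string[] {
    return Array.from(SharedRegistryClient.subscribers.keys());
  }
}

// Per-instance registry: each client only knows about its own topics.
class OwnRegistryClient {
  private readonly subscribers = new Map<string, Callback>();

  subscribe(topic: string, cb: Callback): void {
    this.subscribers.set(topic, cb);
  }

  topics(): string[] {
    return Array.from(this.subscribers.keys());
  }
}
```

Subscribing through one SharedRegistryClient makes the topic visible to all others, while two OwnRegistryClient instances stay independent.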

Invalid topic on Producer

I'm trying to send a batch of messages with the following code:

    await this.client.send(
      {
        topic: 'topic-name',
        messages: [
          { key: '1', value: 'string1' },
          { key: '2', value: 'string2' },
        ],
      });

It seems that the serializer is trying to serialize the whole object including the topic, which results in kafkajs being called with

[ { topic: undefined, messages: undefined } ]
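
A sketch of the behaviour one would expect instead: only the message values are serialized and the topic is passed through untouched. The types and the serializeRecord function are hypothetical and only illustrate the idea; they are not the library's serializer.

```typescript
// Hypothetical record shapes, loosely modelled on the send() call above.
interface OutgoingMessage {
  key?: string;
  value: unknown;
}

interface OutgoingRecord {
  topic: string;
  messages: OutgoingMessage[];
}

interface SerializedRecord {
  topic: string;
  messages: { key?: string; value: string }[];
}

// Serializes each message value, leaving the topic and keys as they are.
function serializeRecord(record: OutgoingRecord): SerializedRecord {
  return {
    topic: record.topic,
    messages: record.messages.map((m) => ({
      ...m,
      // Strings are kept as-is; everything else is JSON-encoded.
      value: typeof m.value === "string" ? m.value : JSON.stringify(m.value),
    })),
  };
}
```

Leaving strings unencoded is a design choice made for this sketch; applying JSON.stringify to every value would add quotes around plain strings.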

RegisterAsync: Wait for initial fetch of schemas

First of all: Sorry for all the issues, I'm just writing down what comes to mind while setting up this library together with NestJS for use in a production environment. 🤓

When registering KafkaModule asynchronously in NestJS (and using the Avro serializer/deserializer), the application waits for a connection to Kafka before starting and prevents startup when no connection can be established. IMHO, it should act the same way when it comes to schemas. Even when a connection to Kafka is established, it's worth nothing when the events cannot be encoded/decoded.

Inject Nestjs providers into consumer

Hi,

I'm trying to inject a class into my Kafka consumer, but I can't:

import { Inject } from '@nestjs/common';
import { KafkaService, SubscribeTo } from '@rob3000/nestjs-kafka';
import { TestService } from '../../application/test.service';

export class TestConsumer {
  constructor(
      @Inject('TEST_KAFKA') private client: KafkaService,
      @Inject('TestService') private testService: TestService
  ) {
  }

  onModuleInit(): void {
    this.client.subscribeToResponseOf('item.created', this.client);
  }

  @SubscribeTo('item.created')
  async itemCreated(data): Promise<void> {

    await this.testService.createItem(data.id, data.companyId);
  }
}

The error I'm getting is

Cannot read property 'createItem' of undefined.

The service testService is undefined when the method createItem is called. I think this is because the KafkaClient is executing the "createItem" method as a callback instead of using the TestConsumer class.
I use the "TestService" dependency in other parts of the app, so it's not a configuration or definition problem.

How can I inject or use dependencies in my Kafka consumers?

Thanks!
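
A likely cause is the second argument of subscribeToResponseOf: the README example passes this (the consumer), while the snippet above passes this.client. The dispatch code quoted in another issue on this page calls callback.apply(objectRef, ...), so the handler's this would be whatever object was registered. The sketch below reproduces that mechanism with hypothetical stand-in classes and a hypothetical dispatch function; it does not use the library.

```typescript
class TestService {
  created: string[] = [];

  createItem(id: string): void {
    this.created.push(id);
  }
}

class TestConsumer {
  constructor(readonly testService: TestService) {}

  itemCreated(data: { id: string }): void {
    this.testService.createItem(data.id);
  }
}

// Mimics invoking a stored callback with an explicit object reference as `this`.
function dispatch(callback: (...args: any[]) => void, objectRef: object, data: unknown): void {
  callback.apply(objectRef, [data]);
}

const service = new TestService();
const consumer = new TestConsumer(service);

// Registered with the consumer itself: `this.testService` resolves as expected.
dispatch(TestConsumer.prototype.itemCreated, consumer, { id: "42" });
```

Calling dispatch with any other object as objectRef (for example an empty object standing in for the Kafka client) makes this.testService undefined inside itemCreated, which matches the reported error.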

Health check or ping

Hello Robert, thanks for this very useful plugin. I have added it to my project deployed on Kubernetes, but I was not able to find a way to check whether the connection is healthy for a health check. If the connection is broken, I would stop sending traffic to the pod or restart it.

Is there a way to check from KafkaService whether the connection is established?

Thank you.
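
A generic sketch of how a health endpoint could wrap such a check. It assumes the application can supply some call that resolves only while the broker is reachable (probe below is a hypothetical callback, for example a metadata request made through a Kafka client); nothing here is an API of this library.

```typescript
interface HealthResult {
  status: "up" | "down";
  error?: string;
}

// `probe` is a hypothetical callback supplied by the application; it should
// resolve while the broker is reachable and reject otherwise.
async function checkKafkaHealth(probe: () => Promise<unknown>): Promise<HealthResult> {
  try {
    await probe();
    return { status: "up" };
  } catch (err) {
    return { status: "down", error: err instanceof Error ? err.message : String(err) };
  }
}
```

A Kubernetes readiness or liveness endpoint could then return a non-2xx response whenever the status is "down".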

Configuration for two consumers listening to two different topic

Hey @rob3000
I am trying to create a configuration that has two consumers listening to two different topics with two different consumer group IDs. I am using the following configuration, but it subscribes to each topic twice, which is not what I expect.

KafkaModule.registerAsync(['EMAIL-REQUEST-PAYLOAD-GENERATION', 'EMAIL-REQUEST-PROCESS'], {
  useFactory: async (configService: ConfigService) => {
    const broker = configService.kafkaConfig.kafkaHost;
    return [
      {
        name: 'EMAIL-REQUEST-PAYLOAD-GENERATION',
        options: {
          client: {
            clientId: 'ECS',
            brokers: [broker],
            logLevel: configService.environment === Environment.Development ? logLevel.WARN : logLevel.ERROR,
            connectionTimeout: 10000,
            requestTimeout: 30000,
            retry: {
              restartOnFailure: async (e) => {
                console.log('Kafka restarted', e);
                return Promise.resolve(true);
              },
            },
          },
          consumer: {
            sessionTimeout: 120000,
            heartbeatInterval: 1000,
            allowAutoTopicCreation: true,
            groupId: 'EMAIL_GROUP',
          },
          consumerRunConfig: undefined,
          producer: {
            allowAutoTopicCreation: true,
            transactionTimeout: 30000,
            retry: {
              retries: 5,
            },
          },
        },
      },
      {
        name: 'EMAIL-REQUEST-PROCESS',
        options: {
          client: {
            clientId: 'ecs',
            brokers: [broker],
            logLevel: configService.environment === Environment.Development ? logLevel.WARN : logLevel.ERROR,
            connectionTimeout: 10000,
            requestTimeout: 30000,
            retry: {
              restartOnFailure: async (e) => {
                console.log('Kafka restarted', e);
                return Promise.resolve(true);
              },
            },
          },
          consumer: {
            sessionTimeout: 120000,
            heartbeatInterval: 1000,
            allowAutoTopicCreation: true,
            groupId: 'EMAIL_PROCESS_GROUP',
          },
          consumerRunConfig: undefined,
          producer: {
            allowAutoTopicCreation: true,
            transactionTimeout: 30000,
            retry: {
              retries: 5,
            },
          },
        },
      },
    ];
  },
  inject: [ConfigService],
}),

Feel free to take this configuration to a JSON formatter.
Can you guide me with the correct configuration for the module?

KafkaJSNonRetriableError: Invalid topic

Hello,
The following is the output I receive. I've added the test code below as well.

[Nest] 14783   - 12/20/2020, 2:30:46 PM   [RoutesResolver] AppController {}: +151ms
[Nest] 14783   - 12/20/2020, 2:30:46 PM   [RouterExplorer] Mapped {, GET} route +3ms
[Nest] 14783   - 12/20/2020, 2:30:46 PM   [RouterExplorer] Mapped {/auth/amazon/callback, GET} route +1ms
[Nest] 14783   - 12/20/2020, 2:30:46 PM   [RoutesResolver] TestConsumer {/kafka}: +10ms
[Nest] 14783   - 12/20/2020, 2:30:46 PM   [RouterExplorer] Mapped {/kafka, POST} route +1ms
[Nest] 14783   - 12/20/2020, 2:30:46 PM   [RouterExplorer] Mapped {/kafka, GET} route +12ms
[Nest] 14783   - 12/20/2020, 2:30:46 PM   [KafkaService] INFO [Consumer] Starting {"timestamp":"2020-12-20T19:30:46.656Z","logger":"kafkajs","groupId":"test-e2e-consumer"}
[Nest] 14783   - 12/20/2020, 2:30:46 PM   [NestApplication] Nest application successfully started +145ms
[Nest] 14783   - 12/20/2020, 2:30:50 PM   [ExceptionsHandler] Invalid topic +3637ms
KafkaJSNonRetriableError: Invalid topic

import { Module } from '@nestjs/common';
import { KafkaAvroRequestSerializer, KafkaAvroResponseDeserializer, KafkaModule } from '@rob3000/nestjs-kafka';
import { TestConsumer, TOPIC_NAME } from './test.consumer';


@Module({
  imports: [
    KafkaModule.register([
      {
        name: 'KAFKA_SERVICE',
        options: {
          client: {
            clientId: 'test-e2e',
            brokers: ['localhost:9092'],
            retry: {
              retries: 2,
              initialRetryTime: 30,
            },
          },
          consumer: {
            groupId: 'test-e2e-consumer',
            allowAutoTopicCreation: true,
          },
        }
      },
    ]),
 
  ],
  providers: [],
  controllers: [TestConsumer],
})
export class TestModule {
}

import { Controller, Get, Inject } from '@nestjs/common';
import { KafkaService, SubscribeTo } from '@rob3000/nestjs-kafka';
import { Payload } from '@nestjs/microservices';

export const TOPIC_NAME = 'say.hello';

@Controller('/kafka')
export class TestConsumer {

  // Used to log the errors for testing.
  messages = [];

  constructor(
    @Inject('KAFKA_SERVICE') private client: KafkaService,
  ) {
  }

  onModuleInit(): void {
    this.client.subscribeToResponseOf(TOPIC_NAME, this);
  }

  @SubscribeTo(TOPIC_NAME)
  async message(@Payload() data: any): Promise<void> {
    this.messages.push(data);
  }

  @Get()
  async index() {
    const a = {
      messages: [{
        key: '1',
        value: '127.0.0.1',
      }],
      topic: TOPIC_NAME,
    };
    return await this.client.send(a);
  }
}

KafkaModule.registerAsync() gives error Nest can't resolve dependencies of the KAFKA_MODULE_OPTIONS (?)

Hey there, I'm trying to use the module with registerAsync (version 1.4.0), but it's blowing up. Not sure if I'm doing something wrong or if this is a bug.

This is my app.module.ts:

import { KafkaService, KafkaModule } from '@rob3000/nestjs-kafka';
import { v4 as uuidv4 } from 'uuid';
import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({
  imports: [
    ConfigModule.forRoot(),
    KafkaModule.registerAsync(['my-name'], {
      useFactory: async (configService: ConfigService) => {
        const uuid = uuidv4();
        const broker = configService.get('KAFKA_BROKER');

        Logger.log(`Registering to Kafka service with client uuid: ${uuid}`);
        return [
          {
            name: 'my-name',
            options: {
              client: {
                brokers: [configService.get('KAFKA_BROKER')],
              },
              brokers: [broker],
              consumer: { groupId: 'my-name' },
            },
          },
        ];
      },
      inject: [ConfigService],      
    }),
  ],
  providers: [
    ConfigService,
    KafkaService,
  ],
})
export class AppModule {}

I am injecting the ConfigService by using inject, so I have no idea why I am getting this error.

The error I'm getting:

consumer_1         | [Nest] 90   - 05/24/2021, 10:16:52 PM   [NestFactory] Starting Nest application...
consumer_1         | [Nest] 90   - 05/24/2021, 10:16:53 PM   [ExceptionHandler] Nest can't resolve dependencies of the KAFKA_MODULE_OPTIONS (?). Please make sure that the argument ConfigService at index [0] is available in the KafkaModule context.

Confusing documentation about payloads

When using the module as stated in the documentation, it's not possible to get any other information about the incoming Kafka message. The documentation needs to be changed a bit because the consumers are called differently than when using the normal KafkaTransporter provided by NestJS. The @Payload decorator is not needed anyway.
So when using the example below, data would be the decoded message from Kafka without any further information.

export class Consumer {
  constructor(
    @Inject('HERO_SERVICE') private client: KafkaService
  ) {}

  @SubscribeTo('hero.kill.dragon')
  async getWorld(@Payload() data: KafkaMessage): Promise<void> {
    ...
  }

}

try {
  const { timestamp, response, offset, key } = await this.deserializer.deserialize(message, { topic });
  await callback.apply(objectRef, [response, key, offset, timestamp, partition]);
} catch(e) {
  this.logger.error(`Error for message ${topic}: ${e}`);
  // Log and throw to ensure we don't keep processing the messages when there is an error.
  throw e;
}

Furthermore, as this module is about having no reply channels (compared to the original KafkaTransporter) - why is the method

client.subscribeToResponseOf('hero.kill.dragon');

called in the examples?

Cheers! 🚀

Caching of latest schema IDs

Firstly, thanks for the work here! 🚀

In KafkaAvroRequestSerializer, the latest schema IDs are saved. That definitely makes sense to me, but what happens when a schema is updated (and with it the correct latest ID)?
Do we need a mechanism for periodic refreshes of the latest schema IDs?
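
One way to get periodic refreshes is a small time-to-live cache in front of the lookup. This is a sketch only: SchemaIdCache and the fetchLatestId callback are hypothetical and do not reflect how KafkaAvroRequestSerializer is implemented.

```typescript
// Caches the latest schema ID per subject and refetches it once `ttlMs` has elapsed.
class SchemaIdCache {
  private readonly entries = new Map<string, { id: number; fetchedAt: number }>();

  constructor(
    // Hypothetical lookup, e.g. a call to the schema registry.
    private readonly fetchLatestId: (subject: string) => Promise<number>,
    private readonly ttlMs: number,
    // Injectable clock so the expiry logic can be exercised without waiting.
    private readonly now: () => number = () => Date.now(),
  ) {}

  async get(subject: string): Promise<number> {
    const entry = this.entries.get(subject);
    if (entry !== undefined && this.now() - entry.fetchedAt < this.ttlMs) {
      return entry.id; // still fresh
    }
    const id = await this.fetchLatestId(subject);
    this.entries.set(subject, { id, fetchedAt: this.now() });
    return id;
  }
}
```

A shorter TTL picks up schema updates sooner at the cost of more registry requests; the right value depends on how often schemas change.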

Avro deserializer fails if key is not avro format

Can an option be added to skip decoding of message keys? In our case only message values need decoding, so we can't use the built-in Avro deserializer class.

Also, perhaps the documentation on using a deserializer could mention that you must return the message with a response property on it, because of this line:

const { timestamp, response, offset, key } = await this.deserializer.deserialize(message, { topic });

As it seems that it's this response property that will be the data in any methods that are decorated with SubscribeTo.
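
A sketch of a custom deserializer along those lines: it decodes only the value, passes the key through unchanged, and returns an object with the response, key, offset and timestamp properties that the quoted line destructures. The RawMessage and DeserializedMessage shapes and the ValueOnlyDeserializer class are assumptions made for this example; in particular, keys and values are plain strings here and the method is synchronous and ignores the topic, which may differ from the library's actual deserializer contract.

```typescript
// Hypothetical, simplified message shapes for illustration.
interface RawMessage {
  key: string | null;
  value: string | null;
  offset: string;
  timestamp: string;
}

interface DeserializedMessage {
  response: unknown;
  key: string | null;
  offset: string;
  timestamp: string;
}

class ValueOnlyDeserializer {
  // `decodeValue` stands in for whatever decoding the values need (e.g. Avro).
  constructor(private readonly decodeValue: (raw: string) => unknown) {}

  deserialize(message: RawMessage): DeserializedMessage {
    return {
      // `response` is what ends up as the first handler argument.
      response: message.value === null ? null : this.decodeValue(message.value),
      key: message.key, // passed through without decoding
      offset: message.offset,
      timestamp: message.timestamp,
    };
  }
}
```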
