swift-kafka-client's People

Contributors

axelandersson, blindspotbounty, felixschlegel, franzbusch, gmilos, hyerra, mr-swifter, omarkj, rnro, samsv77, shilpeegupta14, yim-lee

swift-kafka-client's Issues

Improve general public interface of Errors

Discovery Task: What public error types do we want to expose in this package?

Having figured that out, please also implement those error types.

Note
Please also include those new error types in the DocC documentation of their throwing functions using the Throws aside:
/// - Throws:
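
For reference, a minimal sketch of what such a Throws aside could look like on a throwing function (the function and error names here are placeholders, not the final API):

/// Commits the offset of the given message to the broker synchronously.
///
/// - Parameter message: The message whose offset should be committed.
/// - Throws: A ``KafkaError`` if the offset could not be committed,
///   for example because the consumer has already been shut down.
public func commitSync(_ message: KafkaConsumerMessage) async throws {
    // ...
}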

Integration of new Configuration API into project

The new configuration API has already been implemented in #42.

  • Refactor KafkaClients into using the new configuration types (ConsumerConfig, ProducerConfig and TopicConfig); a rough usage sketch follows this list
  • deprecate the legacy configuration type
  • ideally: remove KafkaConfig (alongside its tests) and initialise librdkafka config inside of KafkaClient
  • KafkaConsumer: determine if consumer is assigned or subscribed through ConsumerConfig
  • enable passing topics through the ClientConfig instead of the init of KafkaConsumer
  • make tests use new configuration API
  • update README to use new configuration API
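
For orientation, a rough sketch of how creating a consumer might look once the new configuration types are wired in (the type, property and initializer names below are assumptions based on this issue and may differ from the final API):

import Logging

var config = ConsumerConfig(
    bootstrapServers: ["localhost:9092"],
    groupID: "example-group"
)
// Topics are passed through the configuration rather than through KafkaConsumer's init.
config.topics = ["example-topic"]

let consumer = try KafkaConsumer(
    config: config,
    logger: Logger(label: "example")
)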

rdkafka: SSL requirement

Currently, there is an implicit requirement to have OpenSSL installed in order to build or use the library:
Possible build error:

In file included from /home/actions/_work/package-storage/package-storage/.build/checkouts/swift-kafka-client/Sources/Crdkafka/librdkafka/src/rdkafka_mock_handlers.c:34:
/home/actions/_work/package-storage/package-storage/.build/checkouts/swift-kafka-client/Sources/Crdkafka/librdkafka/src/rdkafka_int.h:55:10: fatal error: 'openssl/ssl.h' file not found
#include <openssl/ssl.h>

Possible runtime error for compiled code:

Exception Type:        EXC_CRASH (SIGABRT)
Exception Codes:       0x0000000000000000, 0x0000000000000000
Termination Reason:    Namespace DYLD, Code 1 Library missing

Should the OpenSSL library be statically linked, or should the requirement be explicitly mentioned in the documentation?

Make `triggerGracefulShutdown()` public (was: De-couple Kafka from ServiceLifecycle to separate target)

Currently, ServiceLifecycle is a dependency of the Kafka target.
While this might be convenient when producers/consumers are used as a group of services, it is not really needed in most other cases: a simple task or task group is enough to run Kafka consumers/producers, together with a stop method (triggerGracefulShutdown() in the case of KafkaConsumer/KafkaProducer).

However, ServiceLifecycle comes with its own dependencies and target, which may not bring value to the end user. It would be nice to preserve this compatibility on the one hand while not shipping it with the main target.
Therefore, I wonder whether it would be possible to move the compatibility layer and related parts into a separate target?
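
For illustration, a minimal sketch of running a consumer from a plain task group, assuming triggerGracefulShutdown() is made public and that the consumer keeps exposing a run() method to drive its poll loop (details may differ):

func consume(from consumer: KafkaConsumer) async throws {
    try await withThrowingTaskGroup(of: Void.self) { group in
        // Drive the consumer's poll loop ourselves instead of relying on a ServiceGroup.
        group.addTask {
            try await consumer.run()
        }

        // Process incoming messages.
        group.addTask {
            for try await message in consumer.messages {
                print(message)
            }
        }

        // ... later, once the application wants to stop:
        consumer.triggerGracefulShutdown()
        try await group.waitForAll()
    }
}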

Change `swift-nio` Package dependency version (once `NIOAsyncSequenceProducer` has been released)

Our package makes use of NIOAsyncSequenceProducer. At the time of writing this issue, NIOAsyncSequenceProducer has not been included in an official swift-nio release (version at the time of writing: 2.41.1).

Once a new swift-nio version has been released, please change the Package.swift dependency as follows:

dependencies: [
-   .package(url: "https://github.com/apple/swift-nio.git", branch: "main"),
+   .package(url: "https://github.com/apple/swift-nio.git", from: <latest_version>),
    .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
],

Create `KafkaConfig` struct for global configurations

Build a struct called KafkaConfig that exposes a string-based dictionary that enables the user to configure their Kafka client.

Edit: The string-based dictionary has already been implemented; discuss here how and whether we also want to create a strongly typed configuration struct.
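
A rough sketch of the string-based dictionary approach described above (property and method names are illustrative only):

public struct KafkaConfig {
    /// Raw librdkafka configuration properties, keyed by their librdkafka names.
    private var dictionary: [String: String] = [:]

    public init() {}

    /// Read or set a raw configuration value by its librdkafka key.
    public subscript(key: String) -> String? {
        get { self.dictionary[key] }
        set { self.dictionary[key] = newValue }
    }
}

var config = KafkaConfig()
config["bootstrap.servers"] = "localhost:9092"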

Committing offset on stopping leads to fatalError

Following case #99, I was trying to use the library with manual offset commits and auto commit disabled:

for try await messageResult in consumer.messages {
    try await consumer.commitSync(messageResult)
}

This crashes the application on graceful stop of the group while it is still trying to commit an offset:

        func commitSync() -> CommitSyncAction {
            switch self.state {
...
            case .consumptionStopped:
                fatalError("Cannot commit when consumption has been stopped") <-- here
...
        }

My assumption is that it would be good to either allow committing an offset or to throw an error, as is done for the finishing/finished states; a sketch of the throwing variant follows below.
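
A self-contained miniature illustrating the suggestion (this is a simplified stand-in, not the package's actual state machine or error type):

struct KafkaStateError: Error {
    let reason: String
}

enum ConsumerState {
    case consuming
    case consumptionStopped
    case finishing
    case finished
}

func commitSync(in state: ConsumerState) throws {
    switch state {
    case .consuming:
        // Perform the commit as usual.
        break
    case .consumptionStopped, .finishing, .finished:
        // Throw instead of calling fatalError() so that a commit racing
        // with a graceful stop can be handled by the caller.
        throw KafkaStateError(reason: "Cannot commit in state \(state)")
    }
}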

Functionality wishlist

Hello!

As I mentioned before, I am really eager to move off my home-made implementation of a Swift Kafka API and use this version, and I am ready to contribute to make it happen. To make it transparent, I would like to list the functional gaps and create separate issues for every item. Here is the list:

  • Transactional API - #78
    Ability to write multiple messages and offsets in the same transaction to leverage EOS
  • Rebalance callback
    Redefine the rebalancing callback to be able to properly react to partitions being assigned/unassigned for all assignment strategies, and be able to seek to specific offsets
  • Statistics callback - #79
    Listen to Kafka statistics
  • Logging callback - #60
    Redirect librdkafka logs to the logger.
  • Admin API
    Create and remove topics, manage consumer groups, etc.
  • Message header API
    Be able to specify message headers when writing to Kafka
  • Metadata - #31
    Broker information, number of in-sync replicas, leadership information, etc.
  • Sync / Async offset commit
    Support both ways and ack/nack if the commit fails.
  • Poll outside of cooperative queue
    Always poll from a single task, with back pressure from the task (sync domain) to the consumer stream (async domain)
  • Read in batches
    A performance optimisation that significantly improves read throughput.
  • Statically link with dependencies
    Get rid of dependencies to be able to deploy a single binary without prerequisites.

New Config: de-duplicate setters/getters properties

As far as I can see, a lot of properties are duplicated between the Producer and Consumer configurations.

For example:

./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift:        set { self.dictionary["receive.message.max.bytes"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift:        set { self.dictionary["max.in.flight.requests.per.connection"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift:        set { self.dictionary["metadata.max.age.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift:        set { self.dictionary["topic.metadata.refresh.interval.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift:        set { self.dictionary["topic.metadata.refresh.fast.interval.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift:        set { self.dictionary["topic.metadata.refresh.sparse"] = newValue.description }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift:        set { self.dictionary["topic.metadata.propagation.max.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift:        set { self.dictionary["topic.blacklist"] = newValue.joined(separator: ",") }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift:        set { self.dictionary["socket.timeout.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift:        set { self.dictionary["socket.send.buffer.bytes"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift:        set { self.dictionary["socket.receive.buffer.bytes"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift:        set { self.dictionary["socket.keepalive.enable"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift:        set { self.dictionary["socket.nagle.disable"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift:        set { self.dictionary["socket.max.fails"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift:        set { self.dictionary["socket.connection.setup.timeout.ms"] = String(newValue) }

and

./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift:        set { self.dictionary["receive.message.max.bytes"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift:        set { self.dictionary["max.in.flight.requests.per.connection"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift:        set { self.dictionary["metadata.max.age.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift:        set { self.dictionary["topic.metadata.refresh.interval.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift:        set { self.dictionary["topic.metadata.refresh.fast.interval.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift:        set { self.dictionary["topic.metadata.refresh.sparse"] = newValue.description }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift:        set { self.dictionary["topic.metadata.propagation.max.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift:        set { self.dictionary["topic.blacklist"] = newValue.joined(separator: ",") }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift:        set { self.dictionary["socket.timeout.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift:        set { self.dictionary["socket.send.buffer.bytes"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift:        set { self.dictionary["socket.receive.buffer.bytes"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift:        set { self.dictionary["socket.keepalive.enable"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift:        set { self.dictionary["socket.nagle.disable"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift:        set { self.dictionary["socket.max.fails"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift:        set { self.dictionary["socket.connection.setup.timeout.ms"] = String(newValue) }

It probably makes sense to put these into one place and de-duplicate the code, unless I am missing something and this was done intentionally; see the sketch below for one possible approach.
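
One possible direction, as a rough sketch only (the type and property names below are assumptions, not the package's actual API), would be to extract the shared keys into a common struct that both configurations embed:

/// Properties shared by both the producer and the consumer configuration.
public struct KafkaCommonConfigurationProperties {
    var dictionary: [String: String] = [:]

    public var receiveMessageMaxBytes: Int? {
        get { self.dictionary["receive.message.max.bytes"].flatMap { Int($0) } }
        set { self.dictionary["receive.message.max.bytes"] = newValue.map { String($0) } }
    }

    public var socketTimeoutMilliseconds: Int? {
        get { self.dictionary["socket.timeout.ms"].flatMap { Int($0) } }
        set { self.dictionary["socket.timeout.ms"] = newValue.map { String($0) } }
    }

    // ... remaining shared socket.* and topic.metadata.* properties
}

public struct KafkaProducerConfiguration {
    /// Shared properties are defined once and embedded here.
    public var common = KafkaCommonConfigurationProperties()
    // ... producer-specific properties
}

public struct KafkaConsumerConfiguration {
    public var common = KafkaCommonConfigurationProperties()
    // ... consumer-specific properties
}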

@felixschlegel could you suggest on that, please?

Enable connection to Kafka Servers

Note: Use the GSoC22 Proposal as a reference for the implementation of the public interface.

  • make it possible to connect to and disconnect from a Kafka server by implementing the KafkaClient class (a rough sketch follows after this list)

  • create a static Logger property for logging the client status

    Not in the scope of this issue:

  • customizable KafkaConfig, please use default config here

  • connectAdditional() method; just call fatalError() or leave it out completely
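
As a rough illustration only (the actual shape is defined in the GSoC22 proposal, so all names below are assumptions):

import Logging

public final class KafkaClient {
    /// Static logger for reporting the client's connection status.
    static let logger = Logger(label: "KafkaClient")

    public init() {}

    /// Connect to the Kafka server using the default configuration.
    public func connect() throws {
        Self.logger.info("Connecting to Kafka server")
        // ... create and configure the underlying librdkafka handle here
    }

    /// Disconnect from the Kafka server.
    public func disconnect() {
        Self.logger.info("Disconnecting from Kafka server")
        // ... destroy the underlying librdkafka handle here
    }
}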

New Config API: Test that values are set correctly

We want to test that every configuration property works properly with the new (strongly typed) configuration API.

Proposal: Write a test/tests that check if each available value from the new configuration API is set correctly in librdkafka.

(Open to discussion, might be too redundant)

Add `.gitignore`

  • add a .gitignore file including all default settings for Swift Packages
  • ignore all *.swp files that are created by Vim

Consumer autocommit leads to crash

With the consumer's enableAutoCommit option enabled, the library crashes with a fatalError at:

    private func handleOffsetCommitEvent(_ event: OpaquePointer?) {
        guard let opaquePointer = rd_kafka_event_opaque(event) else { <--- here
            fatalError("Could not resolve reference to catpured Swift callback instance")
        }

After some time (I guess after 5 seconds).
It is possible either to disable commit events in that case with enabled_events, or to simply skip events without an opaque pointer; a sketch of the latter follows below.

With auto commit disabled, this does not seem to be reproducible.
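
A minimal sketch of the skipping option, shown as a change to the handleOffsetCommitEvent function quoted above (whether silently returning is acceptable here is up for discussion):

private func handleOffsetCommitEvent(_ event: OpaquePointer?) {
    // Offsets committed by librdkafka's auto-commit have no captured Swift
    // callback attached, so skip those events instead of crashing.
    guard let opaquePointer = rd_kafka_event_opaque(event) else {
        return
    }
    // ... existing handling of the captured callback continues here
}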

Implement `KafkaProducer` and `sendAsync()`

Note: Use the GSoC22 Proposal as a reference for the implementation of the public interface.

  • create the KafkaProducerMessage type

  • implement the KafkaProducer class with the sendAsync() method as described in the proposal; a rough usage sketch follows below

    Not in the scope of this issue:

  • implementation of TopicConfig, please use default config here

  • sendSync() method; just call fatalError() or leave it out completely
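
For orientation, a rough usage sketch based on the names in this issue (the exact initializer and method signatures come from the GSoC22 proposal and may differ):

func produceExample(with producer: KafkaProducer) throws {
    let message = KafkaProducerMessage(
        topic: "example-topic",
        key: "key",
        value: "Hello, Kafka!"
    )
    // sendAsync() only enqueues the message; delivery is acknowledged asynchronously.
    try producer.sendAsync(message)
}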

Overhaul DocC documentation

(To be done once we have agreed upon a public interface for this package.)

Although we have DocC documentation already, it lacks some consistency and examples. Therefore, we want to do the following:

  • polish our documentation
    • improve consistency
    • make this package easy to understand for someone who is not familiar with the Kafka terminology
  • provide code examples

Store offset on .finishing may lead to fatalError

If the client is still iterating through the async sequence but triggerGracefulShutdown() has been called, it may lead to a fatal error:

        func storeOffset() -> StoreOffsetAction {
            switch self.state {
            case .uninitialized:
                fatalError("\(#function) invoked while still in state \(self.state)")
            case .initializing:
                fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets")
            case .consumptionStopped:
                fatalError("Cannot store offset when consumption has been stopped")
            case .consuming(let client, _):
                return .storeOffset(client: client)
            case .finishing, .finished:
                fatalError("\(#function) invoked while still in state \(self.state)") <---- here
            }
        }

Unlike for consumptionStopped, I believe it should be allowed to continue iterating during graceful shutdown until the end of the current sequence.

Make a Kafka Cluster's meta data accessible

Idea

Create a dedicated MetaData type that enables users to:

  • get all partitions for a topic

Note: This is a discovery task that might be extended or dropped in the future; a rough sketch for discussion follows below.
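
A rough sketch of what such a type could look like (all names here are assumptions for discussion, not an agreed API):

/// Metadata about a Kafka cluster, as reported by the brokers.
public struct KafkaMetadata {
    /// Metadata describing a single partition of a topic.
    public struct PartitionMetadata {
        public var partition: KafkaPartition
        public var leaderID: Int32
        public var replicaIDs: [Int32]
        public var inSyncReplicaIDs: [Int32]
    }

    /// Partition metadata keyed by topic name.
    public var partitionsByTopic: [String: [PartitionMetadata]]

    /// All partitions for the given topic, or nil if the topic is unknown to the cluster.
    public func partitions(forTopic topic: String) -> [PartitionMetadata]? {
        self.partitionsByTopic[topic]
    }
}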

Be able to redirect librdkafka logging to Logger

It's quite useful to pass a logger to a third-party library and have a consolidated log for both your application and the third party. I guess that is why KafkaConsumer/KafkaProducer take a logger as part of init(...).

I suggest going one step further and redirecting logging from librdkafka to the same logger.
For this, librdkafka allows specifying a logging callback: rd_kafka_conf_set_log_cb
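
A rough sketch of hooking this up, assuming the package's Crdkafka module exposes the C API directly. Note that C function pointers cannot capture Swift state, so a real implementation would route the message to a Logger via the configuration's opaque pointer (rd_kafka_conf_set_opaque / rd_kafka_opaque) rather than printing:

import Crdkafka

let conf = rd_kafka_conf_new()
rd_kafka_conf_set_log_cb(conf) { _, level, fac, buf in
    let facility = fac.map { String(cString: $0) } ?? ""
    let message = buf.map { String(cString: $0) } ?? ""
    // Mapping librdkafka's syslog-style level (0-7) to Logger.Level is omitted here.
    print("librdkafka [\(level)] \(facility): \(message)")
}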

Transactional API

The Transactional API is required to use the Exactly-Once Semantics (EOS) provided by Kafka.

One idea for how to structure this:

  • Make a new actor KafkaTransactionalProducer that supports the regular KafkaProducer API as well as the transactional API (beginTransaction, commitTransaction, abortTransaction, etc.). Maybe it's simpler to make KafkaProducer a class and inherit the transactional producer from it to avoid code duplication; a rough API sketch follows this list
    • KafkaTransactionalProducer should have send and sendOffset within a transaction
  • KafkaTransactionalProducer should take transactional.id as a parameter.
  • KafkaTransactionalProducer should call rd_kafka_init_transactions(...) and make sure it is initialised and not fenced by other producers.
  • KafkaTransactionalProducer should handle retriable errors and try to recover. Possibly, such errors also need to be delivered to an optional callback (as a notification).
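
A rough sketch of the API surface this could take (the type and method names are all assumptions for discussion):

public actor KafkaTransactionalProducer {
    /// A transactional producer additionally requires a transactional.id and
    /// calls rd_kafka_init_transactions() during start-up, failing if it has
    /// been fenced by another producer with the same id.
    public init(transactionalID: String, config: KafkaProducerConfiguration) throws {}

    public func beginTransaction() async throws {}
    public func commitTransaction() async throws {}
    public func abortTransaction() async throws {}

    /// Produce a message as part of the currently open transaction.
    public func send(_ message: KafkaProducerMessage) throws {}

    /// Attach consumed offsets to the currently open transaction (for exactly-once
    /// consume-transform-produce pipelines); the parameter types here are placeholders.
    public func sendOffsets(_ offsets: [KafkaOffset], toGroup groupID: String) async throws {}
}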

Improve `README.md`

Motivation

The README file is the first thing a potential package user sees. Currently, our README still states that the project is a work in progress and does not provide much information about the package itself.

Desired Changes

  • write a brief description of the package
  • add a section to the README that briefly explains the package's public interface with some code examples
  • add a section about how to test locally on Mac/Linux (setting up Kafka, creating topics etc.)

Create `KafkaTopicConfig` struct for topic configurations

Build a struct called KafkaTopicConfig that exposes a string-based dictionary that enables the user to configure the new topic that is created by the KafkaProducer when a message is sent to a non-existing topic.

Edit: The string-based dictionary has already been implemented; discuss here how and whether we also want to create a strongly typed configuration struct.

Rename `KafkaProducer.sendAsync` to `KafkaProducer.send`

  • rename KafkaProducer.sendAsync to KafkaProducer.send
  • we are not planning to add a sync variant of KafkaProducer.send* anymore
  • add documentation stating that KafkaProducer.send is asynchronous (not in the Swift way, but rather in a non-blocking way as acknowledgements are received through the AsyncSequence)

Listen and propagate RD_KAFKA_RESP_ERR__PARTITION_EOF

Sometimes it is nice to know that a partition/topic has been read to EOF, and this is supported by librdkafka.
It has to be explicitly enabled with the property enable.partition.eof=true, and the resulting error has to be handled, e.g.:

        for _ in 0..<maxEvents {
            let event = rd_kafka_queue_poll(self.queue, 0)
            defer { rd_kafka_event_destroy(event) }

            let rdEventType = rd_kafka_event_type(event)
            guard let eventType = RDKafkaEvent(rawValue: rdEventType) else {
                fatalError("Unsupported event type: \(rdEventType)")
            }

            switch eventType {
            case .error:
                let err = rd_kafka_event_error(event)
                if err == RD_KAFKA_RESP_ERR__PARTITION_EOF {
                    let topicPartition = rd_kafka_event_topic_partition(event)
                    if let topicPartition {
                    ... return events

This could probably be integrated into the current API, e.g.:

public struct KafkaConsumerMessage {
    /// The topic that the message was received from.
    public var topic: String
    /// The partition that the message was received from.
    public var partition: KafkaPartition
    /// The key of the message.
    public var key: ByteBuffer?
    /// The body of the message.
    public var value: ByteBuffer
    /// The offset of the message in its partition.
    public var offset: KafkaOffset
    var eof: Bool {
        self.value.readableBytesView.isEmpty
    }

    /// Initialize ``KafkaConsumerMessage`` as EOF from `rd_kafka_topic_partition_t` pointer.
    /// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
    internal init(topicPartitionPointer: UnsafePointer<rd_kafka_topic_partition_t>) {
        let topicPartition = topicPartitionPointer.pointee
        guard let topic = String(validatingUTF8: topicPartition.topic) else {
            fatalError("Received topic name that is non-valid UTF-8")
        }
        self.topic = topic
        self.partition = KafkaPartition(rawValue: Int(topicPartition.partition))
        self.offset = KafkaOffset(rawValue: Int(topicPartition.offset))
        self.value = ByteBuffer()
    }

or changed to enum:

enum KafkaConsumerMessage {
    case message(topic: String, partition: KafkaPartition, key: ByteBuffer?, value: ByteBuffer, offset: KafkaOffset)
    case eof(topic: String, partition: KafkaPartition, offset: KafkaOffset)
}

TopicConfiguration default values should not be set to librdkafka config

It is not obvious in librdkafka, but if we set the topic config to its default values, the config becomes 'modified'.
That may cause unexpected behaviour.

For example, I use the following config (dump):

Config for topic is 
auto.commit.enable: true
auto.commit.interval.ms: 60000
auto.offset.reset: largest
compression.codec: inherit
compression.level: -1
consume.callback.max.messages: 0
message.timeout.ms: 300000
offset.store.method: broker
offset.store.path: .
offset.store.sync.interval.ms: -1
partitioner: consistent_random
produce.offset.report: false
queuing.strategy: fifo
request.required.acks: -1
request.timeout.ms: 30000

After the code in RDKafkaTopicConfig:

        try topicConfig.dictionary.forEach { key, value in
            try Self.set(configPointer: configPointer, key: key, value: value)
        }

it is not changed:

After Config for topic is 
auto.commit.enable: true
auto.commit.interval.ms: 60000
auto.offset.reset: largest
compression.codec: inherit
compression.level: -1
consume.callback.max.messages: 0
message.timeout.ms: 300000
offset.store.method: broker
offset.store.path: .
offset.store.sync.interval.ms: -1
partitioner: consistent_random
produce.offset.report: false
queuing.strategy: fifo
request.required.acks: -1
request.timeout.ms: 30000

But since these values are now flagged as modified, I get the following error:

error: -[IntegrationTests.SwiftKafkaTests testProduceAndConsumeWithTransaction] : failed: caught error: "KafkaError.rdKafkaError: Local: Invalid argument or configuration SwiftKafka/RDKafkaTopicHandles.swift:76

As an experiment, I commented out the code above and everything works.

Some more information: the reason for this is hidden in how librdkafka works. It has internal 'modified' flags that change even if we set values to their defaults, while they remain unchanged when the config is untouched:

rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void) {
        rd_kafka_topic_conf_t *tconf = rd_calloc(1, sizeof(*tconf));
        rd_assert(RD_KAFKA_CONF_PROPS_IDX_MAX > sizeof(*tconf) &&
                  *"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");
        rd_kafka_defaultconf_set(_RK_TOPIC, tconf);
        rd_kafka_anyconf_clear_all_is_modified(tconf);
        return tconf;
}

const char *rd_kafka_topic_conf_finalize(rd_kafka_type_t cltype,
                                         const rd_kafka_conf_t *conf,
                                         rd_kafka_topic_conf_t *tconf) {
...
if (rd_kafka_topic_conf_is_modified(tconf, "acks")) {
...
if (rd_kafka_topic_conf_is_modified(tconf,
                                                    "queuing.strategy")) {
...
                if (conf->eos.transactional_id) {
                        if (!rd_kafka_topic_conf_is_modified(
                                tconf, "message.timeout.ms"))
...

Furthermore, the topic config supports a nullptr (i.e. Swift nil) value, which creates a default topic configuration.

I see several options here for the GSoC interface (though there might be more):

  1. Add the possibility to provide a nil topic configuration -> convert it to a null pointer in librdkafka
  2. Make values optional in the topic configuration and set only the changed values (see the sketch after this list)
  3. Check values in the topic configuration: if they equal the provided defaults -> don't set them
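
A minimal sketch of option 2, assuming the configuration keeps a dictionary of raw values so that unset keys never reach librdkafka (property names are illustrative):

public struct KafkaTopicConfiguration {
    /// Only values the user explicitly set; unset keys stay out of the
    /// dictionary, so librdkafka's internal "modified" flags are never touched.
    var dictionary: [String: String] = [:]

    /// nil means "use librdkafka's default" and is never written to the config.
    public var messageTimeoutMilliseconds: Int? {
        get { self.dictionary["message.timeout.ms"].flatMap { Int($0) } }
        set { self.dictionary["message.timeout.ms"] = newValue.map { String($0) } }
    }

    // ... further properties following the same pattern
}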

Is `KafkaProducerMessage` supposed to copy bytes into a ByteBuffer?

I see that KafkaProducerMessage uses ByteBuffer from NIO:

public struct KafkaProducerMessage {
     public var topic: String
     public var partition: KafkaPartition
     public var key: ByteBuffer?
     public var value: ByteBuffer

While it might be a convenient way to pass bytes safely, it seems that nothing asynchronous happens along the way:

// KafkaProducer
public func send(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID {
        let action = try self.stateMachine.withLockedValue { try $0.send() }
        switch action {
        case .send(let client, let newMessageID, let topicHandles):
            try client.produce(
...
// KafkaClient
func produce(
        message: KafkaProducerMessage,
        newMessageID: UInt,
        topicConfig: KafkaTopicConfiguration,
        topicHandles: RDKafkaTopicHandles
    ) throws {
        let keyBytes: [UInt8]?
        if var key = message.key {
            keyBytes = key.readBytes(length: key.readableBytes)
        } else {
            keyBytes = nil
        }

        let responseCode = try message.value.withUnsafeReadableBytes { valueBuffer in
            return try topicHandles.withTopicHandlePointer(topic: message.topic, topicConfig: topicConfig) { topicHandle in
                // Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster.
                // Returns 0 on success, error code otherwise.
                return rd_kafka_produce(
                    topicHandle,
                    message.partition.rawValue,
                    RD_KAFKA_MSG_F_COPY,
                    UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress),
                    valueBuffer.count,
                    keyBytes,
                    keyBytes?.count ?? 0,
                    UnsafeMutableRawPointer(bitPattern: newMessageID)
                )
            }
        }

So, if we use our own buffer (e.g. a FlatBuffers ByteBuffer), we have to copy it into an NIO ByteBuffer.
Could you advise whether there is any way to avoid copying the buffer into NIO that I am missing?
Should it be replaced with ByteBufferView or UnsafeRawBufferPointer if copying is not intended?

Upgrade Package to Swift 5.7

  • make use of the new shorthand optional binding syntax, e.g. if let foo { ... } (see the sketch after this list)
  • make use of the new Clock API for time and date types
  • update generics
  • update Swift version in .swiftformat file
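
A small sketch of the first two items (the function here is only an example, not part of the package):

func example(name: String?) async throws {
    // Swift 5.7 shorthand optional binding (previously: if let name = name).
    if let name {
        print(name)
    }

    // Swift 5.7 Clock API instead of raw nanosecond sleeps.
    let clock = ContinuousClock()
    let start = clock.now
    try await Task.sleep(for: .milliseconds(10))
    print("Elapsed: \(start.duration(to: clock.now))")
}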

Kafka consumer leaves group after 5 mins

KafkaConsumer stops after 5 minutes of consumption with the error:

[MAXPOLL] [thrd:main]: Application maximum poll interval (300000ms) exceeded by 472ms (adjust max.poll.interval.ms for long-running message processing): leaving group

There are two problems:

  1. The error itself
  2. This error is not reported to client code (i.e. not handled in eventPoll)

How to reproduce

As a minimal reproducing snippet, I can suggest the following diff to an existing test to make the sequence big enough:

diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift
index d18cd72..f0ceb94 100644
--- a/Sources/Kafka/RDKafka/RDKafkaClient.swift
+++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift
@@ -183,6 +183,10 @@ final class RDKafkaClient: Sendable {
                 self.handleLogEvent(event)
             case .offsetCommit:
                 self.handleOffsetCommitEvent(event)
+            case .error:
+                let err = rd_kafka_event_error_string(event)
+                let error = String(cString: err!)
+                fatalError("Got an error for: \(error)")
             case .none:
                 // Finished reading events, return early
                 return events
diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift
index b5c551e..a5edd5d 100644
--- a/Tests/IntegrationTests/KafkaTests.swift
+++ b/Tests/IntegrationTests/KafkaTests.swift
@@ -291,7 +291,7 @@ final class KafkaTests: XCTestCase {
     }
 
     func testCommittedOffsetsAreCorrect() async throws {
-        let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
+        let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10000000) // some endless sequence of messages
         let firstConsumerOffset = testMessages.count / 2
         let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)
 
@@ -307,6 +307,9 @@ final class KafkaTests: XCTestCase {
             ),
             bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
         )
+//      Uncomment to speed up (30 seconds instead of 5 mins):
+//        consumer1Config.maximumPollInterval = .seconds(30)
+//        consumer1Config.session.timeout = .seconds(30)
         consumer1Config.autoOffsetReset = .beginning // Read topic from beginning
         consumer1Config.broker.addressFamily = .v4
 
@@ -455,6 +458,7 @@ final class KafkaTests: XCTestCase {
         var messageIDs = Set<KafkaProducerMessageID>()
 
         for message in messages {
+            try await Task.sleep(for: .seconds(1)) // avoid queue overflow
             messageIDs.insert(try producer.send(message))
         }

The result of this test is:

Test Case '-[IntegrationTests.KafkaTests testCommittedOffsetsAreCorrect]' started.
2023-08-09T15:07:04+0300 warning kafka.test : [MAXPOLL] [thrd:main]: Application maximum poll interval (300000ms) exceeded by 472ms (adjust max.poll.interval.ms for long-running message processing): leaving group
Kafka/RDKafkaClient.swift:189: Fatal error: Got an error for: Application maximum poll interval (300000ms) exceeded by 472ms

That is very similar to confluentinc/confluent-kafka-go#980 but not the same. That bug was fixed in librdkafka 2.1.1, which is the version included in this package.

Refactor `KafkaProducer`'s poll-loop

Instead of kicking off the KafkaProducer.pollTask ourselves, we want to have something like a function:

func run() {
    while !shutdown {
        poll()
    }
}

This run() function blocks until the producer is stopped. It shall be called from a Task that the user creates, e.g.:

let producer = KafkaProducer()

Task {
  producer.run()
}

// Produce messages etc.

New Config API: Re-evaluate public API & configuration options

  • Currently, we expose more or less all librdkafka configuration options. Do we want to narrow this down to the most basic configurations and expand on user request, or provide an unsafe API that allows pro users to set arbitrary key-value options?
  • librdkafka exposes configuration options regarding OAuth, SSL keys, etc. Does our package even intend to expose those as well?

Map `librdkafka`'s error types to Swift

  • create an error type called KafkaError that contains all possible error names + their code and description
  • update all existing KafkaErrors being thrown and make them more specific (e.g. by making use of librdkafka's errstr value)
  • (Optional) Use automatic code generation to create the different errors, as all error types are exposed by librdkafka's rd_kafka_get_err_descs() function

Make KafkaError more catch friendly and efficient

Currently, to catch rdkafka-specific errors it is required to compare strings, for example:

do {
    try self.kafkaProducer.send(record)
    return
} catch let error as KafkaError where error.description.contains("one possible rdkafka error") {
// handle exception
} catch let error as KafkaError where error.description.contains("second possible rdkafka error") {
// handle exception
} catch let error as KafkaError where error.description.contains("third possible rdkafka error") {
// handle exception
} catch {
// handle unknown exception
}

It would be nice to have the error codes listed as a public enum somewhere:

do {
    try self.kafkaProducer.send(record)
    return
} catch let error as KafkaError where error.code == .onePossibleRdKafkaError {
// handle exception
} catch let error as KafkaError where error.code == .secondPossibleRdKafkaError {
// handle exception
} catch let error as KafkaError where error.code == .thirdPossibleRdKafkaError {
// handle exception
} catch {
// handle unknown exception
}

Some of these errors can occur frequently, e.g. RD_KAFKA_RESP_ERR__QUEUE_FULL, RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE, RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC, RD_KAFKA_RESP_ERR__FATAL, and RD_KAFKA_RESP_ERR__STATE; a rough sketch follows below.
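
One way this could look, as a sketch only (the case names are assumptions derived from the librdkafka error codes listed above):

public struct KafkaError: Error {
    public struct ErrorCode: Equatable, Sendable {
        private let value: String
        private init(_ value: String) { self.value = value }

        public static let queueFull = ErrorCode("queueFull")
        public static let messageSizeTooLarge = ErrorCode("messageSizeTooLarge")
        public static let unknownPartition = ErrorCode("unknownPartition")
        public static let unknownTopic = ErrorCode("unknownTopic")
        // ... one static constant per librdkafka error worth exposing
    }

    /// Stable, comparable error code suitable for catch ... where clauses.
    public var code: ErrorCode

    /// Human-readable description (e.g. librdkafka's errstr).
    public var description: String
}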
