swift-server / swift-kafka-client Goto Github PK
View Code? Open in Web Editor NEWLicense: Apache License 2.0
License: Apache License 2.0
Discovery Task: What public
error types do we want to expose in this package?
Having figured that out, please also implement those error types.
Note
Please also include those new error types in the DocC documentation of their throwing function using theThrows
aside:
/// - Throws:
New configuration API already implemented in #42.
KafkaClient
s into using the new configuration types (ConsumerConfig
, ProducerConfig
and TopicConfig
)KafkaConfig
(alongside its tests) and initialise librdkafka
config inside of KafkaClient
KafkaConsumer
: determine if consumer is assigned or subscribed through ConsumerConfig
ClientConfig
instead of theinit
of KafkaConsumer
Split the KafkaConsumer
into:
KafkaSinglePartitionConsumer
for the assignment-based consumerKafkaGroupConsumer
for the consumer-group-based consumerCurrently, there is an implicit requirement to have openssl to build or use the library:
Possible build error:
In file included from /home/actions/_work/package-storage/package-storage/.build/checkouts/swift-kafka-client/Sources/Crdkafka/librdkafka/src/rdkafka_mock_handlers.c:34:
/home/actions/_work/package-storage/package-storage/.build/checkouts/swift-kafka-client/Sources/Crdkafka/librdkafka/src/rdkafka_int.h:55:10: fatal error: 'openssl/ssl.h' file not found
#include <openssl/ssl.h>
Possible runtime error for compiled code:
Exception Type: EXC_CRASH (SIGABRT)
Exception Codes: 0x0000000000000000, 0x0000000000000000
Termination Reason: Namespace DYLD, Code 1 Library missing
Should OpenSSL library be statically linked or requirement explicitly mentioned in documentation?
Currently ServiceLifecycle is a dependency for Kafka target.
While it might be convenient when producers/consumers are used as a bunch of services, it is not really needed in most of other cases as simple task or task group is enough to run kafka consumers/producers with stop() method (triggerGracefulShutdown()
in case of KafkaConsumer/Producer).
Though, ServiceLifecycle has its own dependencies and target, it may not bring value to the end user. So, it would be nice to preserve this compatibility on the one hand and not ship it with main target.
Therefore, I wonder if it would be possible to move compatibility and related parts to separate target, please?
Our package makes use of NIOAsyncSequenceProducer
. At the time of writing this issue, NIOAsyncSequenceProducer
has not been included in an official swit-nio release (version at the time of writing this: 2.41.1).
Once a new swit-nio version has been released, please change the Package.swift
dependency as follows:
dependencies: [
- .package(url: "https://github.com/apple/swift-nio.git", branch: "main"),
+ .package(url: "https://github.com/apple/swift-nio.git", from: <latest_version>),
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
],
Build a struct called KafkaConfig
that exposes a string-based dictionary that enables the user to configure their Kafka client.
Edit: The string-based dictionary has already been implemented, discuss here how and if we also want to create a strongly typed configuration
struct
Following the case #99 I was trying to use library with manual offset commits and disabled auto commit:
for try await messageResult in consumer.messages {
try await consumer.commitSync(messageResult)
}
That one crashes the application on graceful stop of the group while still trying to commit offset:
func commitSync() -> CommitSyncAction {
switch self.state {
...
case .consumptionStopped:
fatalError("Cannot commit when consumption has been stopped") <-- here
...
}
My assumption is that it would be good to allow commit an offset or to throw an error as done for finishing
/finished
states
Implement the sendSync()
method for the KafkaProducer
that is async
and returns upon reception of a delivery report or error.
Hello!
As I mention before I really eager to jump off from my home made implementation of Swift Kafka API and use this version and ready to contribute to make it happens. To make it transparent I would like to list the functional gap and create separate issues for every item. Here is the list:
librdkafka
logs to logger.As I see, there are a lot of properties duplicated in Producer and Consumer configurations.
As example:
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift: set { self.dictionary["receive.message.max.bytes"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift: set { self.dictionary["max.in.flight.requests.per.connection"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift: set { self.dictionary["metadata.max.age.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift: set { self.dictionary["topic.metadata.refresh.interval.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift: set { self.dictionary["topic.metadata.refresh.fast.interval.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift: set { self.dictionary["topic.metadata.refresh.sparse"] = newValue.description }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift: set { self.dictionary["topic.metadata.propagation.max.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift: set { self.dictionary["topic.blacklist"] = newValue.joined(separator: ",") }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift: set { self.dictionary["socket.timeout.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift: set { self.dictionary["socket.send.buffer.bytes"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift: set { self.dictionary["socket.receive.buffer.bytes"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift: set { self.dictionary["socket.keepalive.enable"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift: set { self.dictionary["socket.nagle.disable"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift: set { self.dictionary["socket.max.fails"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift: set { self.dictionary["socket.connection.setup.timeout.ms"] = String(newValue) }
and
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift: set { self.dictionary["receive.message.max.bytes"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift: set { self.dictionary["max.in.flight.requests.per.connection"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift: set { self.dictionary["metadata.max.age.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift: set { self.dictionary["topic.metadata.refresh.interval.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift: set { self.dictionary["topic.metadata.refresh.fast.interval.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift: set { self.dictionary["topic.metadata.refresh.sparse"] = newValue.description }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift: set { self.dictionary["topic.metadata.propagation.max.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift: set { self.dictionary["topic.blacklist"] = newValue.joined(separator: ",") }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift: set { self.dictionary["socket.timeout.ms"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift: set { self.dictionary["socket.send.buffer.bytes"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift: set { self.dictionary["socket.receive.buffer.bytes"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift: set { self.dictionary["socket.keepalive.enable"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift: set { self.dictionary["socket.nagle.disable"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift: set { self.dictionary["socket.max.fails"] = String(newValue) }
./Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift: set { self.dictionary["socket.connection.setup.timeout.ms"] = String(newValue) }
Probably, it make sense to put them into one place and de-dup this code or I miss something and that was made intentionally.
@felixschlegel could you suggest on that, please?
Note: Use the GSoC22 Proposal as a reference for the implementation of the public interface.
make it possible to connect and disconnect to/from a Kafka Server by implementing the KafkaClient
class
create a static Logger
property for logging the client status
Not in the scope of this issue:
customizable KafkaConfig
, please use default config here
connectAdditional()
method, just throw a fatalError()
or leave it out completely
Note: Use the GSoC22 Proposal as a reference for the implementation of the public interface.
KafkaConsumerMessage
typeKafkaClient
class with the subscribe()
method as described in the proposalPossible Solution:
Create a docker container with a kafka-server
instance running locally.
The integration tests can then be run inside of this docker container.
We want to test that every configuration property works properly with the new (strongly typed) configuration API.
Proposal: Write a test/tests that check if each available value from the new configuration API is set correctly in librdkafka
.
(Open to discussion, might be too redundant)
.gitignore
file including all default settings for Swift Packages*.swp
files that are created by VimImplement the Service
protocol from swift-service-lifecycle
in both the KafkaProducer
and the KafkaConsumer
.
With consumer enableAutoCommit
option library crashes with fatalError at:
private func handleOffsetCommitEvent(_ event: OpaquePointer?) {
guard let opaquePointer = rd_kafka_event_opaque(event) else { <--- here
fatalError("Could not resolve reference to catpured Swift callback instance")
}
After some time (I guess after 5 seconds).
It is possible to either disable commit events in that case with enabled_events
, either just skip events without opaque pointer.
With auto commit disabled this seems not reproducible.
Note: Use the GSoC22 Proposal as a reference for the implementation of the public interface.
create the KafkaProducerMessage
type
implement the KafkaProducer
class with the sendAsync()
method as described in the proposal
Not in the scope of this issue:
implementation of TopicConfig
, please use default config here
sendSync()
method, just throw a fatalError()
or leave it out completely
(To be done, when we agreed upon a public interface for this package)
Although we have DocC documentation already, it lacks some consistency and examples. Therefore, we want to do the following:
If client still iterating through async sequence but triggerGracefulShutdown()
was called, it may lead to fatal error:
func storeOffset() -> StoreOffsetAction {
switch self.state {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
case .initializing:
fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets")
case .consumptionStopped:
fatalError("Cannot store offset when consumption has been stopped")
case .consuming(let client, _):
return .storeOffset(client: client)
case .finishing, .finished:
fatalError("\(#function) invoked while still in state \(self.state)") <---- here
}
}
Unlike for consumptionStopped
, I believe it should be allowed to continue iterate on graceful shutdown until end of current sequence.
@FranzBusch: librdkafka exposes a callback based API for consuming messages which ought to be faster than the poll based one. We should investigate this
Create a dedicated MetaData
type that enables users to:
Note: This is a discovery task that might be extended or abolished in the future.
It's quite useful to pass a logger to 3rd party and have consolidated log for both your application and 3rd party. I guess it's why KafkaConsumer
/KafkaProducer
have logger
as part of init(...)
.
I suggest to do next step further and redirect logging from librdkafka
to the same logger.
For this librdkafka
allows to specify logging callback: rd_kafka_conf_set_log_cb
Transactional API is required to use Exactly Once Semantics provided by Kafka.
One of the ideas how to structure this:
KafkaTransactionalProducer
which supports regular KafkaProducer
API and transactional API (like beginTransaction
, commitTransaction
, abortTransaction
etc. Maybe it's simpler to make KafkaProducer
as class and inherit transactional producer from it to avoid code duplicationKafkaTransactionalProducer
should have send
and sendOffset
within a transactionKafkaTransactionalProducer
should take transactional.id
as parameter.KafkaTransactionalProducer
should call rd_kafka_init_transactions(...)
and make sure it's initialised and not fenced with others.KafkaTransactionalProducer
should handle retriable errors and tries to recover. Possible such errors also need to be delivered to optional callback (as notification)The README
file is the first thing a potential package user sees. Currently, our README
still states that the project is work-in-progress and provides not much information about the package itself.
README
that briefly explains the package's public interface with some code examplesBuild a struct called KafkaTopicConfig
that exposes a string-based dictionary that enables the user to configure the new topic that is created by the KafkaProducer
when a message is sent to a non-existing topic.
Edit: The string-based dictionary has already been implemented, discuss here how and if we also want to create a strongly typed configuration
struct
KafkaProducer.sendAsync
to KafkaProducer.send
KafkaProducer.send*
anymoreKafkaProducer.send
is asynchronous (not in the Swift way, but rather in a non-blocking way as acknowledgements are received through the AsyncSequence
)librdkafka
provides statistics over a callback which can be set using rd_kafka_conf_set_stats_cb
.
Statistics callback should be set in combination with statistics.interval.ms
.
Reference https://docs.confluent.io/platform/current/clients/librdkafka/html/md_STATISTICS.html
https://quicktype.io can be used to generate proper structs for statistics content.
Sometimes it is nice to know that partition/topic was read to EOF and it is supported by librdkafka.
It should be explicitly enabled with property enable.partition.eof=true
and error is handled, e.g.:
for _ in 0..<maxEvents {
let event = rd_kafka_queue_poll(self.queue, 0)
defer { rd_kafka_event_destroy(event) }
let rdEventType = rd_kafka_event_type(event)
guard let eventType = RDKafkaEvent(rawValue: rdEventType) else {
fatalError("Unsupported event type: \(rdEventType)")
}
switch eventType {
case .error:
let err = rd_kafka_event_error(event)
if err == RD_KAFKA_RESP_ERR__PARTITION_EOF {
let topicPartition = rd_kafka_event_topic_partition(event)
if let topicPartition {
... return events
Probably, it could be extended with current api e.g.:
public struct KafkaConsumerMessage {
/// The topic that the message was received from.
public var topic: String
/// The partition that the message was received from.
public var partition: KafkaPartition
/// The key of the message.
public var key: ByteBuffer?
/// The body of the message.
public var value: ByteBuffer
/// The offset of the message in its partition.
public var offset: KafkaOffset
var eof: Bool {
self.value.readableBytesView.isEmpty
}
/// Initialize ``KafkaConsumerMessage`` as EOF from `rd_kafka_topic_partition_t` pointer.
/// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
internal init(topicPartitionPointer: UnsafePointer<rd_kafka_topic_partition_t>) {
let topicPartition = topicPartitionPointer.pointee
guard let topic = String(validatingUTF8: topicPartition.topic) else {
fatalError("Received topic name that is non-valid UTF-8")
}
self.topic = topic
self.partition = KafkaPartition(rawValue: Int(topicPartition.partition))
self.offset = KafkaOffset(rawValue: Int(topicPartition.offset))
self.value = ByteBuffer()
}
or changed to enum:
enum KafkaConsumerMessage {
case message(topic: String, partition: KafkaPartition, key: ByteBuffer?, value: ByteBuffer, offset: KafkaOffset)
case eof(topic: String, partition: KafkaPartition, offset: KafkaOffset)
}
We need to find a way to prevent reference cycles between KafkaConsumer
and ConsumerMessagesAsyncSequenceDelegate
without making use of weak
as it causes performance issues.
For more information see: #24 (comment)
Currently KafkaAcknowledgedMessageError
uses a preliminary public interface. This shall be changed to a more user-friendly and descriptive version.
Franz: we do not want to expose
.rawValue
asInt32
Implement the connectAdditional(brokers:)
method in KafkaClient
that enables users to connect to additional Kafka brokers during runtime.
It is not something obvious in librdkafka but if we set topic config to default values, config become modified.
That may cause unexpected behaviour.
For example, I use the following config (dump):
Config for topic is
auto.commit.enable: true
auto.commit.interval.ms: 60000
auto.offset.reset: largest
compression.codec: inherit
compression.level: -1
consume.callback.max.messages: 0
message.timeout.ms: 300000
offset.store.method: broker
offset.store.path: .
offset.store.sync.interval.ms: -1
partitioner: consistent_random
produce.offset.report: false
queuing.strategy: fifo
request.required.acks: -1
request.timeout.ms: 30000
After the code in RDKafkaTopicConfig:
try topicConfig.dictionary.forEach { key, value in
try Self.set(configPointer: configPointer, key: key, value: value)
}
it is not changed:
After Config for topic is
auto.commit.enable: true
auto.commit.interval.ms: 60000
auto.offset.reset: largest
compression.codec: inherit
compression.level: -1
consume.callback.max.messages: 0
message.timeout.ms: 300000
offset.store.method: broker
offset.store.path: .
offset.store.sync.interval.ms: -1
partitioner: consistent_random
produce.offset.report: false
queuing.strategy: fifo
request.required.acks: -1
request.timeout.ms: 30000
But since these values are modified, I have the following error:
error: -[IntegrationTests.SwiftKafkaTests testProduceAndConsumeWithTransaction] : failed: caught error: "KafkaError.rdKafkaError: Local: Invalid argument or configuration SwiftKafka/RDKafkaTopicHandles.swift:76
For experiment, I've commented the code above and everything works.
Just some more information. The reason for that is hidden in how librdkafka works. It has some 'modified' flags inside that changes even if we set values to default, while remain unchanged when config is untouched:
rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void) {
rd_kafka_topic_conf_t *tconf = rd_calloc(1, sizeof(*tconf));
rd_assert(RD_KAFKA_CONF_PROPS_IDX_MAX > sizeof(*tconf) &&
*"Increase RD_KAFKA_CONF_PROPS_IDX_MAX");
rd_kafka_defaultconf_set(_RK_TOPIC, tconf);
rd_kafka_anyconf_clear_all_is_modified(tconf);
return tconf;
}
const char *rd_kafka_topic_conf_finalize(rd_kafka_type_t cltype,
const rd_kafka_conf_t *conf,
rd_kafka_topic_conf_t *tconf) {
...
if (rd_kafka_topic_conf_is_modified(tconf, "acks")) {
...
if (rd_kafka_topic_conf_is_modified(tconf,
"queuing.strategy")) {
...
if (conf->eos.transactional_id) {
if (!rd_kafka_topic_conf_is_modified(
tconf, "message.timeout.ms"))
...
Furthermore, topic config supports nullptr
(aka swift nil
) value that creates default topic configuration.
I see several options here for gsoc interface (though, might be more):
I see that KafkaProducerMessage
is using ByteBuffer
from nio:
public struct KafkaProducerMessage {
public var topic: String
public var partition: KafkaPartition
public var key: ByteBuffer?
public var value: ByteBuffer
While it might be convenient way to pass safe bytes, it seem that no any async things happens on the way:
// KafkaProducer
public func send(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID {
let action = try self.stateMachine.withLockedValue { try $0.send() }
switch action {
case .send(let client, let newMessageID, let topicHandles):
try client.produce(
...
// KafkaClient
func produce(
message: KafkaProducerMessage,
newMessageID: UInt,
topicConfig: KafkaTopicConfiguration,
topicHandles: RDKafkaTopicHandles
) throws {
let keyBytes: [UInt8]?
if var key = message.key {
keyBytes = key.readBytes(length: key.readableBytes)
} else {
keyBytes = nil
}
let responseCode = try message.value.withUnsafeReadableBytes { valueBuffer in
return try topicHandles.withTopicHandlePointer(topic: message.topic, topicConfig: topicConfig) { topicHandle in
// Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster.
// Returns 0 on success, error code otherwise.
return rd_kafka_produce(
topicHandle,
message.partition.rawValue,
RD_KAFKA_MSG_F_COPY,
UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress),
valueBuffer.count,
keyBytes,
keyBytes?.count ?? 0,
UnsafeMutableRawPointer(bitPattern: newMessageID)
)
}
}
So, if we use our own buffer (e.g. Flatbuffers ByteBuffer), we should copy it to nio ByteBuffer.
Could you advise if there is any way to avoid copying the buffer in nio that I'am missing?
Should it be replaced with ByteBufferView or with UnsafeRawBufferPointer if it is not supposed?
Make sure that the package builds inside of Docker
if let foo {
.swiftformat
fileExpose the rdkafka.h
headers to the Swift package using module map
+ a shim header as the location of the rdkafka.h
file relies on the OS
Internalise properties of KafkaProducer
into its state
using enum
associated values as well as possible
Investigate if we still need KafkaProducer.shutdownGracefully()
KafkaConsumer stops after 5 mins of consumption with error:
MAXPOLL] [thrd:main]: Application maximum poll interval (300000ms) exceeded by 472ms (adjust max.poll.interval.ms for long-running message processing): leaving group
There are two problems:
eventPoll
)As minimal reproducing snippet I can suggest the following diff to existing test to make sequence big enough
diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift
index d18cd72..f0ceb94 100644
--- a/Sources/Kafka/RDKafka/RDKafkaClient.swift
+++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift
@@ -183,6 +183,10 @@ final class RDKafkaClient: Sendable {
self.handleLogEvent(event)
case .offsetCommit:
self.handleOffsetCommitEvent(event)
+ case .error:
+ let err = rd_kafka_event_error_string(event)
+ let error = String(cString: err!)
+ fatalError("Got an error for: \(error)")
case .none:
// Finished reading events, return early
return events
diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift
index b5c551e..a5edd5d 100644
--- a/Tests/IntegrationTests/KafkaTests.swift
+++ b/Tests/IntegrationTests/KafkaTests.swift
@@ -291,7 +291,7 @@ final class KafkaTests: XCTestCase {
}
func testCommittedOffsetsAreCorrect() async throws {
- let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
+ let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10000000) // some endless sequence of messages
let firstConsumerOffset = testMessages.count / 2
let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)
@@ -307,6 +307,9 @@ final class KafkaTests: XCTestCase {
),
bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
)
+// Uncomment to speed up (30 seconds instead of 5 mins):
+// consumer1Config.maximumPollInterval = .seconds(30)
+// consumer1Config.session.timeout = .seconds(30)
consumer1Config.autoOffsetReset = .beginning // Read topic from beginning
consumer1Config.broker.addressFamily = .v4
@@ -455,6 +458,7 @@ final class KafkaTests: XCTestCase {
var messageIDs = Set<KafkaProducerMessageID>()
for message in messages {
+ try await Task.sleep(for: .seconds(1)) // avoid queue overflow
messageIDs.insert(try producer.send(message))
}
Result of this test is:
Test Case '-[IntegrationTests.KafkaTests testCommittedOffsetsAreCorrect]' started.
2023-08-09T15:07:04+0300 warning kafka.test : [MAXPOLL] [thrd:main]: Application maximum poll interval (300000ms) exceeded by 472ms (adjust max.poll.interval.ms for long-running message processing): leaving group
Kafka/RDKafkaClient.swift:189: Fatal error: Got an error for: Application maximum poll interval (300000ms) exceeded by 472ms
That is very similar to confluentinc/confluent-kafka-go#980 but not the same. That bug was fixed in 2.1.1 which is a part of swift kafka gsoc.
Instead of kicking off the KafkaProducer.pollTask
ourselves, we want to have something like a function:
func run() {
while !shutdown {
poll()
}
}
This function run()
is blocking until the producer is stopped. It shall be called by a Task
that the user creates. E.g.:
let producer = KafkaProducer()
Task {
producer.run()
}
// Produce messages etc.
librdkafka
configurations โ do we want to narrow it down to the most basic configurations and expand this on user request / provide an unsafe API that allows pro-users to set arbitrary key-value options?librdkafka
exposes configuration options regarding OAuth and SSL keys etc. โ does our package even intend to expose that as well?Write tests that execute sendAsync()
concurrently on a KafkaProducer
to ensure there are no race conditions occurring.
KafkaError
that contains all possible error names + their code
and description
KafkaError
s being thrown and make them more specific (e.g. by making use of librdkafka
s errstr
value)librdkafka
's rd_kafka_get_err_descs()
functionCurrently, KafkaConsumer.serialQueue
does not allow us to cancel tasks. This shall be changed.
KafkaConsumer.commitSync
function to take an array of KafkaConsumerMessage
's from different topics as its parameterCurrently, to catch rdkafka specific errors it is required to compare a string, for example:
do {
try self.kafkaProducer.send(record)
return
} catch let error as KafkaError where error.description.contains("one possible rdkafka error") {
// handle exception
} catch let error as KafkaError where error.description.contains("second possible rdkafka error") {
// handle exception
} catch let error as KafkaError where error.description.contains("third possible rdkafka error") {
// handle exception
} catch {
// handle unknown exception
}
It would be nice to have errors listed as public enum somewhere:
do {
try self.kafkaProducer.send(record)
return
} catch let error as KafkaError where error.code == .onePossibleRdKafkaError {
// handle exception
} catch let error as KafkaError where error.code == .secondPossibleRdKafkaError {
// handle exception
} catch let error as KafkaError where error.code == .thirdPossibleRdKafkaError {
// handle exception
} catch {
// handle unknown exception
}
Since it might be an often error: e.g. RD_KAFKA_RESP_ERR__QUEUE_FULL/RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE/RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION/RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC/RD_KAFKA_RESP_ERR__FATAL/RD_KAFKA_RESP_ERR__STATE
See: rd_kafka_conf_set_error_cb()
logger
objectKafkaProducer / KafkaConsumer
when fatal error occurs (needs more investigation, idea: shutdown on fatal error)A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.