hw-kafka-client

Kafka bindings for Haskell backed by the librdkafka C module.

Ecosystem

HaskellWorks Kafka ecosystem is described here: https://github.com/haskell-works/hw-kafka

Consumer

High-level consumers have been supported by librdkafka since version 0.9. They provide an abstraction for consuming messages from multiple partitions and topics, and they address scalability (up to the number of partitions) through automatic rebalancing: when a new consumer joins a consumer group, the group rebalances to redistribute partitions among its members.

Consumer example

See Running integration tests locally to learn how to configure a local environment.

cabal build --flag examples

or

cabal run kafka-client-example --flag examples

A working consumer example can be found here: ConsumerExample.hs.
To run an example, compile with the examples flag.

{-# LANGUAGE OverloadedStrings #-}
import Control.Exception (bracket)
import Control.Monad (replicateM_)
import Kafka.Consumer

-- Global consumer properties
consumerProps :: ConsumerProperties
consumerProps = brokersList ["localhost:9092"]
             <> groupId "consumer_example_group"
             <> noAutoCommit
             <> logLevel KafkaLogInfo

-- Subscription to topics
consumerSub :: Subscription
consumerSub = topics ["kafka-client-example-topic"]
           <> offsetReset Earliest

-- Running an example
runConsumerExample :: IO ()
runConsumerExample = do
    res <- bracket mkConsumer clConsumer runHandler
    print res
    where
      mkConsumer = newConsumer consumerProps consumerSub
      clConsumer (Left err) = return (Left err)
      clConsumer (Right kc) = maybe (Right ()) Left <$> closeConsumer kc
      runHandler (Left err) = return (Left err)
      runHandler (Right kc) = processMessages kc

-------------------------------------------------------------------
processMessages :: KafkaConsumer -> IO (Either KafkaError ())
processMessages kafka = do
    replicateM_ 10 $ do
      msg <- pollMessage kafka (Timeout 1000)
      putStrLn $ "Message: " <> show msg
      err <- commitAllOffsets OffsetCommit kafka
      putStrLn $ "Offsets: " <> maybe "Committed." show err
    return $ Right ()

Producer

The kafka-client producer supports sending messages to multiple topics; the target topic name is part of each message passed to produceMessage.

A working producer example can be found here: ProducerExample.hs

Delivery reports

The Kafka producer maintains its own internal queue for outgoing messages. Calling produceMessage does not mean that the message has actually been written to Kafka; it only means that the message has been placed into that outgoing queue and that the producer will (eventually) push it to Kafka.

However, it is not always possible for the producer to send messages to Kafka: network problems or the Kafka cluster being offline can prevent delivery.

When a message cannot be sent to Kafka for some time (see the message.timeout.ms configuration option), it is dropped from the outgoing queue and a delivery report indicating the error is raised.

It is possible to configure hw-kafka-client to set an infinite message timeout so the message is never dropped from the queue:

producerProps :: ProducerProperties
producerProps = brokersList ["localhost:9092"]
             <> sendTimeout (Timeout 0)           -- for librdkafka "0" means "infinite" (see https://github.com/edenhill/librdkafka/issues/2015)

Delivery reports provide a way to detect when the producer experiences problems sending messages to Kafka.

Currently hw-kafka-client supports delivery callbacks:

producerProps :: ProducerProperties
producerProps = brokersList ["localhost:9092"]
             <> setCallback (deliveryCallback print)

In the example above, when the producer cannot deliver a message to Kafka, the error is printed (and the message is dropped).

Producer example

{-# LANGUAGE OverloadedStrings #-}
import Control.Exception (bracket)
import Control.Monad (forM_)
import Data.ByteString (ByteString)
import Kafka.Producer

-- Global producer properties
producerProps :: ProducerProperties
producerProps = brokersList ["localhost:9092"]
             <> logLevel KafkaLogDebug

-- Topic to send messages to
targetTopic :: TopicName
targetTopic = "kafka-client-example-topic"

-- Run an example
runProducerExample :: IO ()
runProducerExample =
    bracket mkProducer clProducer runHandler >>= print
    where
      mkProducer = newProducer producerProps
      clProducer (Left _)     = return ()
      clProducer (Right prod) = closeProducer prod
      runHandler (Left err)   = return $ Left err
      runHandler (Right prod) = sendMessages prod

sendMessages :: KafkaProducer -> IO (Either KafkaError ())
sendMessages prod = do
  err1 <- produceMessage prod (mkMessage Nothing (Just "test from producer"))
  forM_ err1 print

  err2 <- produceMessage prod (mkMessage (Just "key") (Just "test from producer (with key)"))
  forM_ err2 print

  return $ Right ()

mkMessage :: Maybe ByteString -> Maybe ByteString -> ProducerRecord
mkMessage k v = ProducerRecord
                  { prTopic = targetTopic
                  , prPartition = UnassignedPartition
                  , prKey = k
                  , prValue = v
                  }

Synchronous sending of messages

Because of the asynchronous nature of librdkafka, there is no API to provide synchronous production of messages. It is, however, possible to combine the delivery reports feature with that of callbacks. This can be done using the Kafka.Producer.produceMessage' function.

produceMessage' :: MonadIO m
                => KafkaProducer
                -> ProducerRecord
                -> (DeliveryReport -> IO ())
                -> m (Either ImmediateError ())

Using this function, you can provide a callback that will be invoked with the produced message's delivery report. With a little help from MVars or similar, you can, in fact, create a synchronous-like interface.

{-# LANGUAGE LambdaCase #-}
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad.IO.Class (MonadIO, liftIO)

sendMessageSync :: MonadIO m
                => KafkaProducer
                -> ProducerRecord
                -> m (Either KafkaError Offset)
sendMessageSync producer record = liftIO $ do
  -- Create an empty MVar:
  var <- newEmptyMVar

  -- Produce the message and use the callback to put the delivery report in the
  -- MVar:
  res <- produceMessage' producer record (putMVar var)

  case res of
    Left (ImmediateError err) ->
      pure (Left err)
    Right () -> do
      -- Flush producer queue to make sure you don't get stuck waiting for the
      -- message to send:
      flushProducer producer

      -- Wait for the message's delivery report and map accordingly:
      takeMVar var >>= return . \case
        DeliverySuccess _ offset -> Right offset
        DeliveryFailure _ err    -> Left err
        NoMessageError err       -> Left err

Note: this is a semi-naive solution, as it waits forever (or until librdkafka times out). Make sure that your configuration reflects the behavior you want out of this functionality.

Running integration tests locally

shell.nix can be used to provide a working environment that is enough to build and test hw-kafka-client.

To be able to run tests locally, the $KAFKA_TEST_BROKER environment variable is expected to be set (use shell.nix or export it manually).

$KAFKA_TEST_BROKER should contain the IP address of an accessible Kafka broker that the integration tests will run against.

With Docker Compose, this variable is used to configure the Kafka broker to listen on this address:

$ docker-compose up

After that, integration tests can be switched on using the 'it' flag:

$ cabal test --test-show-details=direct --flag it

Credits

This project is inspired by Haskakafka which unfortunately doesn't seem to be actively maintained.

hw-kafka-client's Issues

Timeout of 0 is impossible

The README documentation mentions that a Timeout of 0 means "infinite" for librdkafka.
However, this currently doesn't work, because librdkafka requires the value to be strictly greater than linger.ms (see confluentinc/librdkafka#2015).

Maybe document that, until this issue is fixed, an infinite timeout is impossible, and advise using the maximum value of 900000 (i.e. 15 minutes)?
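
Until that is fixed, a workaround is to set the suggested maximum explicitly (a sketch mirroring the README's producer properties):

producerProps :: ProducerProperties
producerProps = brokersList ["localhost:9092"]
             <> sendTimeout (Timeout 900000)  -- 15 minutes, the suggested maximum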

Missing APIs

Hello,

While working on a PR (which I will open soon) to add documentation for many types and functions of hw-kafka-client, I have repeatedly noticed something odd while reading this library's API on Hackage: some types / type classes are used in the public API but not exposed.

E.g. HasKafkaConf is not exposed by the library, yet it appears in various exposed functions, like topicOffsetsForTime :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> TopicName -> m (Either KafkaError [TopicPartition]).

In turn, this means for API consumers that it's pretty unclear what must be passed to functions like topicOffsetsForTime.

List of missing APIs I found:

  • HasKafkaConf
  • HasKafka
  • HasTopicConf
  • KafkaConf
  • Kafka
  • TopicConf

Side question (maybe unrelated, let me know if this deserves another issue): Why are some functions using a concrete type (e.g. rebalanceCallback uses the concrete type KafkaConf) while others use the type class (e.g. statsCallback uses the constraint HasKafkaConf)?

Refactor Producer

Currently:

When a keyed message is sent, the target partition is ignored even if it was specified.

let realPart = if keyLength == 0 then p else UnassignedPartition

Expected: specified partition should take precedence. Maybe the partition should be a part of ProducedMessage type.

Refactor `ProducerRecord`

ProducerRecord needs some love:

Instead of ADT ProducerRecord | KeyedProducerRecord it would be better to have one type:

ProducerRecord 
  { key   :: Maybe ByteString
  , value :: Maybe ByteString
  }

Implement `poll` for `KafkaProducer`

Apparently KafkaProducer also needs to poll:

Andreas Heider @ah- 12:48
you still need to call poll, messages only get removed from the queue 
once the delivery report has been delivered

also

@AlexeyRaga Depends, in C/C++ if you don't register any callbacks then you don't need to call poll().
But you typically do want a delivery report callback, otherwise you have no idea if your messages were successfully sent or not

This will eventually allow implementing proper delivery callbacks if needed.

Ideas:

  • use forkIO to poll in a separate thread (a sketch follows this list)? Or even a heavy-weight OS thread?
  • call poll when a message is sent? Would it impact performance?
  • Anything else?
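
As a sketch of the first idea (pollProducerEvents is hypothetical; it stands in for a not-yet-exposed binding to librdkafka's rd_kafka_poll):

import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (forever, void)
import Kafka.Producer

-- Hypothetical sketch: serve delivery-report callbacks from a background thread.
startPollLoop :: KafkaProducer -> IO ()
startPollLoop prod = void . forkIO . forever $ do
  pollProducerEvents prod (Timeout 100)  -- hypothetical binding to rd_kafka_poll
  threadDelay 10000                      -- back off briefly between polls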

Add documentation about rdkafka with Nix and Stack

Hey,

Unless this is just me being a huge noob on Nix + Stack (which I am totally ok with 😅 ), I think it would be nice to add in the README information about the need to change some configuration when using Nix with Stack. I scratched my head for several hours before figuring this out :(

In stack.yaml one needs to add

nix:
  enable: true
  packages:
    - rdkafka

after installing rdkafka.

If you're ok with this, I don't mind opening a PR.

Cheers!

Fix MacOS: tinycthread.c error

Sometimes I see the following error when closing KafkaConsumer on MacOS Sierra (my laptop):

Assertion failed: (r == 0), function rwlock_rdlock, file tinycthread.c, line 1005

This error happens after I use watermarkOffsets, allTopicsMetadata, or allConsumerGroupsInfo. The function calls themselves are successful and the results are as expected; the error occurs later, when the KafkaConsumer is closed.

I reproduce it by running integration tests locally.

Interestingly, running the same tests in Travis CI (also on macOS) does not produce this error.
It also looks like the error doesn't happen on Linux.

Examples: build problems.

Thank you for building the library, really great stuff 🥇

I'm having some problems building the examples using the command given in the readme:

$ stack build --flag hw-kafka-client:examples

Error: While constructing the build plan, the following exceptions were encountered:

In the dependencies for hw-kafka-client-2.6.0(+examples):
    unix needed, but the stack configuration has no specified version  (latest matching version
         is 2.7.2.2)
needed since hw-kafka-client is a build target.

Some different approaches to resolving this:

  * Consider trying 'stack solver', which uses the cabal-install solver to attempt to find some
    working build configuration. This can be convenient when dealing with many complicated
    constraint errors, but results may be unpredictable.

  * Recommended action: try adding the following to your extra-deps
    in C:\Users\piotr_justyna\Documents\github\hw-kafka-client\stack.yaml:

unix-2.7.2.2@sha256:4ef1d010d70a4a07a717e853d4a440c105dad38c6199316e320fdd4c48dacd34

Plan construction failed.

When I do add unix as the extra dependency, the error does not go away. My new stack.yaml:

resolver: lts-12.10

flags: {}

packages:
- '.'

extra-deps:
- unix-2.7.2.2

New output:

$ stack build --flag hw-kafka-client:examples

Error: While constructing the build plan, the following exceptions were encountered:

In the dependencies for hw-kafka-client-2.6.0(+examples):
    unix is a library dependency, but the package provides no library
needed since hw-kafka-client is a build target.

Some different approaches to resolving this:

  * Consider trying 'stack solver', which uses the cabal-install solver to attempt to find some
    working build configuration. This can be convenient when dealing with many complicated
    constraint errors, but results may be unpredictable.


Plan construction failed.

I also tried updating the resolver to 13.9, but I get the same results. I must be doing something stupid. Is anybody else facing this?

Implement Seek

Investigate, how to do seek?

librdkafka provides this function:

RD_EXPORT
rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *rkt,
                                   int32_t partition,
                                   int64_t offset,
                                   int timeout_ms);

but AFAIK it is considered a legacy function from the pre-subscription era.
Is it still useful? Perhaps implementing a Haskell API on top of it would be useful anyway?

Now we have this function:

RD_EXPORT rd_kafka_resp_err_t
rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics);

Is it possible to specify offsets in rd_kafka_topic_partition_list_t? Would they be respected?
Or how to seek the consumer to the specific offsets?

Ideally, we would want Haskell functions to:

  1. Seek the consumer to the specified offsets (a possible signature is sketched after this list)
  2. Seek the consumer to the specified timestamp (perhaps using #16)
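
A possible Haskell signature for the offset case (a sketch only, following the style of the existing consumer API; not implemented at the time of writing):

seek :: MonadIO m
     => KafkaConsumer
     -> Timeout           -- ^ how long to wait for the seek to complete
     -> [TopicPartition]  -- ^ partitions with their target offsets set
     -> m (Maybe KafkaError)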

Better/Friendlier resources allocation/freeing

The intended way of using Consumer currently is through runConsumer function:

runConsumer :: ConsumerGroupId                      -- ^ Consumer group id (a @group.id@ property of a kafka consumer)
            -> BrokersString                        -- ^ Comma separated list of brokers with ports (e.g. @localhost:9092@)
            -> KafkaProps                           -- ^ Extra kafka consumer parameters (see kafka documentation)
            -> TopicProps                           -- ^ Topic config that is going to be used for every topic consumed by the consumer
            -> [TopicName]                          -- ^ List of topics to be consumed
            -> (Kafka -> IO (Either KafkaError a))  -- ^ A callback function to poll and handle messages
            -> IO (Either KafkaError a)

It is convenient because it is simple and high-level enough, and it takes care of disposing of unmanaged resources. But at the same time it may be too limited: an IO-returning callback cannot be lifted into another monad (transformer).

The library provides all the necessary functions to "do it yourself" (write your own runConsumer or similar). But it is a bit hairy and involves seemingly unnecessary boilerplate, especially when it comes to disposing of resources.

We could provide a better API using something like ResourceT for creating consumers/producers, so it would be easy to use and there would be no need for a callback.
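
For example, a bracket-based helper along these lines (a minimal sketch over the newer newConsumer/closeConsumer API) already removes most of the boilerplate:

import Control.Exception (bracket)
import Control.Monad (void)
import Kafka.Consumer

-- Sketch: run an action against a consumer, always closing it afterwards.
withConsumer :: ConsumerProperties
             -> Subscription
             -> (KafkaConsumer -> IO a)
             -> IO (Either KafkaError a)
withConsumer props sub act = bracket (newConsumer props sub) release use
  where
    release (Left _)   = pure ()
    release (Right kc) = void (closeConsumer kc)
    use (Left err) = pure (Left err)
    use (Right kc) = Right <$> act kc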

segfault with bracket

Hi,
I am struggling with a weird segmentation fault when I use bracket with a KafkaConsumer. Here's a code snippet that on my machine exits with Segmentation Fault: 11:

bracket (KC.newConsumer (KC.brokersList [brokerAddress]
                           <> KC.groupId subscribeGroupId
                           <> KC.noAutoCommit
                           <> KC.logLevel KC.KafkaLogInfo)
                        (KC.topics [subscribeTopicName]))
        (\case
          Left _ -> pure ()
          Right kafka -> processMessages kafka)
        (\case
          Left  _err -> pure ()
          Right consumer -> KC.closeConsumer consumer >> pure ())
 where
  processMessages kafka =
    mapM_ (\_ -> do
                    _msg1 <- KC.pollMessage kafka (KT.Timeout 1000)
                    _err <- KC.commitAllOffsets KC.OffsetCommit kafka
                    pure ()
          ) [0 :: Integer .. 10]

With some putStrLn "debugging", I could trace the origin of the exception to the call of KC.pollMessage. Interestingly, the segfault does not occur if the KafkaConsumer is created in an IO block without a bracket.
Does anybody have an idea what this could be?

Consumers silently fail to connect

When starting a consumer against an unreachable address, librdkafka logs errors, but the consumer doesn't throw or otherwise indicate in Haskell application space that anything is wrong. We see these messages in stdout/stderr:

%3|1554196159.148|FAIL|rdkafka#consumer-2| [thrd:sasl_ssl://our-internal-address:9094/bootstr]: sasl_ssl://our-internal-address:9094/bootstrap: Connect to ipv4#********:9094 failed: Connection timed out
%3|1554196159.150|ERROR|rdkafka#consumer-2| [thrd:sasl_ssl://our-internal-address:9094/bootstr]: sasl_ssl://our-internal-addresst:9094/bootstrap: Connect to ipv4#********:9094 failed: Connection timed out
%3|1554196159.150|ERROR|rdkafka#consumer-2| [thrd:sasl_ssl://our-internal-address:9094/bootstr]: 1/1 brokers are down

We tried setting the error and log callbacks by:

let
  props = readFromConfig
       <> setCallback (errorCallback (\_ _ -> putStrLn "from errorCallback!"))
       <> setCallback (logCallback (\_ _ _ -> putStrLn "from logCallback!"))
in
  newConsumer props subscription

But this doesn't seem to enable us to actually get these logs out. We pass the created consumer to hw-kafka-conduit's kafkaSourceNoClose but I'm not sure that has anything to do with the logs not being intercepted by the callbacks.

This is especially scary as the application doesn't crash or anything when we fail to connect.

Separately, I wanted to ask: does the client automatically reconnect? Sometimes we lose the connection and we're not sure that it is picked back up again. We'd have to write some form of heartbeat mechanism to be certain, I guess...

librdkafka version: 0.11.6

Admin API

Hi,

Apparently the librdkafka library supports an Admin client (e.g. for managing topics); would it be possible to support this in hw-kafka-client?

I am aware this is probably not a trivial development though 😞

peekCAText is less efficient than it could be

currently it's just defined as

peekCAText cp = Text.pack <$> peekCAString cp

so not only does it call the more inefficient peekCAString from base, it must then convert the String into Text! This seems unnecessarily expensive. There is an alternative in something similar to Data.Text.Foreign.peekCStringLen, which goes from CStringLen -> IO Text. One drawback is that it only supports CStrings that are valid UTF-8, and throws an exception otherwise. Another drawback is that there's only a CStringLen variant, but that's not hard to get around:

peekCText :: CString -> IO Text
peekCText cs = do
  bs <- Data.ByteString.Unsafe.unsafePackCString cs
  return $! decodeUtf8 bs

This is almost exactly like peekCStringLen, but calls a different function from Data.ByteString.Unsafe, since there's no length information present.

This seems like a good idea if you're willing to sacrifice support for non-UTF-8. You could always use something like bytestring-encodings' Data.ByteString.Encoding.isUtf8 (https://hackage.haskell.org/package/bytestring-encodings-0.2.0.2/docs/Data-ByteString-Encodings.html#v:isUtf8) to verify that the ByteString is UTF-8 encoded before proceeding, but then you'd probably have to return a Maybe Text, which doesn't seem worth it.

Implement getting current assignment

It should be possible to query current assignment for Kafka Consumer.
librdkafka provides two functions (rdkafka.h):

RD_EXPORT rd_kafka_resp_err_t
rd_kafka_subscription (rd_kafka_t *rk,  rd_kafka_topic_partition_list_t **topics);

and

RD_EXPORT rd_kafka_resp_err_t
rd_kafka_assignment (rd_kafka_t *rk, rd_kafka_topic_partition_list_t **partitions);

There are "low level" bindings for these functions in RdKafka.chs, but there are no convenient higher-level API on top of them.

There should be something like:

consumerAssignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError [TopicPartition])
consumerSubscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [TopicPartition])

Think of better names; perhaps get rid of the consumer* prefix for consistency with the rest of the API?

RdKafkaRespErrTimedOut on consume

Hello,

I am trying to use hw-kafka-client to read some messages from remote Kafka and I constantly get RdKafkaRespErrTimedOut.

The code I am playing with:

main :: IO ()
main = consumeSome >>= print

type Record = ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString)

consumerProps :: ConsumerProperties
consumerProps
  = brokersList [BrokerAddress "xxx-kfk0a.xxx.net:1234"]
  <> groupId (ConsumerGroupId "some-random-group")
  <> logLevel KafkaLogDebug
  -- <> debugOptions [DebugGeneric, DebugBroker]

consumerSub :: Subscription
consumerSub
  = topics [TopicName "some-topic"]

consumeSome :: IO (Either KafkaError Record)
consumeSome = do
  bracket mkConsumer clConsumer runHandler
  where
    mkConsumer = newConsumer consumerProps consumerSub
    clConsumer (Left err) = return (Left err)
    clConsumer (Right kc) = (maybe (Right ()) Left) <$> closeConsumer kc
    runHandler (Left err) = return (Left err)
    runHandler (Right kc) = processMessages kc

processMessages :: KafkaConsumer -> IO (Either KafkaError Record)
processMessages kafka = pollMessage kafka (Timeout 5000)

Output:

Left (KafkaResponseError RdKafkaRespErrTimedOut)

output.log with the DebugAll option

I've actually tried replacing the broker address and topic name in the consumer example (the one that consumes 10 messages), but the result is the same.

Interestingly, I am able to get the messages using kafkacat:

$ kafkacat -b broker-address.net:1234 -G some-random-group some-topic

Do you have any ideas? Any help would be appreciated.

How can we figure out which partition an EOF error is for?

The C function for consuming always returns an rd_kafka_message_t, which contains an rd_kafka_resp_err_t in the case of failure. Unfortunately this error value is not self-contained, making sense of it may require inspecting other fields of the rd_kafka_message_t. For example, if you get a partition EOF, you'll have to inspect the partition field to determine which partition the EOF applies to.

In contrast, the Haskell function for consuming is

pollMessage :: MonadIO m
            => KafkaConsumer
            -> Timeout -- ^ the timeout, in milliseconds
            -> m (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString))) -- ^ Left on error or timeout, right for success

This is a much more natural type since the error case is disjoint, but the KafkaError type is missing some crucial context: e.g. there appears to be no way for me to determine which partition hit the EOF. This is important if you want to consume an entire topic in one go and then stop.
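
One option would be to enrich the error side with the originating context (a hypothetical sketch, not the current API):

import Kafka.Consumer (KafkaError, Offset, PartitionId, TopicName)

-- Hypothetical richer error type carrying the partition context:
data PollError
  = PollKafkaError KafkaError                 -- plain errors, as today
  | PartitionEof TopicName PartitionId Offset -- EOF together with its origin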

Improve `librdkafka` logging capabilities

Currently the hw-kafka-client API allows one to configure the log level of librdkafka, but nothing else (as far as I know).

This is problematic because it forces logs to stdout and in a specific format.

How about at the very least provide an API to customize the log before it goes to stdout? E.g.

librdkafkaLogFormatter :: (Text -> Text) -> ProducerProperties

Basically library users could pass a formatting function that would be applied to each log before being sent to stdout.

Use case: we format all our application logs as JSON so that they are parsed properly downstream. The librdkafka logs break this parsing.

Cheers!

Producer: Add support for timestamps

Currently we are using these functions for producing messages:

RD_EXPORT
int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition,
		      int msgflags,
		      void *payload, size_t len,
		      const void *key, size_t keylen,
		      void *msg_opaque);
RD_EXPORT
int rd_kafka_produce_batch(rd_kafka_topic_t *rkt, int32_t partition,
                            int msgflags,
                            rd_kafka_message_t *rkmessages, int message_cnt);

They work fine, but they don't support providing timestamp.

librdkafka has another function that supports timestamps, which we can use instead of rd_kafka_produce:

RD_EXPORT
rd_kafka_resp_err_t rd_kafka_producev (rd_kafka_t *rk, ...);

But what about batches? I don't see rd_kafka_producev_batch; is there another way? Or does it not make sense for batches? Investigate.

`Foreign.Concurrent` for finalizers and profiling issue

I have a program (which is not minimal example yet), which prints message when terminates:

program-name: error: a C finalizer called back into Haskell.
   This was previously allowed, but is disallowed in GHC 6.10.2 and later.
   To create finalizers that may call back into Haskell, use
   Foreign.Concurrent.newForeignPtr instead of Foreign.newForeignPtr.

Also, when the program is compiled with -prof and run with +RTS -p, the file program-name.prof is empty, so I cannot get profiling info.

If I comment out the code creating Kafka consumers and terminate the program, the runtime dumps the profile info and there is no message.

I guess the message and the empty program-name.prof file both have the same cause, lying somewhere in the hw-kafka-client code.

Warning Message :: How to use "newConsumer/closeConsumer " functions

When compiling a project that uses hw-kafka-client, I keep seeing:

Deprecated: "Use newConsumer/closeConsumer instead"
_ <- runConsumer (myConsumerProps consumerGroupId) myConsumerSubscription (processMessages startState)

Granted, compilation succeeds, but the warning prevails. Can you give an example of how newConsumer and closeConsumer can be used in tandem to avoid this message?
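
A minimal sketch of the replacement, reusing the names from the snippet above (myConsumerProps, myConsumerSubscription, processMessages, startState); bracket guarantees closeConsumer always runs:

import Control.Exception (bracket)
import Control.Monad (void)

main :: IO ()
main = do
  res <- bracket
           (newConsumer (myConsumerProps consumerGroupId) myConsumerSubscription)
           (either (const (pure ())) (void . closeConsumer))  -- always close
           (either (pure . Left) (processMessages startState))
  print res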

Thanks.

tests are flaky, at least on MacOS

this is me trying to build the following derivation with ghc 8.6.5:

{ mkDerivation, base, bifunctors, bytestring, c2hs, containers
, either, hspec, monad-loops, rdkafka, stdenv, text, transformers, unix
}:
mkDerivation {
  pname = "hw-kafka-client";
  version = "2.6.0";
  sha256 = "1318gyl3jn3q2namzpzf0254hqpib2nn1kipf6gnfp4dvwv0wbgn";
  isLibrary = true;
  isExecutable = true;
  libraryHaskellDepends = [
    base bifunctors bytestring containers text transformers unix
  ];
  librarySystemDepends = [ rdkafka ];
  libraryToolDepends = [ c2hs ];
  testHaskellDepends = [
    base bifunctors bytestring containers either hspec monad-loops text
    transformers
  ];
  description = "Kafka bindings for Haskell";
  license = stdenv.lib.licenses.mit;
}

test results:

Running 2 test suites...
Test suite integration-tests: RUNNING...

Kafka.Integration
  Per-message commit
    Run producer
      1. sends 2 messages to test topic
    Consumer with per-message commit
      2. should receive 2 messages
    Run producer again
      3. sends 2 messages to test topic
    Consumer after per-message commit
      4. should receive 2 messages again
  Store offsets
    Run producer
      1. sends 2 messages to test topic
    Consumer with no auto store
      2. should receive 2 messages without storing
    Run producer again
      3. sends 2 messages to test topic
    Consumer after commit without store
      4. should receive 4 messages and store them
    Run producer again
      5. sends 2 messages to test topic
    Consumer after commit with store
      6. should receive 2 messages
    Part 3 - Consume after committing stored offsets
      5. sends 2 messages to test topic
      6. should receive 2 messages
  Kafka.IntegrationSpec
    Run producer
      sends messages to test topic
    Run consumer
      should get committed
      should get position
      should receive messages
      should get watermark offsets
      should return subscription
      should return assignment
      should return all topics metadata FAILED [1]
      should return topic metadata
      should describe all consumer groups FAILED [2]
      should describe a given consumer group
      should describe non-existent consumer group
      should read topic offsets for time
      should seek and return no error
      should seek to the beginning
      should seek to the end
      should respect out-of-bound offsets (invalid offset)
      should respect out-of-bound offsets (huge offset)
  Kafka.Consumer.BatchSpec
    Batch consumer
      should consume first batch
      should consume second batch with not enough messages
      should consume empty batch when there are no messages

Failures:

  tests-it/Kafka/IntegrationSpec.hs:157:17:
  1) Kafka.Integration.Kafka.IntegrationSpec, Run consumer, should return all topics metadata
       expected: Right 1
        but got: Right 2

  To rerun use: --match "/Kafka.Integration/Kafka.IntegrationSpec/Run consumer/should return all topics metadata/"

  tests-it/Kafka/IntegrationSpec.hs:167:17:
  2) Kafka.Integration.Kafka.IntegrationSpec, Run consumer, should describe all consumer groups
       expected: Right [ConsumerGroupId {unConsumerGroupId = "it_spec_03"}]
        but got: Right [ConsumerGroupId {unConsumerGroupId = "batch-consumer"},ConsumerGroupId {unConsumerGroupId = "it_spec_03"}]

  To rerun use: --match "/Kafka.Integration/Kafka.IntegrationSpec/Run consumer/should describe all consumer groups/"

Randomized with seed 1585632419

Finished in 63.9449 seconds
33 examples, 2 failures

Consumer: Add support for Timestamp

Add timestamp to ConsumerRecord.
This is going to be a breaking change for cases where pattern matching is used against ConsumerRecord, but I consider it minor because fields would typically be used to access data inside ConsumerRecord.

How to implement a healthcheck for newProducer?

Hello

I am trying to implement some kind of check when using newProducer, which is supposed to return a KafkaError whenever a failure happens.
When I provide a wrong broker address, I get the following error messages in the console (with the KafkaLogDebug log level):

%3|1568025943.517|FAIL|rdkafka#producer-1| [thrd:WHATEVER:9092/bootstrap]: WHATEVER:9092/bootstrap: Failed to resolve 'WHATEVER:9092': nodename nor servname provided, or not known (after 164ms in state CONNECT)
%3|1568025943.517|ERROR|rdkafka#producer-1| [thrd:WHATEVER:9092/bootstrap]: WHATEVER:9092/bootstrap: Failed to resolve 'WHATEVER:9092': nodename nor servname provided, or not known (after 164ms in state CONNECT)
%3|1568025943.517|ERROR|rdkafka#producer-1| [thrd:WHATEVER:9092/bootstrap]: 1/1 brokers are down

However, newProducer returns a (Right Producer), whereas I would have expected a KafkaError here somehow.

Did I miss something here or am I wrong to expect such behavior?
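
As a workaround (a sketch, not an official health-check API; it assumes errorCallback can be attached to ProducerProperties via setCallback, as it is for consumers elsewhere in this tracker):

import Data.IORef (IORef, newIORef, writeIORef)
import Kafka.Producer

-- Sketch: flip a flag whenever librdkafka reports an error and let the
-- application's health check read that flag.
mkMonitoredProps :: IO (ProducerProperties, IORef Bool)
mkMonitoredProps = do
  healthy <- newIORef True
  let props = brokersList ["localhost:9092"]  -- assumes OverloadedStrings
           <> setCallback (errorCallback (\_ _ -> writeIORef healthy False))
  pure (props, healthy)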

Thank you for your help

Rémi

ConsumerExample throws error

I was trying a modified version of the consumer example here with some added configuration properties.

extraConfigs = KafkaProps [ ("enable.auto.commit", "true"), ("client.id", "kafka-python1"), ("session.timeout.ms", "30000")]
topicConfigs = TopicProps [("auto.offset.reset", "latest")]

Full source code here: http://lpaste.net/350626

It compiles fine; however, at runtime it gives the following error:

*Main> runConsumerExample 
"Left (KafkaResponseError RdKafkaRespErrTimedOut)"
"Left (KafkaResponseError RdKafkaRespErrTimedOut)"
"Left (KafkaResponseError RdKafkaRespErrTimedOut)"
"Left (KafkaResponseError RdKafkaRespErrTimedOut)"
"Left (KafkaResponseError RdKafkaRespErrPartitionEof)"
"Left (KafkaResponseError RdKafkaRespErrPartitionEof)"
"Left (KafkaResponseError RdKafkaRespErrPartitionEof)"
"Left (KafkaResponseError RdKafkaRespErrPartitionEof)"
"Left (KafkaResponseError RdKafkaRespErrNotImplemented)"
"Left (KafkaResponseError RdKafkaRespErrPartitionEof)"
"Right ()"
  1. What does this error mean?
  2. How do I rectify it, given that the same properties and Kafka broker work fine for an equivalent Python script? (source here: http://lpaste.net/350627)

Edit:
Using Kafka version 0.10.0.1 on Ubuntu 16.04

Publish to Stackage

Hello,

Have you considered publishing to Stackage? This would ease integration with Stack build tool. I could not find any issue related to this topic.

Also, I see there are already many Haskell Works (hw- prefixed) libraries in Stackage, so adding this one seems to make sense and would improve the ecosystem.

Thank you, cheers!

Implement getting offsets by timestamp

librdkafka provides this API:

RD_EXPORT rd_kafka_resp_err_t
rd_kafka_offsets_for_times (rd_kafka_t *rk,
                            rd_kafka_topic_partition_list_t *offsets,
                            int timeout_ms);

We need something similar to:

offsetsForTimestamp :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Either KafkaError [TopicPartition])

Think of a better API if possible.

Contributing guide

Hi,

Are there any contributing guidelines, please? How to build, how to test, etc. I could not find any (while starting to work on #135) and I can't manage to "guess" it.

E.g. when I run docker-compose up and, in another tab, nix-build, the build gets stuck at Test suite integration-tests: RUNNING....
The same problem occurs when running cabal test.

Otherwise, would you mind adding some? If you explain it here I can submit a PR to improve the README accordingly!

Cheers

Add support for message headers

  • Create bindings for the relevant librdkafka's functions (rd_kafka_headers_new, rd_kafka_header_add, rd_kafka_header_get_all)
  • Implement Haskell datatype and converters (a possible shape is sketched below)
  • Implement bindings for rd_kafka_message_set_headers. How do we express headers in the API without introducing a breaking change?
  • Implement bindings for rd_kafka_message_headers. Read comments for this function. How do we parse headers? When receiving the message and converting it to ConsumerRecord? How do we provide functionality for not using headers?

Ideally, it should be up to the client whether to use headers or not... Or not? I don't see anything wrong with always providing/parsing them except that it may:

  • Introduce a breaking change to the API
  • Make hw-kafka-client require Kafka >= 0.11 (lower versions won't work)
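
For the datatype point above, a possible shape (a sketch; the names are hypothetical, not the final API):

import Data.ByteString (ByteString)

-- Hypothetical representation: a header is a key/value pair, and a record
-- carries zero or more of them.
newtype Headers = Headers { unHeaders :: [(ByteString, ByteString)] }
  deriving (Show, Eq)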

Questions around rebalance callback

We found that in newConsumer we set a rebalance callback:

let cp = setCallback (rebalanceCallback (\_ _ -> return ())) <> props

Internally this happens through librdkafka's rd_kafka_conf_set_rebalance_cb.

Now, we found that the librdkafka documentation for rd_kafka_conf_set_rebalance_cb mentions:

 * Registering a \p rebalance_cb turns off librdkafka's automatic
 * partition assignment/revocation and instead delegates that responsibility
 * to the application's \p rebalance_cb.

(see https://github.com/edenhill/librdkafka/blob/4fb7c9924c14f44c4f1490b80b8bd714098dd90b/src/rdkafka.h#L1593-L1666)

It seems that by setting the rebalance callback internally we deactivate librdkafka's assignment/revocation strategy, and the callback we set (return ()) doesn't handle these cases at all!

What are we missing here? Is this an oversight or is there another corner we need to look at?

there is no need for both peekCText and peekCAText to exist

When I made my PR to change things from String to Text, I made an effort not to remove or add any functions, so as to minimise the chance of introducing a bug. However, I noticed you had two variants of this peek function in use, both with the same type and implementation. One should be removed; I would prefer to remove peekCAText and keep peekCText.

Human readable errors

librdkafka has the ability to describe errors:

/**
 * @brief Error code value, name and description.
 *        Typically for use with language bindings to automatically expose
 *        the full set of librdkafka error codes.
 */
struct rd_kafka_err_desc {
	rd_kafka_resp_err_t code;/**< Error code */
	const char *name;      /**< Error name, same as code enum sans prefix */
	const char *desc;      /**< Human readable error description. */
};


/**
 * @brief Returns the full list of error codes.
 */
RD_EXPORT
void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
			     size_t *cntp);

hw-kafka-client should provide similar functionality:

describeError :: KafkaError -> IO KafkaErrorDescription

Note: check if these functions are pure and if IO can be dropped

Implement getting watermark offsets

librdkafka provides a way to query Kafka brokers for topic's offsets:

RD_EXPORT rd_kafka_resp_err_t
rd_kafka_query_watermark_offsets (rd_kafka_t *rk,
		      const char *topic, int32_t partition,
		      int64_t *low, int64_t *high, int timeout_ms);

Wrapping this API in Haskell would be useful.
Ideally, we'd also want something like:

queryTopicOffsets :: MonadIO m => KafkaConsumer -> TopicName -> m (Either KafkaError [TopicPartitionWatermark])

which would not require specifying each partition individually.

Another variation that looks useful:

queryWatermarkOffsets :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either  KafkaError [TopicPartitionWatermark])

There is another function that provides watermark offsets for partitions that have already been seen by the consumer:

RD_EXPORT rd_kafka_resp_err_t
rd_kafka_get_watermark_offsets (rd_kafka_t *rk,
				const char *topic, int32_t partition,
				int64_t *low, int64_t *high);

Wrapping this one looks less useful to me, but maybe I don't see the whole picture. We may or may not need/want this one.

delivery callbacks for batch production

When producing a message batch, there's no way to have a delivery callback that operates on the entire batch. Currently callbacks only work on a single DeliveryReport, which assumes a single message. Perhaps I'd like to know about the individual messages, but perhaps I'd also like the callback to tell me the size of the batch, or even more sophisticated statistics. I can compute these before sending, but most of the time I don't care what they are; I would rather only see them upon some kind of delivery failure.
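
Short of new bindings, per-batch statistics can be approximated in application code by aggregating the per-message reports (a sketch; the aggregation shape is hypothetical):

import Data.IORef (newIORef, modifyIORef', readIORef)
import Kafka.Producer (DeliveryReport (..))

-- Sketch: count failures across a batch; feed 'onReport' to the delivery
-- callback and read 'summary' after flushing the producer.
mkBatchCounter :: Int -> IO (DeliveryReport -> IO (), IO (Int, Int))
mkBatchCounter batchSize = do
  failures <- newIORef (0 :: Int)
  let onReport (DeliveryFailure _ _) = modifyIORef' failures (+ 1)
      onReport _                     = pure ()
      summary = (\f -> (batchSize, f)) <$> readIORef failures
  pure (onReport, summary)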

Rebalance and commit offsets callback

The code is implemented, but using it results in a runtime failure that suggests threading issues.
The Haskell runtime suggests using a safe import (which is already done for some methods).

I suspect that when a callback into Haskell executes a C function again, it may do so from a different thread, and C doesn't really like that.

Error in callback documentation

The readme mentions producer callbacks should be written as:

producerProps :: ProducerProperties
producerProps = brokersList [BrokerAddress "localhost:9092"]
             <> setCallback (deliveryErrorsCallback print)

Unfortunately deliveryErrorsCallback isn't exposed by this library. I assume this was changed to deliveryCallback when adding callbacks for success?

Anyway, let's fix the documentation 😄

OffsetsCommit callback is not fired

KafkaConsumer can now be configured with a callback that is supposed to fire each time offsets are committed. But for some reason it doesn't happen.

The registration code for this callback is identical to that of the rebalance callback, and the rebalance callback is triggered as expected. But the commit callback doesn't seem to fire at all.

The example project is configured to provide the commit offsets callback, and it should be printing logs when it happens which may help to verify/debug this issue.

Implement "flush"

There is this api in librdkafka:

/**
 * @brief Wait until all outstanding produce requests, et.al, are completed.
 *        This should typically be done prior to destroying a producer instance
 *        to make sure all queued and in-flight produce requests are completed
 *        before terminating.
 *
 * @remark This function will call rd_kafka_poll() and thus trigger callbacks.
 *
 * @returns RD_KAFKA_RESP_ERR__TIMED_OUT if \p timeout_ms was reached before all
 *          outstanding requests were completed, else RD_KAFKA_RESP_ERR_NO_ERROR
 */
RD_EXPORT
rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms);

It could be useful in flushProducer, closeProducer, and perhaps closeConsumer.

Add a combinator for compression

Compression can currently be turned on using the extra-properties combinator:

extraConsumerProps (singleton "compression.codec" "lz4")

There should be a ConsumerProperties (and perhaps a ProducerProperties too) combinator to set compression in a more convenient way.
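
A possible shape for such a combinator (a sketch; the KafkaCompressionCodec type and codecName helper are hypothetical):

{-# LANGUAGE OverloadedStrings #-}
import qualified Data.Map as M
import Data.Text (Text)

-- Hypothetical codec type and combinator:
data KafkaCompressionCodec = NoCompression | Gzip | Snappy | Lz4

codecName :: KafkaCompressionCodec -> Text
codecName NoCompression = "none"
codecName Gzip          = "gzip"
codecName Snappy        = "snappy"
codecName Lz4           = "lz4"

compression :: KafkaCompressionCodec -> ConsumerProperties
compression c = extraConsumerProps (M.singleton "compression.codec" (codecName c))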

Bootstrapping from non-resolvable broker logs errors but allows to continue

It looks like the current implementation of newConsumer creates a consumer even if the bootstrap broker cannot be resolved or doesn't exist.

It logs an error, but still returns a non-bootstrapped client.

One thing to consider is freeing resources: maybe use ResourceT or something like it, but the underlying unmanaged objects (from rdkafka) must be disposed of correctly, and/or must be correctly disposable should we choose not to return an "invalid" client.

mkKafka = do
  kc  <- newConsumerConf (ConsumerGroupId "test_group") emptyKafkaProps
  tc  <- newConsumerTopicConf emptyTopicProps
  setDefaultTopicConf kc tc
  newConsumer (BrokersString "localhost:9092") kc

Non-resolvable:

Kafka {kafkaPtr = 0x00007fbb4b003c00, _kafkaConf = KafkaConf 0x00007fbb49403270}
%3|1483784568.811|FAIL|rdkafka#consumer-1| asadsdasd:9092/bootstrap: Failed to resolve 'asadsdasd:9092': nodename nor servname provided, or not known
%3|1483784568.811|ERROR|rdkafka#consumer-1| asadsdasd:9092/bootstrap: Failed to resolve 'asadsdasd:9092': nodename nor servname provided, or not known
%3|1483784568.811|ERROR|rdkafka#consumer-1| 1/1 brokers are down

Resolvable, but no broker:

Kafka {kafkaPtr = 0x00007fb4bc800c00, _kafkaConf = KafkaConf 0x00007fb4bae005a0}
%3|1483784315.244|FAIL|rdkafka#consumer-1| localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
%3|1483784315.244|ERROR|rdkafka#consumer-1| localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
%3|1483784315.244|ERROR|rdkafka#consumer-1| 1/1 brokers are down
