Giter Site home page Giter Site logo

kafka-net's People

Contributors

jroland avatar kfrederix avatar kichristensen avatar micahzoltu avatar pzang avatar sixeyed avatar warrenfalk avatar wsimmonds avatar xtofs avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

kafka-net's Issues

Not able to associate consumer with a group

I was evaluating kafka-client for one of our new project. While testing I found that i cannot associate consumer with a group.
If this is not possible then how could i use kafka as queue.

Apart from that isn't there high level API available?. I could not find such. I wanted to use zookeeper instead of using kafka broker (for instance with consmer).

Thanks In Advance
programmer-pragmatic

Consumer GetOffset from kafka after restart

if i do not save the last offset in the client side, ie if I'm not passing OffsetPosition[] to the Consumer(), Is there any way I can use the kafka-net API to automatically read the saved offset from kafka broker and continue consume where I left off ? This is helpful to avoid duplicates when my client (consumer ) restart.

Non-blocking consumer

Hi Jroland,

The consumer seems only has a blocking consume function, do you think it's a good idea to add a non-blocking function allow us to specify an arbitrary number of messages to consume ?

Thanks.

test failures -

Among other failures I get this:

    public void ConsumerShouldMoveToNextAvailableOffsetWhenQueryingForNextMessage()
Found address 10.103.13.160 for 10.103.13.160
Using address 10.103.13.160 for 10.103.13.160
BrokerRouter: Refreshing metadata for topics: IntegrationTopic
Awaiting message from: http://10.103.13.160:49161/
No connection to:http://10.103.13.160:49161/.  Attempting to re-connect...
Connection established to:http://10.103.13.160:49161/.
Received message of size: 131 From: http://10.103.13.160:49161/
Awaiting message from: http://10.103.13.160:49161/
Found address 10.103.13.160 for 10.103.13.160
Using address 10.103.13.160 for 10.103.13.160
Found address 10.103.13.160 for 10.103.13.160
Using address 10.103.13.160 for 10.103.13.160
Found address 10.103.13.160 for 10.103.13.160
Using address 10.103.13.160 for 10.103.13.160
Awaiting message from: http://10.103.13.160:49162/
No connection to:http://10.103.13.160:49162/.  Attempting to re-connect...
Awaiting message from: http://10.103.13.160:49163/
No connection to:http://10.103.13.160:49163/.  Attempting to re-connect...
Connection established to:http://10.103.13.160:49162/.
Connection established to:http://10.103.13.160:49163/.
Received message of size: 48 From: http://10.103.13.160:49163/
Awaiting message from: http://10.103.13.160:49163/
Closed down connection to: http://10.103.13.160:49161/
Closed down connection to: http://10.103.13.160:49162/
Closed down connection to: http://10.103.13.160:49163/
  This test requires there to be exactly two paritions.

  Expected: 2

  But was:  1


   at NUnit.Framework.Assert.That(Object actual, IResolveConstraint expression, String message, Object[] args)
   at kafka_tests.Integration.ProducerConsumerTests.ConsumerShouldMoveToNextAvailableOffsetWhenQueryingForNextMessage() in ProducerConsumerIntegrationTests.cs: line 188  This test requires there to be exactly two paritions.

  Expected: 2

  But was:  1


   at NUnit.Framework.Assert.That(Object actual, IResolveConstraint expression, String message, Object[] args)
   at kafka_tests.Integration.ProducerConsumerTests.ConsumerShouldMoveToNextAvailableOffsetWhenQueryingForNextMessage() in ProducerConsumerIntegrationTests.cs: line 188  This test requires there to be exactly two paritions.

  Expected: 2

  But was:  1


   at NUnit.Framework.Assert.That(Object actual, IResolveConstraint expression, String message, Object[] args)
   at kafka_tests.Integration.ProducerConsumerTests.ConsumerShouldMoveToNextAvailableOffsetWhenQueryingForNextMessage() in ProducerConsumerIntegrationTests.cs: line 188

Fresh cluster, kafka-docker (https://github.com/wurstmeister/kafka-docker)
My own tests pass, but that doesn't mean all is good with the cluster. I'm particularily confused over the test requiring 2 partitions, but presumably there should be a setup for that.

ConcurrentCircularBuffer out of bounds exception

There are race conditions in the Enqueue method of the ConcurrentCircularBuffer.

The buffer limit is (implemented as a check and "wrap") is two separate Interlocked operations (rather than a single interlocked operation). This could (and does) result in the value of the _head field exceeding _maxSize - 1

        if (Interlocked.Increment(ref _head) > (_maxSize - 1))
            Interlocked.Exchange(ref _head, 0);

This can result in a out of bounds exception when assigning the parameter object to the values array.

        _values[_head] = obj;

I've attached a pull request with a re-implementation of the ConcurrentCircularBuffer without these issues (#71).

Neither producer or consumer fail over to new partition leader when original goes away

If you are reading or writing from/to a partition leader when that node goes down, the tcp code appears to continuously try to reconnect to that node without attempting a metadata refresh to confirm whether that node is still the partition leader. This means that an application cannot take advantage of a successful election of a new partition leader without restarting (at which point the metadata is freshly retrieved).

Massive performance degredation from 0.8.0.109 -> 0.9.0.49

I'm using kafka-net to send around 160,000 messages per second to Kafka, and after an upgrade from 0.8.0.109 to 0.9.0.49 I'm seeing a massive increase in CPU usage, to the point that kafka-net is unusable.

Before and After:
Before and After

The only change in the two runs above was in the version of kafka-net.

To get to the level of throughput needed, messages are being batched into 4000 messages chunks before being written to SendMessageAsync(), which worked well under 0.8.

This was running on a VM with 8 cores @ 2.5 Ghz and 40GB RAM.

I have tried using the batching in 0.9 (_client = new Producer(router) { BatchSize = 4000 };) and writing messages one at a time, which improves things initially, but it fails to keep up with the data volume, and memory usage increases as it falls further behind:
Before and After

I'm not sure where to go from here.

Any advice on how kafka-net should be used to reach throughput levels in the 100,000s of messages per second?

I'm currently looking at implementing a synchronous producer that forgoes the async stuff, as I believe that's where the performance issues are being introduced.

Thanks.

High Level API

Hi!

  1. I looked through and found that there are too much action to wireup publisher and consumer. What do you think about creating simple utility class to wire up all components with fluent configuration?
    Here is an example from EasyNetQ -- https://github.com/mikehadlow/EasyNetQ/blob/master/Source/EasyNetQ/ComponentRegistration.cs and nearest classes :)

  2. Imho, there is no simple API just for simple publish/subscribe. As it is done in EasyNetQ with simple IBus interface, which hides all magic inside.

Are you interested in pull requests with these features?

PS Sorry for bad English :)

Max buffer not blocking in producer

When the max buffer is reached, the async methods are pulling the data out of the buffer and not actually blocking the up stream producer. The driver should block even on messages that are in flight on the wire.

Check whether kafka broker is up or not

We are trying to send some messages to kafka broker , by using some host like "22.102.122.112" , it is producing message as required.
But if the host ip is wrongly set , eg "22.102.122.113". It keep waiting, and doesn't return control to the program.
Is there any way, to find out whether broker is up, or throw some exception after waiting for some time, e.g Timeoutexception.

Any help shall be appreciated.

Implementation on .Net Framework 4.0

Dear James and All contributors,

I need to build this project upon .Net Framework 4.0. The only one issue left is the new method SemaphoreSlim.WaitAsync(CancellationToken canceller), which is introduced in .NET 4.5.

Could you please provide some advises? Or do you have plan to make this be compatible with .NET 4.0?
Thank you and Best regards,

Consumer group Issue

Hi James,

We are using Kafka net in our project. Everything looks good except to achieve consumer group in order to make kafka as Queue model.

Right now there is no option to achieve consumer group.
As we know you are already working on that. Is there any possible to achieve consumer group.

May i know when it will be released? Next build will it have AutoCommit feature?

One more clarification regarding producer response. Currently we are getting response after sending message through await keyword

var x= client.SendMessageAsync();
var response = await x;

Is there any alternate solution available to get response?

Thanks,
Sugumar J

Trying to consume from one topic and produce to another hangs

It appears to be impossible to use a BrokerRouter for producing if it is already being used for consuming on another topic. The following code illustrates the problem I'm seeing.

Is this invalid use of a BrokerRouter?

using KafkaNet;
using KafkaNet.Model;
using KafkaNet.Protocol;
using System;

namespace kafka_net_test
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new KafkaOptions(new Uri[] { new Uri("kafka://localcluster:9092"), });
            var brokerRouter = new BrokerRouter(options);

            long offset = 0;
            var producer = new Producer(brokerRouter, 1);
            producer.BatchSize = 1;
            producer.BatchDelayTime = TimeSpan.FromDays(1);

            {
                // Prime topic with a message if necessary
                //var task = producer.SendMessageAsync("Test", new Message[] { new Message("Test Message") });
                //task.Wait();
                //offset = task.Result[0].Offset;
            }

            // Start a consumer to get that message
            var consumerOptions = new ConsumerOptions("Test", brokerRouter);
            consumerOptions.MinimumBytes = 1;
            consumerOptions.BackoffInterval = TimeSpan.Zero;
            consumerOptions.MaxWaitTimeForMinimumBytes = TimeSpan.MaxValue;
            var consumer = new Consumer(consumerOptions);
            consumer.SetOffsetPosition(new OffsetPosition { PartitionId = 0, Offset = offset });
            var messageStream = consumer.Consume();

            foreach (var message in messageStream)
            {
                // Send message to another topic
                var task = producer.SendMessageAsync("Test2", new Message[] { new Message { Value = message.Value } });
                // ------------------------------------
                // If batch size is 1:
                // Blocks above and never gets here (until eventual timeout)
                // Message never shows up in topic
                // ------------------------------------
                task.Wait();
                // ------------------------------------
                // If batch size is greater than 1:
                // Blocks above and never gets here (until eventual timeout)
                // Message never shows up in topic
                // ------------------------------------
            }
        }
    }
}

Consumer returns first batch of messages then jumps to high water mark

Hi James,

Thanks for the work you are going with Kafka-net. I have been trying it out and have found what I believe to be an issue in Consumer.ConsumeTopicPartitionAsync.

The following line of code causes the Consumer to return the first batch of messages (from the specified offset) and then set the offset to the high water mark. As a result, it does not return the remaining messages on the partition and only returns more messages when new ones are produced on the partition.

_partitionOffsetIndex.AddOrUpdate(partitionId, i => response.HighWaterMark, (i, l) => response.HighWaterMark);

I propose that the line above be replaced with:

nextOffset = offset + response.Messages.Count;
_partitionOffsetIndex.AddOrUpdate(partitionId, i => nextOffset, (i, l) => nextOffset);

Kind regards
Wayne

Producer concurrency issues - TakeAsync method in AsyncCollection class

We observed that TakeAsync method (AsyncCollection class) is not taking into the account the timeout parameter when it is constantly getting data from the TryTake method.
In our environment we could see that the producer was just building up the batch collection and not returning it, even when the timeout task completed. It returned it only when it reached the batch size (default 100 messages).
For some reason in our case it was taking quite a long time before it reached the batch size (over 60 seconds).

It looks like after introducing additional condition to take into the account the completeness of the timeoutTask - it would work as expected.

Please see line:
https://github.com/Jroland/kafka-net/blob/master/src/kafka-net/Common/AsyncCollection.cs#L72

The proposed change could be:

// Current version, line 72
//[...]
if (--count <= 0) return batch;
//[...]

// Proposed change
//[...]
if (--count <= 0 || timeoutTask.IsCompleted) return batch;
//[...]

Looking forward to your opinion about that.

Kafka Synchronous vs Asynchronous Producer

Hi James,

We are using Kafka-Net for our messaging system, we tested Producer to produce message to Kafka with Synchronous and Asynchronous mode. But In Asynchronous mode it performs well than Synchronous.

Synchronous mode takes 2 minutes to produce 1000 message where as Asynchronous mode takes 2 seconds to produce 1000 messages.

Is there any limitation using Asynchronous mode?

(Like message order problem, Message lose)

Can we use Asynchronous mode in Production environment ?

Please suggest us...

Thanks,
Sugumar Jeyachandran

Signed dll.

It would benefit certain users of this library it the dll was signed. This can be done easily with a strong name key file.

re-using client.producer from example code?

Hi, it's great to find this project. I have been running it successfully but I am stuck, due to my poor C# knowledge and even poorer C# async understanding.

Can you shed some light on how to maintain a single connection, but use sendmessageasync in a loop? I've tried various combinations but get blocks or fails. One error I get is below - any tips?

Unhandled Exception: System.AggregateException: One or more errors occurred. ---> System.ObjectDisposedException: The CancellationTokenSource has been disposed.
   at System.Threading.CancellationTokenSource.ThrowObjectDisposedException()
   at KafkaNet.KafkaTcpSocket.WriteAsync(Byte[] buffer)
   at KafkaNet.KafkaConnection.<SendAsync>d__0`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at KafkaNet.Producer.<SendMessageAsync>d__d.MoveNext()
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at kafkatest1.Program.SendMessage(Producer client, String hostname)
   at kafkatest1.Program.Main(String[] args)

GetTopicOffsetAsync returns an array for each replica not for each partition

If I create an Integration topic with two partitions and three replicas then the test ConsumerShouldMoveToNextAvailableOffsetWhenQueryingForNextMessage fails with the error "This test requires there to be exactly two paritions"

The offsets array returned from GetTopicOffsetAsync contains three elements each containing an Offsets property with a count of 2. It looks like Partitions and Replicas have gotten transposed somewhere.

[Enhancement] For Kafka .8.2+ Provide Way to Cache and work with Offset Coordinators Similar to BrokerRouter

Hi. I'm using this library and it has been very helpful. Thank you!

I'm using it and testing against both Kafka .8.1.1 and .8.2-beta with 3 Kafka nodes clustered. While I'm able to get offset management working in both cases, I have to do so differently depending on the Kafka version I'm targeting. Specifically, the new Offset Coordinator concept (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) doesn't allow us to always issue different request types the same way.

In .8.1.1, for any request I can do:

BrokerRouter.SelectBrokerRoute(....).SendAsync(...request)

But, in .8.2-beta I can do the same for everything except OffsetFetch and OffsetCommit, where I have to do:

OffsetCoordinatorConnection.Send(...request)

where I had to create and manage the OffsetCoordinatorConnection (IKafkaConnection), basically repeating what the BrokerRouter already does.

I couldn't use the BrokerRouter to discover the connection for those request types because the leader for a group/topic/partition and the offset coordinator for that same group/topic/partition are not necessarily the same broker, but BrokerRouter.SelectBrokerRoute only ever selects the leader.

As such, I will have to implement my own caching (and refreshing) for these offset coordinators that basically is the same caching you do in Broker Router.

I wanted to share my experience because I suspect you are already aware of these differences, but I wasn't sure what plans you might have to address them. I see that some of your offset management integration tests are ignored for likely these reasons.

Thank you!

Consumers should be able to group together and coordinate consumption of topics

Hi James,

Could you please let me know if you are planning to implement automatic consumer rebalancing similar to how Scala client does?

In my project we need to ensure that only one member of a consumer group consumes from a topic partition at a time. When that consumer fails, another one should take its place automatically.

Thank you very much.

Best regards,
Janos

Delay in consumer after message is send using simple producer

Hi

I'm sending a message using this python code:

client = KafkaClient("localhost:9092")
producer = SimpleProducer(client)
producer.send_messages("mytopic", bytes("some msg", 'utf-8'))

Then i am consuming it using kafka-net in the following way:

        var options = new KafkaOptions(new Uri("http://10.0.1.106:9092"), new Uri("http://10.0.1.106:9092"));
        var router = new BrokerRouter(options);
        ConsumerOptions opts = new ConsumerOptions("mytopic", router);
        var consumer = new Consumer(opts);

foreach (var message in consumer.Consume())
{
// do something with the message
}

My problem is that once a message is sent, it takes between 1-2 up to 10 seconds until it is consumed in jafka-net. Is this normal? I'm using default Kafka /zookeeper configs.

My expectation is that messages are received immediatelly.

Thanks
Angel

Producer.SendMessageAsync() await problem when using IIS

Producer.SendMessageAsync() deadlocks thread when running under IIS. Thread stucks on await even when result has been set for Tcs.Task. It appears in this codeline:

await Task.WhenAll(batch.Select(x => x.Tcs.Task));

The problem is in synchronization context. Similar issue with more explanation here: http://stackoverflow.com/questions/17202709/iis-failing-when-await-task-used

Adding ConfigureAwait(false) fixed the problem for me:

await Task.WhenAll(batch.Select(x => x.Tcs.Task)).ConfigureAwait(false);

It would be great if you'll include this fix into repository or fix it in own way. Thanks!

Tag Nuget releases

It would be really nice if you could tag the release, which would make it possible to see exactly what each Nuget package contains.

Add snappy support to compression

This requires a third party reference, so add function hooks for others to add support for the feature. Create an example version in the kafka-net-client project.

Consumer connects now to Broker not ZooKeeper

Hi,
I am completely new to Kafka, so apologies if this is completely wrong. Anyway; when looking at Kafka tutorials etc., I see that consumers connect to ZooKeeper and not a Broker.

In Kafka-Net, it seems the consumer connects to a Broker? If my observation is correct, will Kafka-Net consumers in some future release be able to connect to ZooKeeper, and does it matter?

Niels

consumer didn't commit message to Kafka.

I try by example code https://github.com/Jroland/kafka-net with kafka version 0.8.2.1.

First time, I run the consumer to consume all message in queue.
but the second time the consumer start to consume message offset 0 again.

I found your code in unit test has something like a commit the message but do not work

public bool Commit(Message message)
        {
            using (var routerx = new BrokerRouter(Options))
            {
                var conn = routerx.SelectBrokerRoute(this.Topic, message.Meta.PartitionId);

                var commit = CreateOffsetCommitRequest(ConsumerGroup, message.Meta.PartitionId, message.Meta.Offset);

                var response = conn.Connection.SendAsync(commit).Result.FirstOrDefault();


                if (response != null && response.Error == 0)
                    return true;
                else
                    throw new ApplicationException(string.Format("Cannot commit message of Topic:{0}, Group:{1}, Partition:{2}, Offset:{3}", Topic, ConsumerGroup, message.Meta.PartitionId, message.Meta.Offset));

            }
        }

please help,

KafkaTcpSocketTests.WriteAndReadShouldBeAsyncronous fails intermittently.

This test occasionally fails with the following:

FakeTcpServer: Accepting clients.
No connection to:http://localhost:8999/.  Attempting to re-connect...
Connection established to:http://localhost:8999/.
FakeTcpServer: Connected client
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: Client exception...  Exception:Unable to read data from the transport connection: An existing connection was forcibly closed by the remote host.
FakeTcpServer: Client Disconnected.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
  Expected is <System.Collections.Generic.List`1[System.Int32]> with 10 elements, actual is <System.Linq.OrderedEnumerable`2[System.Int32,System.Int32]>
  Values differ at index [7]

   at NUnit.Framework.Assert.That(Object actual, IResolveConstraint expression, String message, Object[] args)
   at kafka_tests.Unit.KafkaTcpSocketTests.WriteAndReadShouldBeAsyncronous() in KafkaTcpSocketTests.cs: line 416

Async publishing causes producer to hang

When I use the Producer with acks=0 the producer waits for a response from the broker that never comes. A ProduceRequest with RequiredAcks=0 is the only time a broker will not send a response to a request.

Consumer hangs and no message is received

Hi,

I am implementing a simple consumer using the sample code on project page and it looks like there's an issue there. The code is being blocked by consumer.Consume() and it stays in that state "forever".
In parallel I run a console consumer and a python version of the consumer. Once a new message is publish both and console and python consumers work fine and I can see the message, but kafka-net does not react in any way.

Here is my code :

        var options = new KafkaOptions(new Uri("http://test:9092"));
        var brokerRouter = new BrokerRouter(options);
        var consumerOptions = new ConsumerOptions(topic, brokerRouter);
        var consumer = new Consumer(consumerOptions);

        //Consume returns a blocking IEnumerable (ie: never ending stream)
        foreach (var message in consumer.Consume())
        {
            Console.WriteLine("Response: P{0},O{1} : {2}",
                message.Meta.PartitionId, message.Meta.Offset, message.Value);
        }

Retries

Just quickly glanced through the code, and one thing notice is that there is no retries around the producer/consumer calls.

There are cases when partitions are moved, leader is still being re-elected, etc that Kafka relies on the clients to do retries and also refresh metadata automatically on soft failures (hard failures are failures that we have to abort).

SendAsync method does not time out even when ResponseTimeout occurs.

Submitted by Alex:

We have recently started using your library for our local queue implementation. I have a couple of questions though. We discovered a couple of potential issues with the libraries in how it handles connection. The library is great, but I was wondering of the issues would be addressed, and, possibly fixed in the following releases. One of the issues is in KafkaConnection.Task<List> SendAsync(IKafkaRequest request). You have a piece of code await SendAsync(request.Encode()).ConfigureAwait(false); (line 102). The problem with this piece if the connection goes down and the message times out with ScheduledTimer, it is never returns from await SendAsync, neither it throws the ResponseTimeoutException exception as we never get to read the result (since all exceptions are swallowed inside). The way the library works is that it "awaits" indefinitely (retries to get the tcp client without re-throwing the exception) and instead of timing out and reporting on the connection being down, it just keeps waiting on it. I believe we should not wait on that method and instead await on line 104 var response = await asyncRequest.ReceiveTask.Task.ConfigureAwait(false); which will throw an exception if the message times out (ScheduledTimer will take care of it).

Consumer stops to read messages after the node is recovered

I am using 3-node Kafka and kafka-net v1.0.0.0 client.
If one node failed (suppose it is on myserver2) and then recovered I see the next messages in log:

WARN  KafkaConsumer No connection to:http://myserver2.mycompany.net:9092/.  Attempting to re-connect...
WARN  KafkaConsumer Exception occured in polling read thread.  Message=Lost connection to server: http://myserver2.mycompany.net:9092/
WARN  KafkaConsumer Failed re-connection to:http://myserver2.mycompany.net:9092/.  Will retry in:1000
. . .
WARN  KafkaConsumer Failed re-connection to:http://myserver2.mycompany.net:9092/.  Will retry in:128000
WARN KafkaConsumer Connection established to:http://myserver2.mycompany.net:9092/.
WARN KafkaConsumer Message response received with correlationId=12294, but did not exist in the request queue.

After the last warning consumer remains active but reads nothing.

TCP disconnect handling

I see there a attempt to trigger reconnection when 0 bytes is read, but that is not necessarily the only symptom when you're disconnected I believe.

There might be SocketException when it wasn't able to read after a timeout, and also after it's being disconnected TCPClient has a connected flag that you want to check and if it was reconnect right away.

Memory Leak From Kafka Producer with kafka-net 0.9.0.49

using memory profiling tool to make and compare snapshots,
second snapshot is 20 minutes later than the first one.
full gc collect before create snapshot.

below print is the remained objects from first to second.

In my code,I only using client.SendMessageAsync to produce message , but with out wait method.

image

is there any way to release those bytes array? or i using it wrong way?
looking forward someone to help me.

Add way to specify that consumer should stop (return) if it reaches end of topic

We need the ability to consume a topic to its end and then quit.

Currently a Consume() function blocks/yields forever and never exits. That is to say, when no more messages are available on the server, the Enumerable just blocks and waits for more, which works great unless you don't want it to do that. I envision a way to signal that you want an Enumerable that ends when the last available message is received.

I am implementing this and will submit a PR, but want to get your input as soon as possible to get an idea how you might see something like this fitting into your vision of the project. (E.g. should this be a ConsumerOption, an argument on the Consume() method, a subclass of Consumer? or is there some other way of accomplishing this that I haven't thought of?)

Note: I have also considered, as an alternative, implementing this such that you get the current partition offsets for a topic and then, instead of specifying that you want to quit at the end, you specify the maximum offsets to retrieve. However, as far as I can tell, this isn't supported in the Kafka protocol, so at best it would be a client implementation (i.e. a client may end up retrieving past the maximum offset from the server, and throwing out the excess, then exiting the partition retrieval thread).

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.