Custom starting offset (go_kafka_client, OPEN)

elodina commented on August 22, 2024
Custom starting offset

from go_kafka_client.

Comments (5)

joestein commented on August 22, 2024

We could probably start storing an offset for every minute or so, so that you can rewind to any minute in the stream. We are refactoring that part of the code over the coming weeks, so maybe we can try to hook up whatever works best as part of that.

sajal commented on August 22, 2024

For my use case, I could manage the offset myself if there were a way to specify which starting offset to use when starting up (or re-balancing).

Logic like "x offsets ago" would also be fine. Say the newest offset is x when starting up (or rebalancing): the consumer could then process from offset x - n, where n is a configured value.

baconalot commented on August 22, 2024

+1. A common use case I have is priming a job with some historic data, but not the complete available dataset.

baconalot commented on August 22, 2024

Only now do I see the problems here. With a single consumer pid it would work fine if we had something in the config like ForceSetPartitionStart = 1234.
The flow would then be:
- pid 333 (consumer.go) starts
- set offsets to 1234
- start consuming

But... with multiple pids (chronos, anyone?):
- pid 333 (consumer.go) starts
- (333) set offsets to 1234
- (333) start consuming
- pid 334 (consumer.go) starts
- (334) set offsets to 1234
- (334) start consuming
- (333) -> reprocesses messages

Also, how would this be configured? It can't be a single int, since there can be any number of topics/partitions in the config's group.

For now I am just going to use a helper that can set a commit manually for me:

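    // Assumes "log" and the go_kafka_client package are imported.
    zkconfig := go_kafka_client.NewZookeeperConfig()
    zkcoord := go_kafka_client.NewZookeeperCoordinator(zkconfig)

    // Connect to ZooKeeper before committing anything.
    if err := zkcoord.Connect(); err != nil {
        // fmt.Errorf only builds an error value and reports nothing, so log instead.
        log.Fatal(err)
    }

    // Manually commit offset 123 for partition 213 of the topic.
    tp := go_kafka_client.TopicAndPartition{}
    tp.Topic = "some_topic_name"
    tp.Partition = 213
    if err := zkcoord.CommitOffset("some_group_id", &tp, 123); err != nil {
        log.Fatal(err)
    }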

sajal commented on August 22, 2024

Isn't an offset tied to a partition rather than to a topic? Or do you mean processing a single partition in multiple pids?

As long as each partition is processed by a single pid (my use case), are there any issues with using x - n as the starting offset?
