reugn / go-streams
1.8K stars · 26 watchers · 144 forks · 388 KB

A lightweight stream processing library for Go

Home Page: https://pkg.go.dev/github.com/reugn/go-streams

License: MIT License

Go 100.00%
stream-processing pipeline etl kafka data-stream kafka-streams streams redis pulsar data-pipeline

go-streams's People

Contributors

alexaum, ichxxx, jesseobrien, jlong4096, reugn, wwslive, zhaijian


go-streams's Issues

Kafka messages should be processed in parallel

Hi,

I was looking at line 133 of the kafka_sarama.go source code and noticed that every consumed message is pushed to the same channel.
If I'm right, this makes message processing sequential, losing the ability to process the partitions in parallel.
Would it make sense to have one channel per partition in this case?
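One possible shape for this, sketched with the standard library only (the msg type below stands in for *sarama.ConsumerMessage and is hypothetical): a dispatcher goroutine routes each message into a channel owned by its partition, so partitions proceed in parallel while per-partition ordering is preserved.

```go
package main

import (
	"fmt"
	"sync"
)

// msg is a stand-in for *sarama.ConsumerMessage; only the fields
// needed for the sketch are included.
type msg struct {
	partition int32
	value     string
}

// dispatch routes each message to a channel owned by its partition,
// so partitions can be processed concurrently while per-partition
// order is preserved.
func dispatch(in <-chan msg, numPartitions int32) []chan msg {
	out := make([]chan msg, numPartitions)
	for i := range out {
		out[i] = make(chan msg)
	}
	go func() {
		for m := range in {
			out[m.partition] <- m
		}
		for _, ch := range out {
			close(ch)
		}
	}()
	return out
}

func main() {
	in := make(chan msg)
	parts := dispatch(in, 2)

	var wg sync.WaitGroup
	for i, ch := range parts {
		wg.Add(1)
		go func(i int, ch <-chan msg) {
			defer wg.Done()
			for m := range ch {
				fmt.Printf("partition %d: %s\n", i, m.value)
			}
		}(i, ch)
	}

	in <- msg{partition: 0, value: "a"}
	in <- msg{partition: 1, value: "b"}
	close(in)
	wg.Wait()
}
```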

Support for loop/feedback in the graph

I wonder whether this library supports loop/feedback constructs. Skimming the API suggests it should be possible, but I wonder about the real-world implications.

Any insights? Thanks!

How to handle messages when no sink is needed

Hi developer,
normally the code should be written like this : source().Map().To(sink). But how to handle the messages if no need to provide sink that means no further consumer cares about it anymore. and the code could be like this: source().Map().End().

Thanks for your great lib ~~~
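For what it's worth, the pipeline can be terminated with the library's no-op sink, e.g. source().Map().To(extension.NewIgnoreSink()), which simply discards incoming elements. Conceptually that sink is just a goroutine draining a channel; a stdlib-only sketch of the idea (drain is a hypothetical helper, not part of go-streams):

```go
package main

import "fmt"

// drain consumes and discards every element from ch, returning the
// number of elements discarded. It plays the role of a terminal
// "ignore" sink when no downstream consumer cares about the output.
func drain(ch <-chan interface{}) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	ch := make(chan interface{}, 3)
	ch <- "a"
	ch <- "b"
	ch <- "c"
	close(ch)
	fmt.Println(drain(ch)) // prints 3
}
```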

Problem with the FanOut function

I tested the FanOut function from flow_test.go, and it seems to produce an incorrect result when I add another flow.

var toReture = func(in interface{}) interface{} {
	msg := in.(string)
	return msg
}

func TestFlowUtil(t *testing.T) {
	in := make(chan interface{})
	out := make(chan interface{})
	source := ext.NewChanSource(in)
	flow1 := flow.NewMap(toUpper, 1)
	flow2 := flow.NewMap(toReture, 1)
	filter := flow.NewFilter(filterA, 1)
	sink := ext.NewChanSink(out)
	var _input = []string{"a", "b", "c"}
	var _expectedOutput = []string{"b", "b", "c", "c"}
	go ingest(_input, in)
	go deferClose(in, time.Second*10)
	go func() {
		fanOut := flow.FanOut(source.Via(filter).Via(flow1).Via(flow2), 2)
		fanOut[1].To(sink)
	}()
	var _output []string
	for e := range sink.Out {
		_output = append(_output, e.(string))
	}
	sort.Strings(_output)
	assertEqual(t, _expectedOutput, _output)
}

The toReture function does nothing, but the test returns an incorrect result.

Protobufs

Do you think the system should describe messages using protobufs, so that the different distributed workers are tolerant to core type changes?

I.e., schema evolution.

Love the project, btw.

A panic occurs when using NewSlidingWindowWithTSExtractor

I want to supply my own timestamp function using NewSlidingWindowWithTSExtractor; here is my code:

var elemTSExtractor = func(elem interface{}) int64 {
	fmt.Println("<------------------->")
	fmt.Println(elem)
	return time.Now().UnixNano()
}

k.slidingWindow = flow.NewSlidingWindowWithTSExtractor(time.Second*30, time.Second*5, elemTSExtractor)

But I got a panic. How can I use my own timestamps in a sliding window?

Kafka example code fails with a panic

Go version: 1.19.5
Kafka version: 2.8.1

When I try to run the Kafka example, I get a panic:

panic: interface conversion: interface {} is []interface {}, not []*sarama.ConsumerMessage

goroutine 13 [running]:
github.com/reugn/go-streams/flow.(*FlatMap[...]).doStream(0xc000028a60)
	####/go/pkg/mod/github.com/reugn/[email protected]/flow/flat_map.go:81 +0x310
created by github.com/reugn/go-streams/flow.NewFlatMap[...]
	####go/pkg/mod/github.com/reugn/[email protected]/flow/flat_map.go:38 +0x17c

It comes from this function:

var appendAsterisk = func(inArr []*sarama.ConsumerMessage) []*sarama.ConsumerMessage {
	outArr := make([]*sarama.ConsumerMessage, len(inArr))
	for i, msg := range inArr {
		msg.Value = []byte(string(msg.Value) + "*")
		outArr[i] = msg
	}
	return outArr
}

When I change the code to:

var appendAsterisk = func(inArr []interface{}) []*sarama.ConsumerMessage {
	outArr := make([]*sarama.ConsumerMessage, len(inArr))
	for i, msg := range inArr {

		if value, ok := msg.(*sarama.ConsumerMessage); ok {
			//you can use variable `value`
			value.Value = []byte(string(value.Value) + "*")
			outArr[i] = value
		}
	}
	return outArr
}

it works fine.
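For context, the panic arises because an upstream batching stage emits []interface{} rather than a concrete slice type, so the FlatMap input has to accept []interface{} and assert each element, exactly as the workaround does. With Go 1.18+ generics, that assertion boilerplate can be factored into a small helper (castSlice is hypothetical, not part of go-streams):

```go
package main

import "fmt"

// castSlice converts a []interface{} into a []T, skipping elements
// that are not of type T.
func castSlice[T any](in []interface{}) []T {
	out := make([]T, 0, len(in))
	for _, e := range in {
		if v, ok := e.(T); ok {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	mixed := []interface{}{"a", 1, "b"}
	fmt.Println(castSlice[string](mixed)) // prints [a b]
}
```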

Graceful shutdown

In all the examples, the main goroutine either blocks forever or waits for a timeout.

Is there any way to gracefully exit the main function once processing completes (without aborting currently running goroutines before they finish their work)? I can do it with ChanSink and a sync primitive inside it, but it looks a bit ugly. Is there a nicer solution?
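One stdlib-only pattern, independent of go-streams internals: close the sink's output channel once all upstream goroutines are done (tracked with a sync.WaitGroup), so main can range over the channel and return naturally. A minimal sketch under that assumption:

```go
package main

import (
	"fmt"
	"sync"
)

// run starts n workers feeding results into out, and closes out once
// all of them have finished - so a consumer ranging over out exits
// naturally when processing completes.
func run(n int, out chan<- int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			out <- i * i
		}(i)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
}

func main() {
	out := make(chan int)
	run(3, out)
	sum := 0
	for v := range out { // returns when out is closed
		sum += v
	}
	fmt.Println(sum) // prints 5 (0+1+4)
}
```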

How to use `group` or `keyBy` on the stream?

After reading the example code files in this repo, I cannot find a way to achieve a group or keyBy operation on a stream,
such as partition(stream, key=lambda x: x['country']) in Python streamz, or keyBy in Flink.

Usage scenario: count the number of times each country appears in 100 records.

Maybe I was careless and missed the corresponding example. Either way, I'd appreciate your help and look forward to your reply.
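Assuming no built-in keyBy exists, a grouping fold can be written by hand at the end of the pipeline. A stdlib-only sketch for the country-count scenario (the record type and key function are illustrative, not part of go-streams):

```go
package main

import "fmt"

type record struct {
	country string
}

// countByKey folds a stream of records into per-key counts - a
// hand-rolled keyBy + count, done in a terminal consumer stage.
func countByKey(in <-chan record, key func(record) string) map[string]int {
	counts := make(map[string]int)
	for r := range in {
		counts[key(r)]++
	}
	return counts
}

func main() {
	in := make(chan record, 4)
	for _, c := range []string{"US", "FR", "US", "DE"} {
		in <- record{country: c}
	}
	close(in)
	counts := countByKey(in, func(r record) string { return r.country })
	fmt.Println(counts["US"]) // prints 2
}
```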

Two questions about redis stream

Hi, developer,

I'm glad Redis streams are supported now, and I appreciate your effort very much.
This morning I read your code and tried to figure out how to refactor mine, but I'm still confused about the following questions.
Could you take a look when you are free?

  1. Why is no ack sent after a message is transferred into the internal channel?
  2. How can messages in the internal channel of a Source/Sink be prevented from being discarded? Say the internal channel of RedisSource/RedisSink still holds many messages that haven't been consumed, but the program unfortunately needs to exit; how should such a scenario be handled?

Thanks again

DSL / Config for describing the Flow

We could describe the flow using a DSL/IDL file, i.e. a kind of config file.

The runner would then load this config file.

It's almost like how Benthos works.

It would also be useful to write a comparison with Benthos in the README, because there is overlap in some ways.

It would be great if there was a document or tutorial

go-streams is a very succinct library and it suits my usage scenario,
but I had a lot of questions while using it, such as:

  1. Can I modify variables outside the flow? How?
  2. Can I define my own source receiver?
  3. etc.

I believe that the lack of documentation dissuades many people who want to use go-streams.
The documentation doesn't have to be as detailed as Flink's, but it could clearly cover common usage and points that need attention.

Support grouping data by time and length

Is there a way to group data both by time and by count?
For example:

	ch := make(chan any, 5)
	go func() {
		for i := 0; i < 10000; i++ {
			time.Sleep(rndTime)
			ch <- i
		}
	}()
	source := extension.NewChanSource(ch)

	m := flow.NewMap(func(i interface{}) interface{} {
		logrus.Infof("%v", i)
		return i
	}, 1)
	// not supported now
	th := flow.NewBufferWithTimeOrCount(time.Millisecond*100, 50) // emit every 100 ms or every 50 elements

	source.Via(th).Via(m).To(extension.NewIgnoreSink())

Expected output:

[x,x,x,x...x,x,x] // 50 elements
// if 100 ms pass
[x,x,...x,x] // fewer than 50 elements
