reugn / go-streams
A lightweight stream processing library for Go
Home Page: https://pkg.go.dev/github.com/reugn/go-streams
License: MIT License
Hi,
I was looking at line 133 of the kafka_sarama.go source code and noticed that every consumed message is pushed to the same channel.
If I'm right, this makes message processing sequential and loses the ability to process the partitions in parallel.
Could it make sense to have a channel per partition in this case?
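For illustration, a per-partition fan-out could look like the following plain-Go sketch (the `message` type is a hypothetical stand-in for `*sarama.ConsumerMessage`): each partition gets its own channel and worker, so partitions run in parallel while per-partition order is preserved, and results are merged at the end.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct {
	partition int
	value     string
}

// consumePerPartition gives every partition its own channel and worker, so
// partitions are processed in parallel while per-partition order is preserved.
func consumePerPartition(partitions map[int][]string) []message {
	var wg sync.WaitGroup
	merged := make(chan message)

	for p, values := range partitions {
		ch := make(chan message)
		go func(p int, values []string) { // producer: one per partition
			for _, v := range values {
				ch <- message{partition: p, value: v}
			}
			close(ch)
		}(p, values)

		wg.Add(1)
		go func(ch <-chan message) { // worker: one per partition
			defer wg.Done()
			for m := range ch {
				merged <- m
			}
		}(ch)
	}
	go func() {
		wg.Wait()
		close(merged)
	}()

	var out []message
	for m := range merged {
		out = append(out, m)
	}
	return out
}

func main() {
	out := consumePerPartition(map[int][]string{0: {"a", "b"}, 1: {"c"}})
	fmt.Println(len(out)) // 3
}
```

The trade-off is that merging loses the global ordering the single shared channel currently gives you; whether that matters depends on the downstream flow.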
I wonder whether this library supports loop/feedback constructs. Skimming the API suggests it should be possible, but I wonder about the real-world implications.
Any insights? Thanks!
Hi developer,
Normally the code is written like this: source().Map().To(sink). But how should the messages be handled when there is no sink to provide, i.e. no further consumer cares about them anymore? The code would then look like: source().Map().End().
Thanks for your great lib ~~~
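For what it's worth, a discarding terminal stage covers this case (later in this thread extension.NewIgnoreSink() is used for exactly that); conceptually such a sink just drains its input so upstream stages never block. A plain-channel sketch, not the library's code:

```go
package main

import "fmt"

// drainSink consumes and discards everything on in, so upstream stages
// never block even though no real consumer exists; it reports how many
// elements it swallowed once in is closed.
func drainSink(in <-chan any) <-chan int {
	done := make(chan int, 1)
	go func() {
		n := 0
		for range in {
			n++
		}
		done <- n
	}()
	return done
}

func main() {
	in := make(chan any) // unbuffered: sends would block without a consumer
	done := drainSink(in)
	for i := 0; i < 3; i++ {
		in <- i
	}
	close(in)
	fmt.Println(<-done) // 3
}
```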
I tested the fan-out function in flow_test.go; it seems to return an incorrect result when I add another flow.
```go
var toReture = func(in interface{}) interface{} {
	msg := in.(string)
	return msg
}

func TestFlowUtil(t *testing.T) {
	in := make(chan interface{})
	out := make(chan interface{})
	source := ext.NewChanSource(in)
	flow1 := flow.NewMap(toUpper, 1)
	flow2 := flow.NewMap(toReture, 1)
	filter := flow.NewFilter(filterA, 1)
	sink := ext.NewChanSink(out)
	var _input = []string{"a", "b", "c"}
	var _expectedOutput = []string{"b", "b", "c", "c"}

	go ingest(_input, in)
	go deferClose(in, time.Second*10)
	go func() {
		fanOut := flow.FanOut(source.Via(filter).Via(flow1).Via(flow2), 2)
		fanOut[1].To(sink)
	}()

	var _output []string
	for e := range sink.Out {
		_output = append(_output, e.(string))
	}
	sort.Strings(_output)
	assertEqual(t, _expectedOutput, _output)
}
```
The function toReture does nothing, but adding it produces an incorrect result.
There are many instances running in a distributed architecture. Some state, like time windows, is shared between instances. We could put it on Redis or other storage.
Hello,
I can't find any example of the 'split' flow feature.
Please provide an example.
Thank You
Do you think the system should describe things using protobufs so that the different distributed workers are tolerant to core type changes, a.k.a. schema evolution?
love the project btw
I want to provide my own timestamp function using NewSlidingWindowWithTSExtractor. Here is my code:

```go
var elemTSExtractor = func(elem interface{}) int64 {
	fmt.Println("<------------------->")
	fmt.Println(elem)
	return time.Now().UnixNano()
}

k.slidingWindow = flow.NewSlidingWindowWithTSExtractor(time.Second*30, time.Second*5, elemTSExtractor)
```

but I got a panic. How can I use my own timestamp in a sliding window?
If no one else is interested, I'd like to take this on.
This neat framework is awesome. :)
go version: 1.19.5
kafka version: 2.8.1
When I try to run the Kafka example, I get a panic:

```
panic: interface conversion: interface {} is []interface {}, not []*sarama.ConsumerMessage

goroutine 13 [running]:
github.com/reugn/go-streams/flow.(*FlatMap[...]).doStream(0xc000028a60)
	####/go/pkg/mod/github.com/reugn/[email protected]/flow/flat_map.go:81 +0x310
created by github.com/reugn/go-streams/flow.NewFlatMap[...]
	####go/pkg/mod/github.com/reugn/[email protected]/flow/flat_map.go:38 +0x17c
```
It comes from this function:
```go
var appendAsterisk = func(inArr []*sarama.ConsumerMessage) []*sarama.ConsumerMessage {
	outArr := make([]*sarama.ConsumerMessage, len(inArr))
	for i, msg := range inArr {
		msg.Value = []byte(string(msg.Value) + "*")
		outArr[i] = msg
	}
	return outArr
}
```
When I change the code to:

```go
var appendAsterisk = func(inArr []interface{}) []*sarama.ConsumerMessage {
	outArr := make([]*sarama.ConsumerMessage, len(inArr))
	for i, msg := range inArr {
		if value, ok := msg.(*sarama.ConsumerMessage); ok {
			value.Value = []byte(string(value.Value) + "*")
			outArr[i] = value
		}
	}
	return outArr
}
```

it works fine.
https://nats.io/blog/new-per-subject-discard-policy/
Probably worth turning this on as a default in go-streams?
It removes some hairy race conditions.
In all the examples, the main goroutine either blocks forever or waits for a timeout.
Is there any way to gracefully exit the main function once processing completes (without aborting currently running goroutines before they finish the job)? I can do it with ChanSink and some sync primitive inside it, but it looks a bit ugly. Is there a nicer solution?
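For reference, one pattern that avoids extra sync primitives: close the source's input channel when the input ends, and let the close propagate downstream, so that ranging over the sink channel in main returns only after every in-flight element has been flushed. A plain-channel sketch of the pattern (stage names hypothetical, not the library's code):

```go
package main

import "fmt"

// pipeline wires source -> map -> sink with plain channels; closing the
// source input propagates downstream, so draining the sink is a natural
// completion signal for main.
func pipeline(input []int) []int {
	in := make(chan int)
	out := make(chan int)

	go func() { // source: close(in) signals end of input
		for _, v := range input {
			in <- v
		}
		close(in)
	}()
	go func() { // map stage: closes out once in is fully drained
		for v := range in {
			out <- v * 2
		}
		close(out)
	}()

	var results []int
	for v := range out { // returns when the whole pipeline has finished
		results = append(results, v)
	}
	return results
}

func main() {
	fmt.Println(pipeline([]int{1, 2, 3})) // [2 4 6]
}
```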
After reading the example code files in this repo, I cannot find the right way to achieve a stream grouping ("keyBy") function, such as partition(stream, key=lambda x: x['country']) in Python streamz, or keyBy in Flink.
Usage scene: count the number of times each country appears in 100 pieces of data.
Maybe I was careless and missed the corresponding example. Either way, I need your help and look forward to your reply.
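I don't believe go-streams ships a built-in keyBy; the grouping can instead be folded in a custom sink or a reduce-style stage. A minimal plain-Go sketch of the counting use case, with a hypothetical record shape:

```go
package main

import "fmt"

// countByKey mimics keyBy(country) + count: it folds a stream of records
// into per-key occurrence counts. The map[string]string record shape is
// a hypothetical simplification.
func countByKey(in <-chan map[string]string, key string) map[string]int {
	counts := make(map[string]int)
	for rec := range in {
		counts[rec[key]]++
	}
	return counts
}

func main() {
	in := make(chan map[string]string)
	go func() {
		for _, c := range []string{"cn", "us", "cn", "de", "cn"} {
			in <- map[string]string{"country": c}
		}
		close(in)
	}()
	fmt.Println(countByKey(in, "country")["cn"]) // 3
}
```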
This is probably not high priority for the vast majority of users; however, the use of the Confluent Kafka client means that go-streams is not Windows-compatible out of the box.
The suggestion I'd like to make is to change the extension to use something like:
https://github.com/segmentio/kafka-go
Will Redis Streams be supported in the future?
That would be wonderful to have this feature. : )
Hi, developer,
So glad the redis stream is supported right now and I appreciate your effort very much.
This morning I read your code and tried to figure out how to refactor mine, but I'm still confused about the following questions.
Could you take a look when you are free?
Thanks again
We could describe the flow using a file-based DSL/IDL, a kind of config file.
Then the runner loads this config file.
It's almost like how Benthos works, though.
It would also be useful to write a comparison with Benthos in the readme, because the two overlap in some ways.
go-streams is a very succinct library, and it is suitable for my usage scenario.
But I had a lot of questions while using it, such as: is there a way to support grouping data by time and count?
Like:
```go
ch := make(chan any, 5)
go func() {
	for i := 0; i < 10000; i++ {
		time.Sleep(rndTime)
		ch <- i
	}
}()

source := extension.NewChanSource(ch)
m := flow.NewMap(func(i interface{}) interface{} {
	logrus.Infof("%v", i)
	return i
}, 1)

// not supported now: emits data every 100 ms or every 50 elements
th := flow.NewBufferWithTimeOrCount(time.Millisecond*100, 50)

source.Via(th).Via(m).To(extension.NewIgnoreSink())
```
Expected output:

```
[x,x,x,x...x,x,x] // 50 elements
// if 100 ms pass first
[x,x,...x,x] // fewer than 50 elements
```