s8sg / goflow Goto Github PK
View Code? Open in Web Editor NEWA Golang based high performance, scalable and distributed workflow framework
License: MIT License
A Golang based high performance, scalable and distributed workflow framework
License: MIT License
package main
import (
"fmt"
flow "github.com/s8sg/goflow/flow/v1"
goflow "github.com/s8sg/goflow/v1"
)
// Workload function
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
fmt.Printf("hello world\n")
return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
}
// Define provide definition of the workflow
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
dag := workflow.Dag()
dag.Node("test", doSomething)
return nil
}
func main() {
fs := &goflow.FlowService{
Port: 8080,
RedisURL: "192.168.1.65:16379", // need redis password 123
OpenTraceUrl: "192.168.1.65:5775",
WorkerConcurrency: 5,
EnableMonitoring: true,
}
fs.Register("myflow", DefineWorkflow)
fs.Start()
}
Hey @s8sg
I was checking the framework to build an aggregator service which calls multiple services and combine results received from each one of them. I was wondering if i can use this framework to achieve below use case
lets say i have three nodes arranged in series. I want to store the result of intermediate nodes so that on the basis of result, i would decide if i want to proceed to the next node or not. I saw dag.ConditionalBranch
can achieve so by adding list of conditions. my question is how would you parse response of intermediate nodes? How can we fetch that response from previous node is one defined in my conditionList
I would appreciate if you can give more detailed example of ConditionalBranch
Provide a 95% unit test mocking where it's opportune with gomock. No tests are present in the current code base.
How to reproduce
curl -d hello http://localhost:8080/myflow
- Everything worksfor i in {1..50}; do curl -d hello http://localhost:8080/myflow >> output.log & ; done
the following is happening:
2023/04/24 13:39:29 http: panic serving 127.0.0.1:51880: runtime error: invalid memory address or nil pointer dereference
goroutine 622 [running]:
net/http.(*conn).serve.func1()
/nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:1854 +0xbf
panic({0x765840, 0xaa66e0})
/nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/runtime/panic.go:890 +0x263
github.com/s8sg/goflow/eventhandler.(*TraceHandler).StopNodeSpan(...)
/home/danash/go/pkg/mod/github.com/s8sg/[email protected]/eventhandler/trace_handler.go:110
github.com/s8sg/goflow/eventhandler.(*GoFlowEventHandler).ReportNodeEnd(0x79c520?, {0xc00088c910?, 0x7f6d9e1e69a8?}, {0x7f6dc6d6e3c8?, 0xc000051400?})
/home/danash/go/pkg/mod/github.com/s8sg/[email protected]/eventhandler/goflow_event_handler.go:57 +0x3d
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).findNextNodeToExecute(0xc000177740)
/home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/sdk/executor/executor.go:627 +0x4f8
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).Execute(0xc000177740, 0xc0010bd720)
/home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/sdk/executor/executor.go:1265 +0x9d8
github.com/s8sg/goflow/core/runtime/controller/handler.ExecuteFlowHandler(0xc0011484b0, 0xc0007fc600, {0x885bb8, 0xc000902680})
/home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/runtime/controller/handler/execute_flow_handler.go:33 +0x265
github.com/s8sg/goflow/runtime.newRequestHandlerWrapper.func1({0x882080, 0xc000d382a0}, 0xc00114c000, {0xc000882d80, 0x1, 0xc000c30b70?})
/home/danash/go/pkg/mod/github.com/s8sg/[email protected]/runtime/new_request_handler_wrapper.go:52 +0x6ec
github.com/julienschmidt/httprouter.(*Router).ServeHTTP(0xc00010e1e0, {0x882080, 0xc000d382a0}, 0xc00114c000)
/home/danash/go/pkg/mod/github.com/julienschmidt/[email protected]/router.go:387 +0x81c
net/http.serverHandler.ServeHTTP({0x880de8?}, {0x882080, 0xc000d382a0}, 0xc00114c000)
/nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:2936 +0x316
net/http.(*conn).serve(0xc0002a8e10, {0x8822d8, 0xc0001181e0})
/nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:1995 +0x612
created by net/http.(*Server).Serve
/nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:3089 +0x5ed
2023/04/24 13:39:29 Reporting span 338343586c9e519f:62cdaa3f404a5f84:6cad4d79616ee4e9:1
2023/04/24 13:39:29 Reporting span 338343586c9e519f:6cad4d79616ee4e9:338343586c9e519f:1
2023/04/24 13:39:29 Reporting span 338343586c9e519f:7818de4473a991b2:338343586c9e519f:1
2023/04/24 13:39:29 Reporting span 338343586c9e519f:354240b2e50c0ada:6cad4d79616ee4e9:1
2023/04/24 13:39:29 Reporting span 338343586c9e519f:6cad4d79616ee4e9:338343586c9e519f:1
2023/04/24 13:39:29 Reporting span 338343586c9e519f:6cad4d79616ee4e9:338343586c9e519f:1
2023/04/24 13:39:29 Executing flow myflow
2023/04/24 13:39:29 Reporting span 16ba1e6c69d0d230:38cef6c7b9bc1643:16ba1e6c69d0d230:1
2023/04/24 13:39:29 Reporting span 16ba1e6c69d0d230:06ac2ac89bb22560:2cec2b7378dfba32:1
2023/04/24 13:39:29 http: panic serving 127.0.0.1:51896: [request `ch35ooasig6serk31o8g`] Pipeline is not active
goroutine 639 [running]:
net/http.(*conn).serve.func1()
/nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:1854 +0xbf
panic({0x747500, 0xc000292900})
/nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/runtime/panic.go:890 +0x263
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).executeNode(0xc0008f5740, {0xc0011c0200, 0x5, 0x200})
/home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/sdk/executor/executor.go:346 +0x645
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).Execute(0xc0008f5740, 0xc000a49720)
/home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/sdk/executor/executor.go:1257 +0x8cb
github.com/s8sg/goflow/core/runtime/controller/handler.ExecuteFlowHandler(0xc00047c2d0, 0xc000181c80, {0x885bb8, 0xc0009b8000})
/home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/runtime/controller/handler/execute_flow_handler.go:33 +0x265
github.com/s8sg/goflow/runtime.newRequestHandlerWrapper.func1({0x882080, 0xc0011bc0e0}, 0xc000454000, {0xc000cfe080, 0x1, 0xc000a49b70?})
/home/danash/go/pkg/mod/github.com/s8sg/[email protected]/runtime/new_request_handler_wrapper.go:52 +0x6ec
github.com/julienschmidt/httprouter.(*Router).ServeHTTP(0xc00010e1e0, {0x882080, 0xc0011bc0e0}, 0xc000454000)
/home/danash/go/pkg/mod/github.com/julienschmidt/[email protected]/router.go:387 +0x81c
net/http.serverHandler.ServeHTTP({0x880de8?}, {0x882080, 0xc0011bc0e0}, 0xc000454000)
/nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:2936 +0x316
net/http.(*conn).serve(0xc0003223f0, {0x8822d8, 0xc0001181e0})
/nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:1995 +0x612
created by net/http.(*Server).Serve
/nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:3089 +0x5ed
2023/04/24 13:39:29 debug logging disabled
2023/04/24 13:39:29 Initializing logging reporter
2023/04/24 13:39:29 Reporting span 16ba1e6c69d0d230:2cec2b7378dfba32:16ba1e6c69d0d230:1
2023/04/24 13:39:29 Reporting span 16ba1e6c69d0d230:37ed544c17db1cb0:2cec2b7378dfba32:1
2023/04/24 13:39:29 Reporting span 16ba1e6c69d0d230:2cec2b7378dfba32:16ba1e6c69d0d230:1
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:1b7dd13c2c4c0da7:0000000000000000:1
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:027da292581982b2:0d43717ff3b76e9f:1
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:1b7dd13c2c4c0da7:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:0d43717ff3b76e9f:1b7dd13c2c4c0da7:1
2023/04/24 13:39:29 debug logging disabled
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:1b7dd13c2c4c0da7:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:1b7dd13c2c4c0da7:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:13e86d6139141d0a:79fe1508dadda560:1
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:79fe1508dadda560:5983b20c54d0ee71:1
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:5983b20c54d0ee71:0000000000000000:1
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:5983b20c54d0ee71:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:5983b20c54d0ee71:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:5983b20c54d0ee71:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
By implementing #46 we might achieve a more robustness handling for requests
jozoppi@DESKTOP-T9826C8:/projects/goflow$ git push --set-upstream origin sample_flows/projects/goflow$
remote: Permission to s8sg/goflow.git denied to giorgiozoppi.
fatal: unable to access 'https://github.com/s8sg/goflow/': The requested URL returned error: 403
jozoppi@DESKTOP-T9826C8:
Hi @s8sg ,
We are using goflow to build an online system. The main use case is to orchestrate different tasks together like workflows.
For example, initially user has a workflow like
Node-A -> Node-B
But the user now want to modify and changed to
Node-A -> Node-C -> Node-B
Currently, however, after registering workflows, the DAG cannot be changed.
(Of course, we can restart the flow service to achieve the reload)
So I try to contribute in this PR #64
I have a flow with multiple nodes: 1 -> 2 -> 3 -> 4. When this flow is executed, Node 1 done, Node 2 done, Node 3 runs for a long time but eventually forces a stop due to the application shutting down. However, when I restart the application, I want the flow to restart from Node 3, but it does not. How can I achieve this?
Can you please add some examples for using this library?
Also how would I return value of final result from dag?
Queue Worker is mainly used to handle async operations.
Queue working internally uses RMQ - which internally uses Redis
The goal is to abstract the queue-worker in a way so that all RMQ specific details gets abstracted into Queue Worker interface
QueueWorker in theory should abstract
func(task *Task) error
for NewRequest, PartialRequest, Stop, Pause, Resume, WorkloadResponse
It would be awesome to have default metrics like: flow started, elapsed times, flow counts, etc.
Also those can be exported to prometheus by default.
We would like to contribute it if you accept ๐
I tried to implement your library. But I can't for Creating More Complex DAG examples as mentioned on READ.md.
I tried following but the functions never executed:
package main
import (
"fmt"
"github.com/faasflow/goflow"
flow "github.com/faasflow/lib/goflow"
"log"
)
// Workload function
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
log.Println(fmt.Sprintf("you said \"%s\"", string(data)))
return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
}
// Workload function
func loadProfile(data []byte, option map[string][]string) ([]byte, error) {
log.Println(fmt.Sprintf("load profile \"%s\"", string(data)))
return []byte(fmt.Sprintf("load profile \"%s\"", string(data))), nil
}
// Workload function
func getPresignedURLForImage(data []byte, option map[string][]string) ([]byte, error) {
log.Println(fmt.Sprintf("image url \"%s\"", string(data)))
return []byte(fmt.Sprintf("image url \"%s\"", string(data))), nil
}
// Workload function
func detectFace(data []byte, option map[string][]string) ([]byte, error) {
log.Println(fmt.Sprintf("detect face \"%s\"", string(data)))
return []byte(fmt.Sprintf("detect face \"%s\"", string(data))), nil
}
// Workload function
func markProfileBasedOnStatus(data []byte, option map[string][]string) ([]byte, error) {
log.Println(fmt.Sprintf("mask profile \"%s\"", string(data)))
return []byte(fmt.Sprintf("mask profile \"%s\"", string(data))), nil
}
// Define provide definition of the workflow
func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
dag := f.Dag()
dag.Node("get-kyc-image").Apply("load-profile", loadProfile).
Apply("get-image-url", getPresignedURLForImage).
Apply("face-detect", detectFace).
Apply("mark-profile", markProfileBasedOnStatus)
dag.Edge("get-kyc-image", "face-detect")
dag.Edge("face-detect", "mark-profile")
return nil
}
func main() {
fs := &goflow.FlowService{
Port: 8080,
RedisURL: "localhost:6379",
OpenTraceUrl: "localhost:5775",
WorkerConcurrency: 5,
}
fs.Start("myflow", DefineWorkflow)
}
Hi,
Is there any way to show the workflows graphically? same as the photos used in the documentation, or anything else...
Thank you
Firstly, thank you for offering us a very useful tool to build an efficient workflow architecture.
However, when I'm using your github page's README.md's introduction, I encountered this problem:
github.com/s8sg/goflow/flow/v1: module github.com/s8sg/goflow@latest found (v0.1.0), but does not contain package github.com/s8sg/goflow/flow/v1
github.com/s8sg/goflow/v1: module github.com/s8sg/goflow@latest found (v0.1.0), but does not contain package github.com/s8sg/goflow/v1
in my golang project's import, I write this:
import (
goflow "github.com/s8sg/goflow/v1"
flow "github.com/s8sg/goflow/flow/v1"
)
just the same as your github webpage's README.md write.
So, I went into my local go path where your src code downloaded to find out what happend. In my directory, I found the source code I downloaded is v0.1.0 which is 2020's version and it does not match your latest README instruction. Apparently, the latest release on Github is outdated. So, could you update a latest release that can match your latest README.md?
in my case, i want to use goflow to create some resources on a public cloud. like creating a cloud vm with public ip.
i want
flow.cancel()
), workflow can free the created resources(vm,public ip etc)After installing goflow as explained in the guide, I have the following problem:
$ go build -o goflow
flow.go:6:2: no required module provides package github.com/s8sg/goflow/flow/v1; to add it:
go get github.com/s8sg/goflow/flow/v1
flow.go:7:2: no required module provides package github.com/s8sg/goflow/v1; to add it:
go get github.com/s8sg/goflow/v1
I try to solve it with the indicated commands but it does not work.
$ go get github.com/s8sg/goflow/flow/v1
go: module github.com/s8sg/goflow@upgrade found (v0.1.0), but does not contain package github.com/s8sg/goflow/flow/v1
Sorry in advance if this is something silly, I am new to learning golang.
I'd like to explore and design with you the hashicorp vault support for goflow
Hi there,
When we build an online system to create/modify workflows for users, we found it is neccessary to validate whether the DAG created by users is valid. And there is a simple way to implement the Validate() in workflow layer. #71
Thanks!
And, is this stable for production use?
In the DAG asynchronously we're creating a monolith structure when n the Distributed ETL use case it will be nice to have a flag to distribute the node of a graph as job, doing effectively a pipeline effect using redis queuing mechanism.
Suppose we have a DAG:
This needs to be evalutated in design meeting and follow up discussion and check what currentely kubeflow does.
Add ability to play, pause and Stop a GoFlow, by querying to the individual flow server.
Is there an option to export the entire DAG in json?
Expect:
Pause
can pause the specified workflow, and Resume
can resume it.
Stop
can stop active workflow and set its status to Finished
.
Actually:
Pasue
, the workflow remains executed until the end....
2022/09/09 17:12:45 [request `my_test_flow-1662714755`] intermediate result from node 0_1_Node-1 to 0_2_Node-2 stored as 0_1_Node-1--0_2_Node-2
2022/09/09 17:12:45 [request `my_test_flow-1662714755`] performing request for Node 0_2_Node-2, indegree count is 1
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] request submitted for Node 0_2_Node-2
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] partial request received
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] intermediate result from Node 0_1_Node-1 to Node 0_2_Node-2 retrieved from 0_1_Node-1--0_2_Node-2
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] executing node 0_2_Node-2
Test Node: O_O "TestData"
2022/09/09 17:12:47 Pausing request my_test_flow-1662714755 of flow my_test_flow
Node End
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] completed execution of node 0_2_Node-2
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] intermediate result from node 0_2_Node-2 to 0_3_Node-3 stored as 0_2_Node-2--0_3_Node-3
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] performing request for Node 0_3_Node-3, indegree count is 1
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] Request is paused, storing partial state for node: 0_3_Node-3
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] request submitted for Node 0_3_Node-3
2022/09/09 17:12:57 [request `my_test_flow-1662714755`] partial request received
2022/09/09 17:12:57 [request `my_test_flow-1662714755`] intermediate result from Node 0_2_Node-2 to Node 0_3_Node-3 retrieved from 0_2_Node-2--0_3_Node-3
2022/09/09 17:12:57 [request `my_test_flow-1662714755`] executing node 0_3_Node-3
Test Node: O_O "O_O "TestData""
Node End
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] completed execution of node 0_3_Node-3
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] intermediate result from node 0_3_Node-3 to 0_4_Node-4 stored as 0_3_Node-3--0_4_Node-4
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] performing request for Node 0_4_Node-4, indegree count is 1
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] Request is paused, storing partial state for node: 0_4_Node-4
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] request submitted for Node 0_4_Node-4
2022/09/09 17:13:08 [request `my_test_flow-1662714755`] partial request received
2022/09/09 17:13:08 [request `my_test_flow-1662714755`] intermediate result from Node 0_3_Node-3 to Node 0_4_Node-4 retrieved from 0_3_Node-3--0_4_Node-4
2022/09/09 17:13:08 [request `my_test_flow-1662714755`] executing node 0_4_Node-4
...
Stop
:2022/09/09 17:15:03 Pausing request my_test_flow for flow my_test_flow-1662714890
Node End
2022/09/09 17:15:11 [request `my_test_flow-1662714890`] completed execution of node 0_2_Node-2
2022/09/09 17:15:11 [request `my_test_flow-1662714890`] failed to obtain pipeline state, error failed to get key core.my_test_flow.my_test_flow-1662714890.request-state, nil
2022/09/09 17:15:11 [request `my_test_flow-1662714890`] pipeline is not active
panic: [request `my_test_flow-1662714890`] Pipeline is not active
goroutine 30 [running]:
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).findNextNodeToExecute(0xc0002c7c78)
/Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/sdk/executor/executor.go:609 +0x614
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).Execute(0xc0002c7c78, 0xc0002c7c58)
/Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/sdk/executor/executor.go:1266 +0x9c8
github.com/s8sg/goflow/core/runtime/controller/handler.PartialExecuteFlowHandler(0xc0002c7dd0, 0xc00028ecc0?, {0x14771f8, 0xc0002a6410})
/Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/runtime/controller/handler/partial_execute_flow_handler.go:25 +0xe5
github.com/s8sg/goflow/runtime.(*FlowRuntime).handlePartialRequest(0xc00012af70, 0xc00028ecc0)
/Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:457 +0x1c6
github.com/s8sg/goflow/runtime.(*FlowRuntime).handleRequest(0xc0002e6380?, 0xc0002e21e0?, {0xc0002a43b0?, 0x13260a0?})
/Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:414 +0xc8
github.com/s8sg/goflow/runtime.(*FlowRuntime).Consume(0xc00012af70, {0x1473a98, 0xc0002320e0})
/Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:394 +0x2ae
github.com/adjust/rmq/v4.(*redisQueue).consumerConsume(0xc0001d4000, {0x14701a0, 0xc00012af70})
/Users/dli/workspace/lab/gowf-test/vendor/github.com/adjust/rmq/v4/queue.go:282 +0x9f
created by github.com/adjust/rmq/v4.(*redisQueue).AddConsumer
/Users/dli/workspace/lab/gowf-test/vendor/github.com/adjust/rmq/v4/queue.go:260 +0xe5
Here is the test code:
package main
import (
"flag"
"fmt"
"time"
flow "github.com/s8sg/goflow/flow/v1"
goflow "github.com/s8sg/goflow/v1"
)
const (
flowName = "my_test_flow"
)
var (
server = flag.Bool("s", false, "start server")
op = flag.String("o", "exec", "operation")
rid = flag.String("i", "", "request id")
)
func workload(data []byte, option map[string][]string) ([]byte, error) {
fmt.Printf("Test Node: %s\n", string(data))
time.Sleep(10 * time.Second)
fmt.Println("Node End")
return []byte(fmt.Sprintf("O_O \"%s\"", string(data))), nil
}
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
dag := workflow.Dag()
dag.Node("Node-1", workload)
dag.Node("Node-2", workload)
dag.Node("Node-3", workload)
dag.Node("Node-4", workload)
dag.Node("Node-5", workload)
dag.Node("Node-6", workload)
dag.Node("Node-7", workload)
dag.Node("Node-8", workload)
dag.Edge("Node-1", "Node-2")
dag.Edge("Node-2", "Node-3")
dag.Edge("Node-3", "Node-4")
dag.Edge("Node-4", "Node-5")
dag.Edge("Node-5", "Node-6")
dag.Edge("Node-6", "Node-7")
dag.Edge("Node-7", "Node-8")
return nil
}
func startServer() {
fs := &goflow.FlowService{
Port: 8088,
RedisURL: "127.0.0.1:6379",
WorkerConcurrency: 2,
RetryCount: 1,
DebugEnabled: true,
}
err := fs.Register(flowName, DefineWorkflow)
if err != nil {
panic(err)
}
err = fs.Start()
if err != nil {
panic(err)
}
}
func ExecOp(op string) string {
fs := &goflow.FlowService{
RedisURL: "127.0.0.1:6379",
}
reqId := *rid
if op == "exec" {
reqId = fmt.Sprintf("%s-%d", flowName, time.Now().Unix())
}
if len(reqId) == 0 {
panic("request id is empty")
}
switch op {
case "exec":
err := fs.Execute(flowName, &goflow.Request{
Body: []byte("TestData"),
RequestId: reqId,
})
if err != nil {
panic(err)
}
return reqId
case "pause":
err := fs.Pause(flowName, reqId)
if err != nil {
panic(err)
}
case "resume":
err := fs.Resume(flowName, reqId)
if err != nil {
panic(err)
}
case "stop":
err := fs.Stop(flowName, reqId)
if err != nil {
panic(err)
}
}
return reqId
}
func main() {
flag.Parse()
if *server {
startServer()
return
}
reqId := ExecOp(*op)
fmt.Println(reqId)
}
Start the server:
./wf -s
Client:
./wf -o exec
./wf -o pause -i [request_id]
./wf -o resume -i [request_id]
./wf -o stop -i [request_id]
Currently Flow and Workload are tightly coupled as workload are defined inside of a flow definition
This has a few drawbacks
To solve this issue GoFlow will support a special kind of Operation called ApplyExternal()
like Apply()
It will take a unique ID of the workload that need to be called
Node('n2').ApplyExternal('task1')
To define a workload user need to use the GoFlow Library and Register a Workload using the unique ID
fs.Register('task1', PerformTask1)
fs.StartWorkload(['myflow']) // start workload will take the list of flows
Workload will have the same signature
func PerformTask1(data []byte, option map[string][]string) ([]byte, error) {
...
}
Redis has eventual consistentcy that makes inadapt to use when save statistics, a better option is using etcd.
When initializing GoFlow with all the necessary configurations, I encountered an issue with passing the Redis client. In the GoFlow struct, there's only an option to provide the RedisClient:
fs := &goflow.FlowService{
RedisURL: redisUrl,
WorkerConcurrency: 5,
}
However, when attempting to include SSL configuration in the Redis URL, I encountered the following error message:
failed to initiate connection, error dial tcp: address .too many colons in address
The sample Redis URL used was: "rediss://" + options.Username + ":" + options.Password + "@" + options.Addr + "/0?ssl_cert_reqs=None"
Further investigation is required to resolve this problem and successfully pass the SSL configuration to the go flow during initialization.
Currently we are using redis to register
Code is currently scattered across runtime like this
The goal is to obstruct the RDB into a flow registry
We can initialise the FlowRegistry at the same time when we are initialising the RDB
I have been contemplating ideas around a workflow library for some time, this looks good. I think one question I'd have is, what would a Flow
interface look like? We want to define that abstraction potentially in Micro as a way of doing orchestration. Once you have enough services this makes sense.
Here is FlowService struct
type FlowService struct {
Port int
RedisURL string
RedisPassword string
RequestAuthSharedSecret string
RequestAuthEnabled bool
WorkerConcurrency int
RetryCount int
Flows map[string]runtime.FlowDefinitionHandler
RequestReadTimeout time.Duration
RequestWriteTimeout time.Duration
OpenTraceUrl string
DataStore sdk.DataStore
Logger sdk.Logger
EnableMonitoring bool
DebugEnabled bool
runtime *runtime.FlowRuntime
}
I want to connect to a specifically redis db, but FlowService don't have a field like RedisDB
It would be awesome to expose flow events via registering event hooks to flow library.
We would like to know when our flow state change: started, completed, failed, etc.
We would like to add missing features by contributing to this library if you accept it ๐
GoFlow DSL would allow user to define a workflow as a DSL. DSL can be a simple yaml file
The expectation is:
The codahale/hdrhistogram repo has been transferred under the github HdrHstogram umbrella with the help from the original author in Sept 2020 (new repo url https://github.com/HdrHistogram/hdrhistogram-go). The main reasons are to group all implementations under the same roof and to provide more active contribution from the community as the original repository was archived several years ago.
The dependency URL should be modified to point to the new repository URL. The tag "v0.9.0" was applied at the point of transfer and will reflect the exact code that was frozen in the original repository.
If you are using Go modules, you can update to the exact point of transfer using the @v0.9.0 tag in your go get command.
go mod edit -replace github.com/codahale/hdrhistogram=github.com/HdrHistogram/[email protected]
From the point of transfer, up until now (mon 16 aug 2021), we've released 3 versions that aim support the standard HdrHistogram serialization/exposition formats, and deeply improve READ performance.
We recommend to update to the latest version.
Hey @s8sg ,
I am wondering if goflow can generate dependency graph of defined workflows, since its using opentracing
The goal of this story is to make it easier to build dag from a data engineer prospective. A data engineer shall not be able to
be proficent in Go and basically we want to provide an UI where he/she is able to draw a dag. We want to leverage the https://github.com/AlexImb/vue-dag library to obtain that scope.
As end user i want to:
So the scope of this story is:
1 . Create a UI that is able to draw/save a graph and schedule its execution.
2.. Provide a REST api that is able to submit a transpiled graph.
How to get response from latest executed node and send back to the client?
We want to support multi distributed queue support other then a Redis:
So we might want to abstract the queue mechanism of this workflow manager to allow this extensibility.
When I try to run the sample program provided, it gives the following errors
./main.go:17:14: cannot use doSomething (type func([]byte, map[string][]string) ([]byte, error)) as type flow.BranchOption in argument to dag.Node
I tried to execute this on "The Go Playground", but it doesn't execute there.
We would like to explore goflow for one of the requirement. Could you please help here
[Feature] Support dynamically appending flows after the flow service started.
Currently, it is only possible to register a workflow before starting the flow service. If you need to register a new workflow after starting the flow service, a restart is required. We need to provide a more flexible solution in the future.
It's good for triggering workflow by api. But sometimes i wanna run my workflow in command mode. I wanna keep my workflow definition.
package amin
import (
"fmt"
flow "github.com/s8sg/goflow/flow/v1"
)
// Workload function
func node1(data []byte, option map[string][]string) ([]byte, error) {
result := fmt.Sprintf("(Executing node 1 with data (%s))", string(data))
fmt.Println(result)
return []byte(result), nil
}
// DefineWorkflow Define provide definition of the workflow
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
dag := workflow.Dag()
dag.Node("node1", node1)
return nil
}
then run this workflow in main
workflow.Run("my-workflow")
We would like to add custom middleware to the internal http server. Also if we can pass query parameters to the endpoints, we can create more generalized flows.
Goflow library can expose its internal server and handlers by sane defaults, hence users can overwrite it.
fatal error: concurrent map writes
fatal error: concurrent map writes
goroutine 67 [running]:
runtime.throw(0x14d49a2, 0x15)
/usr/local/go/src/runtime/panic.go:1116 +0x72 fp=0xc00038b6e0 sp=0xc00038b6b0 pc=0x10343f2
runtime.mapassign_faststr(0x1439380, 0xc0000df950, 0x14cdfa3, 0x4, 0x1)
/usr/local/go/src/runtime/map_faststr.go:291 +0x3de fp=0xc00038b748 sp=0xc00038b6e0 pc=0x1014f5e
github.com/s8sg/goflow/eventhandler.(*TraceHandler).StartOperationSpan(0xc0002e2b40, 0xc000227090, 0x8, 0xc000376800, 0x14, 0x14cdfa3, 0x4)
/Users/apple/go/pkg/mod/github.com/s8sg/goflow@v0.0.8/eventhandler/trace_handler.go:124 +0x223 fp=0xc00038b7d8 sp=0xc00038b748 pc=0x13107d3
github.com/s8sg/goflow/eventhandler.(*FaasEventHandler).ReportOperationStart(0xc00009c100, 0x14cdfa3, 0x4, 0xc000227090, 0x8, 0xc000376800, 0x14)
/Users/apple/go/pkg/mod/github.com/s8sg/goflow@v0.0.8/eventhandler/faas_event_handler.go:66 +0x6b fp=0xc00038b820 sp=0xc00038b7d8 pc=0x130f4bb
github.com/faasflow/sdk/executor.(*FlowExecutor).executeNode(0xc00038bb98, 0xc0002268d0, 0x5, 0x8, 0xc00019a1a8, 0x1841590, 0x1, 0x0, 0x0)
/Users/apple/go/pkg/mod/github.com/faasflow/sdk@v1.0.0/executor/executor.go:350 +0x702 fp=0xc00038b968 sp=0xc00038b820 pc=0x11338f2
github.com/faasflow/sdk/executor.(*FlowExecutor).Execute(0xc00038bb98, 0xc000380a00, 0x14d1ee9, 0xf, 0x18ac040, 0x203000, 0x203000)
/Users/apple/go/pkg/mod/github.com/faasflow/sdk@v1.0.0/executor/executor.go:1266 +0xa8a fp=0xc00038bb00 sp=0xc00038b968 pc=0x113cbba
github.com/faasflow/runtime/controller/handler.ExecuteFlowHandler(0xc00038bcc8, 0xc0001f21e0, 0x15781e0, 0xc000128790, 0x0, 0x0)
/Users/apple/go/pkg/mod/github.com/faasflow/runtime@v0.2.2/controller/handler/execute_flow_handler.go:28 +0x295 fp=0xc00038bc60 sp=0xc00038bb00 pc=0x1388395
github.com/s8sg/goflow/runtime.(*FlowRuntime).handleNewRequest(0xc000168100, 0xc0001f21e0, 0x6, 0xc0001f21e0)
/Users/apple/go/pkg/mod/github.com/s8sg/goflow@v0.0.8/runtime/flow_runtime.go:204 +0xf2 fp=0xc00038bd08 sp=0xc00038bc60 pc=0x13d1bf2
github.com/s8sg/goflow/runtime.(*FlowRuntime).queueReceiver(0xc000168100, 0xc0000900a0, 0x15, 0xc00037e180, 0x6, 0x6, 0x2554c7bf5, 0x18800a0)
/Users/apple/go/pkg/mod/github.com/s8sg/goflow@v0.0.8/runtime/flow_runtime.go:178 +0x37f fp=0xc00038bdd8 sp=0xc00038bd08 pc=0x13d195f
github.com/s8sg/goflow/runtime.(*FlowRuntime).queueReceiver-fm(0xc0000900a0, 0x15, 0xc00037e180, 0x6, 0x6, 0xc0001803c0, 0x0)
/Users/apple/go/pkg/mod/github.com/s8sg/goflow@v0.0.8/runtime/flow_runtime.go:160 +0x5c fp=0xc00038be28 sp=0xc00038bdd8 pc=0x13d391c
github.com/benmanns/goworker.(*worker).run(0xc000208280, 0xc000066100, 0xc0000a2130)
/Users/apple/go/pkg/mod/github.com/benmanns/goworker@v0.1.3/worker.go:154 +0x29f fp=0xc00038bed8 sp=0xc00038be28 pc=0x13852cf
github.com/benmanns/goworker.(*worker).work.func1(0xc0002260a0, 0xc000208280, 0xc0000a41e0)
/Users/apple/go/pkg/mod/github.com/benmanns/goworker@v0.1.3/worker.go:108 +0x12a fp=0xc00038bfc8 sp=0xc00038bed8 pc=0x138708a
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:1373 +0x1 fp=0xc00038bfd0 sp=0xc00038bfc8 pc=0x1064091
created by github.com/benmanns/goworker.(*worker).work
/Users/apple/go/pkg/mod/github.com/benmanns/goworker@v0.1.3/worker.go:93 +0x1b4
Process finished with exit code 2
package main
import (
"fmt"
"os"
"time"
"github.com/s8sg/goflow"
flow "github.com/s8sg/goflow/flow"
)
// Workload function
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
fmt.Println("doSomething")
time.Sleep(time.Second)
return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
}
// Define provide definition of the workflow
func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
f.SyncNode().Apply("test", doSomething).Apply("test", doSomething).Apply("test", doSomething).Apply("test", doSomething)
return nil
}
func main() {
os.Setenv("enable_tracing", "true")
for i := 0; i < 10; i++ {
go func() {
time.Sleep(10*time.Second)
fmt.Println("start!")
fs := &goflow.FlowService{
RedisURL: "127.0.0.1:6379",
}
fs.Execute("myflow", &goflow.Request{
Body: []byte("hallo"),
})
}()
}
fs := &goflow.FlowService{
Port: 8080,
RedisURL: "127.0.0.1:6379",
OpenTraceUrl: "127.0.0.1:5775",
WorkerConcurrency: 5,
}
err := fs.Register("myflow", DefineWorkflow)
if err != nil {
panic(err)
}
err = fs.Start()
if err != nil {
panic(err)
}
select {}
}
Maybe TraceHandler's operationSpans
concurrent operate.
Redis might become a single point of failure even if redis-sentinel/cluster is used.
Replacement of redis with https://github.com/hashicorp/raft will remove that single point of failure and enable packaging of goflow into a single binary.
Furthermore, you might find storing and manipulating the DAG easier using the raft protocol as it operates on the basis of replicated state machines.
Currently HTTP server only support execution of the flow with
POST /`flow-name`
GET /`flow-name`
The proposal is to add the support to stop, pause, resume and list flows
The APIS would be
Execute executes the flow, we will change the domain with /flow
prefix
POST /flow/`flow-name`
GET /flow/`flow-name`
Stop stops the flow
POST /flow/`flow-name`/stop
Pause pause execution of the flow
POST /flow/`flow-name`/pause
Resume resumes the execution of the flow
POST /flow/`flow-name`/resume
List lists all the registered flows
POST /flow/list
Hello, I am very very interested in incorporating goflow into one of my projects. I was just wondering if this repo is still active?
package main
import (
"fmt"
"time"
goflow "github.com/s8sg/goflow"
flow "github.com/s8sg/goflow/flow"
)
// Workload function
func doSomething1(data []byte, option map[string][]string) ([]byte, error) {
time.Sleep(10 * time.Second)
fmt.Println(fmt.Sprintf("you said 1 %s", string(data)))
return []byte(fmt.Sprintf("you said 1 "%s"", string(data))), nil
}
func doSomething2(data []byte, option map[string][]string) ([]byte, error) {
time.Sleep(10 * time.Second)
fmt.Println(fmt.Sprintf("you said 2 %s", string(data)))
return []byte(fmt.Sprintf("you said 2 "%s"", string(data))), nil
}
func doSomething3(data []byte, option map[string][]string) ([]byte, error) {
time.Sleep(10 * time.Second)
fmt.Println(fmt.Sprintf("you said 3 %s", string(data)))
return []byte(fmt.Sprintf("you said 3 "%s"", string(data))), nil
}
// Define provide definition of the workflow
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
dag := workflow.Dag()
dag.Node("test1").Apply("test1", doSomething1)
dag.Node("test2").Apply("test2", doSomething2)
dag.Node("test3").Apply("test3", doSomething3)
dag.Edge("test1", "test2")
dag.Edge("test2", "test3")
return nil
}
func main() {
fs := &goflow.FlowService{
Port: 8080,
RedisURL: "localhost:6379",
OpenTraceUrl: "localhost:5775",
WorkerConcurrency: 5,
}
fs.Register("myflow", DefineWorkflow)
fs.StartWorker()
err := fs.Execute("myflow", &goflow.Request{
RequestId: "request1",
Body: []byte{'a', 'a', 'a'},
})
fmt.Println(err)
err = fs.Execute("myflow", &goflow.Request{
RequestId: "request2",
Body: []byte{'b', 'b', 'b'},
})
fmt.Println(err)
}
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.