
s8sg / goflow


A Golang based high performance, scalable and distributed workflow framework

License: MIT License

Go 41.45% Makefile 0.11% JavaScript 3.63% Dockerfile 0.05% CSS 44.54% SCSS 4.80% HTML 5.43%
workflow-engine workload-automation framework golang workflow distributed-computing

goflow's Issues

Allow redis to be configured with password

package main

import (
	"fmt"

	flow "github.com/s8sg/goflow/flow/v1"
	goflow "github.com/s8sg/goflow/v1"
)

// Workload function
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
	fmt.Printf("hello world\n")
	return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
}

// DefineWorkflow provides the definition of the workflow
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
	dag := workflow.Dag()
	dag.Node("test", doSomething)
	return nil
}

func main() {
	fs := &goflow.FlowService{
		Port:              8080,
		RedisURL:          "192.168.1.65:16379", // need redis password 123
		OpenTraceUrl:      "192.168.1.65:5775",
		WorkerConcurrency: 5,
		EnableMonitoring:  true,
	}

	fs.Register("myflow", DefineWorkflow)
	fs.Start()
}

[QUERY] Examples for conditionalBranch

Hey @s8sg
I was evaluating the framework to build an aggregator service that calls multiple services and combines the results received from each of them. I was wondering whether I can use this framework for the use case below.
Let's say I have three nodes arranged in series. I want to store the result of each intermediate node so that, based on that result, I can decide whether to proceed to the next node. I saw that dag.ConditionalBranch can achieve this by taking a list of conditions. My question is: how would you parse the response of an intermediate node? How can we fetch the response from the previous node when deciding which of the conditions in my condition list applies?
I would appreciate a more detailed example of ConditionalBranch.
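
A minimal sketch of one way to approach this, assuming the previous node returns JSON and that the condition callback receives that response as raw bytes and returns the names of the branches to run (a func(response []byte) []string shape). The payload fields (status, score) and the condition names (proceed, skip, failed) are hypothetical and only illustrate the parsing step; this is not goflow's own code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// nodeResult is a hypothetical payload produced by the previous node.
type nodeResult struct {
	Status string  `json:"status"`
	Score  float64 `json:"score"`
}

// chooseConditions decodes the previous node's response and returns the
// name of the branch that should run next.
func chooseConditions(response []byte) []string {
	var r nodeResult
	if err := json.Unmarshal(response, &r); err != nil {
		// The response could not be parsed: route to the failure branch.
		return []string{"failed"}
	}
	if r.Status != "ok" {
		return []string{"failed"}
	}
	if r.Score >= 0.5 {
		return []string{"proceed"}
	}
	return []string{"skip"}
}

func main() {
	fmt.Println(chooseConditions([]byte(`{"status":"ok","score":0.9}`)))
	fmt.Println(chooseConditions([]byte(`not json`)))
}
```

A function of this shape could then be passed wherever the framework expects the condition callback, with the returned names matching the entries of the condition list.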

Http requests get dropped when trying to send multiple requests

How to reproduce

  1. Run the basic example from the README.
  2. Send a single request using curl -d hello http://localhost:8080/myflow. Everything works.
  3. Send multiple requests using for i in {1..50}; do curl -d hello http://localhost:8080/myflow >> output.log & done

The following happens:

2023/04/24 13:39:29 http: panic serving 127.0.0.1:51880: runtime error: invalid memory address or nil pointer dereference                                                                                                                     
goroutine 622 [running]:                                                                                                                                                                                                                      
net/http.(*conn).serve.func1()                                                                                         
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:1854 +0xbf
panic({0x765840, 0xaa66e0})                                
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/runtime/panic.go:890 +0x263
github.com/s8sg/goflow/eventhandler.(*TraceHandler).StopNodeSpan(...)                  
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/eventhandler/trace_handler.go:110
github.com/s8sg/goflow/eventhandler.(*GoFlowEventHandler).ReportNodeEnd(0x79c520?, {0xc00088c910?, 0x7f6d9e1e69a8?}, {0x7f6dc6d6e3c8?, 0xc000051400?})
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/eventhandler/goflow_event_handler.go:57 +0x3d
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).findNextNodeToExecute(0xc000177740)
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/sdk/executor/executor.go:627 +0x4f8
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).Execute(0xc000177740, 0xc0010bd720)
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/sdk/executor/executor.go:1265 +0x9d8
github.com/s8sg/goflow/core/runtime/controller/handler.ExecuteFlowHandler(0xc0011484b0, 0xc0007fc600, {0x885bb8, 0xc000902680})
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/runtime/controller/handler/execute_flow_handler.go:33 +0x265
github.com/s8sg/goflow/runtime.newRequestHandlerWrapper.func1({0x882080, 0xc000d382a0}, 0xc00114c000, {0xc000882d80, 0x1, 0xc000c30b70?})
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/runtime/new_request_handler_wrapper.go:52 +0x6ec
github.com/julienschmidt/httprouter.(*Router).ServeHTTP(0xc00010e1e0, {0x882080, 0xc000d382a0}, 0xc00114c000)
        /home/danash/go/pkg/mod/github.com/julienschmidt/[email protected]/router.go:387 +0x81c
net/http.serverHandler.ServeHTTP({0x880de8?}, {0x882080, 0xc000d382a0}, 0xc00114c000)  
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:2936 +0x316
net/http.(*conn).serve(0xc0002a8e10, {0x8822d8, 0xc0001181e0})              
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:1995 +0x612
created by net/http.(*Server).Serve                                                                                    
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:3089 +0x5ed
2023/04/24 13:39:29 Reporting span 338343586c9e519f:62cdaa3f404a5f84:6cad4d79616ee4e9:1
2023/04/24 13:39:29 Reporting span 338343586c9e519f:6cad4d79616ee4e9:338343586c9e519f:1
2023/04/24 13:39:29 Reporting span 338343586c9e519f:7818de4473a991b2:338343586c9e519f:1
2023/04/24 13:39:29 Reporting span 338343586c9e519f:354240b2e50c0ada:6cad4d79616ee4e9:1
2023/04/24 13:39:29 Reporting span 338343586c9e519f:6cad4d79616ee4e9:338343586c9e519f:1
2023/04/24 13:39:29 Reporting span 338343586c9e519f:6cad4d79616ee4e9:338343586c9e519f:1
2023/04/24 13:39:29 Executing flow myflow                                                                              
2023/04/24 13:39:29 Reporting span 16ba1e6c69d0d230:38cef6c7b9bc1643:16ba1e6c69d0d230:1
2023/04/24 13:39:29 Reporting span 16ba1e6c69d0d230:06ac2ac89bb22560:2cec2b7378dfba32:1
2023/04/24 13:39:29 http: panic serving 127.0.0.1:51896: [request `ch35ooasig6serk31o8g`] Pipeline is not active
goroutine 639 [running]:                                                                                               
net/http.(*conn).serve.func1()                                                                                         
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:1854 +0xbf
panic({0x747500, 0xc000292900})                                                                                        
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/runtime/panic.go:890 +0x263
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).executeNode(0xc0008f5740, {0xc0011c0200, 0x5, 0x200})
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/sdk/executor/executor.go:346 +0x645
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).Execute(0xc0008f5740, 0xc000a49720)
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/sdk/executor/executor.go:1257 +0x8cb
github.com/s8sg/goflow/core/runtime/controller/handler.ExecuteFlowHandler(0xc00047c2d0, 0xc000181c80, {0x885bb8, 0xc0009b8000})
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/runtime/controller/handler/execute_flow_handler.go:33 +0x265
github.com/s8sg/goflow/runtime.newRequestHandlerWrapper.func1({0x882080, 0xc0011bc0e0}, 0xc000454000, {0xc000cfe080, 0x1, 0xc000a49b70?})
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/runtime/new_request_handler_wrapper.go:52 +0x6ec
github.com/julienschmidt/httprouter.(*Router).ServeHTTP(0xc00010e1e0, {0x882080, 0xc0011bc0e0}, 0xc000454000)
        /home/danash/go/pkg/mod/github.com/julienschmidt/[email protected]/router.go:387 +0x81c
net/http.serverHandler.ServeHTTP({0x880de8?}, {0x882080, 0xc0011bc0e0}, 0xc000454000)
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:2936 +0x316
net/http.(*conn).serve(0xc0003223f0, {0x8822d8, 0xc0001181e0})
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:1995 +0x612
created by net/http.(*Server).Serve
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:3089 +0x5ed
2023/04/24 13:39:29 debug logging disabled
2023/04/24 13:39:29 Initializing logging reporter
2023/04/24 13:39:29 Reporting span 16ba1e6c69d0d230:2cec2b7378dfba32:16ba1e6c69d0d230:1
2023/04/24 13:39:29 Reporting span 16ba1e6c69d0d230:37ed544c17db1cb0:2cec2b7378dfba32:1
2023/04/24 13:39:29 Reporting span 16ba1e6c69d0d230:2cec2b7378dfba32:16ba1e6c69d0d230:1
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:1b7dd13c2c4c0da7:0000000000000000:1
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:027da292581982b2:0d43717ff3b76e9f:1
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:1b7dd13c2c4c0da7:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:0d43717ff3b76e9f:1b7dd13c2c4c0da7:1
2023/04/24 13:39:29 debug logging disabled
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:1b7dd13c2c4c0da7:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:1b7dd13c2c4c0da7:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:13e86d6139141d0a:79fe1508dadda560:1
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:79fe1508dadda560:5983b20c54d0ee71:1
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:5983b20c54d0ee71:0000000000000000:1
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:5983b20c54d0ee71:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:5983b20c54d0ee71:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:5983b20c54d0ee71:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored

Implementing #46 might make request handling more robust.
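
The concurrent reproduction can also be written as a small Go client, which makes it easier to count dropped requests than reading output.log. This is a sketch only: the helper names (fire, report, newEchoServer) are hypothetical, and the httptest server is a stand-in so the program runs on its own. To exercise a real flow service, pass http://localhost:8080/myflow to fire instead.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// fire sends n concurrent POST requests to url and counts how many
// returned HTTP 200 and how many failed or returned another status.
func fire(url string, n int) (ok, failed int) {
	var (
		wg sync.WaitGroup
		mu sync.Mutex
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			resp, err := http.Post(url, "text/plain", strings.NewReader("hello"))
			success := false
			if err == nil {
				io.Copy(io.Discard, resp.Body)
				resp.Body.Close()
				success = resp.StatusCode == http.StatusOK
			}
			mu.Lock()
			defer mu.Unlock()
			if success {
				ok++
			} else {
				failed++
			}
		}()
	}
	wg.Wait()
	return ok, failed
}

// report formats the counters for printing.
func report(ok, failed int) string {
	return fmt.Sprintf("ok=%d failed=%d", ok, failed)
}

// newEchoServer starts a local stand-in server that echoes the request body.
func newEchoServer() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.Copy(w, r.Body)
	}))
}

func main() {
	srv := newEchoServer()
	defer srv.Close()

	ok, failed := fire(srv.URL, 50)
	fmt.Println(report(ok, failed))
}
```

A request whose handler panics on the server side shows up here as a client error and is counted as failed.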

branch permissions.

jozoppi@DESKTOP-T9826C8:/projects/goflow$ git push --set-upstream origin sample_flows
remote: Permission to s8sg/goflow.git denied to giorgiozoppi.
fatal: unable to access 'https://github.com/s8sg/goflow/': The requested URL returned error: 403
jozoppi@DESKTOP-T9826C8:/projects/goflow$

[Feature] Dynamically update workflows

Hi @s8sg ,

We are using goflow to build an online system. The main use case is to orchestrate different tasks together like workflows.
For example, a user initially has a workflow like

Node-A -> Node-B

But the user now wants to change it to

Node-A -> Node-C -> Node-B

Currently, however, the DAG cannot be changed after a workflow is registered.

(Of course, we can restart the flow service to reload it.)

So I am trying to contribute this in PR #64.

[Question] How to restart a flow from where it stopped

I have a flow with multiple nodes: 1 -> 2 -> 3 -> 4. When this flow is executed, Node 1 completes, Node 2 completes, and Node 3 runs for a long time but is eventually forced to stop because the application shuts down. When I restart the application, I want the flow to resume from Node 3, but it does not. How can I achieve this?
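
One general way to get this behaviour is to checkpoint each completed node and skip completed nodes on restart. The sketch below illustrates the idea with plain Go; it is not goflow's API, and the names step, checkpoint and resume are hypothetical. A real implementation would persist the checkpoint (for example in Redis) so that it survives the restart; a map is used here only to keep the example self-contained.

```go
package main

import "fmt"

// step is one node of a linear flow.
type step struct {
	name string
	run  func() error
}

// checkpoint records which nodes have completed.
type checkpoint map[string]bool

// resume runs the steps in order, skipping the ones already marked done,
// and returns the names of the steps it actually executed.
func resume(steps []step, done checkpoint) ([]string, error) {
	var executed []string
	for _, s := range steps {
		if done[s.name] {
			continue
		}
		if err := s.run(); err != nil {
			return executed, fmt.Errorf("node %s: %w", s.name, err)
		}
		// Mark the node as done only after it finished successfully.
		done[s.name] = true
		executed = append(executed, s.name)
	}
	return executed, nil
}

func main() {
	noop := func() error { return nil }
	steps := []step{{"1", noop}, {"2", noop}, {"3", noop}, {"4", noop}}

	// Nodes 1 and 2 finished before the shutdown; node 3 was interrupted.
	done := checkpoint{"1": true, "2": true}

	executed, err := resume(steps, done)
	fmt.Println(executed, err)
}
```

Because node 3 was never marked done, it is the first node executed after the restart. This only works safely if each node can be run again after an interruption.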

[Code Quality] Abstract Queue Worker

Queue Worker is mainly used to handle async operations.
Queue Worker internally uses RMQ, which in turn uses Redis.

The goal is to abstract the queue worker so that all RMQ-specific details are hidden behind a Queue Worker interface.
In theory, QueueWorker should abstract:

  1. Enqueue Workload Tasks (WorkloadRequest)
  2. Enqueue Flow Operations Tasks (NewRequest, PartialRequest, Stop, Pause, Resume)
  3. Consume handler func(task *Task) error for NewRequest, PartialRequest, Stop, Pause, Resume, WorkloadResponse
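
The three responsibilities listed above could be captured by an interface along these lines. This is a sketch under stated assumptions: the Task fields, the TaskType constants and the method names Enqueue and Consume are hypothetical, and the in-memory implementation dispatches synchronously purely so the example runs without Redis or RMQ.

```go
package main

import "fmt"

// TaskType names the kinds of task mentioned in the issue.
type TaskType string

const (
	NewRequest       TaskType = "NewRequest"
	PartialRequest   TaskType = "PartialRequest"
	Stop             TaskType = "Stop"
	Pause            TaskType = "Pause"
	Resume           TaskType = "Resume"
	WorkloadRequest  TaskType = "WorkloadRequest"
	WorkloadResponse TaskType = "WorkloadResponse"
)

// Task is a hypothetical envelope for anything placed on the queue.
type Task struct {
	Type     TaskType
	FlowName string
	Payload  []byte
}

// QueueWorker hides the queue backend from the rest of the runtime.
type QueueWorker interface {
	// Enqueue submits a workload task or a flow operation task.
	Enqueue(task *Task) error
	// Consume registers the handler for one task type.
	Consume(t TaskType, handler func(task *Task) error)
}

// memoryWorker is an in-memory stand-in that calls handlers directly.
type memoryWorker struct {
	handlers map[TaskType]func(task *Task) error
}

// Compile-time check that memoryWorker satisfies the interface.
var _ QueueWorker = (*memoryWorker)(nil)

func newMemoryWorker() *memoryWorker {
	return &memoryWorker{handlers: make(map[TaskType]func(task *Task) error)}
}

func (w *memoryWorker) Consume(t TaskType, handler func(task *Task) error) {
	w.handlers[t] = handler
}

func (w *memoryWorker) Enqueue(task *Task) error {
	h, ok := w.handlers[task.Type]
	if !ok {
		return fmt.Errorf("no consumer registered for %s", task.Type)
	}
	return h(task)
}

func main() {
	var q QueueWorker = newMemoryWorker()
	q.Consume(NewRequest, func(t *Task) error {
		fmt.Printf("new request for %s: %s\n", t.FlowName, t.Payload)
		return nil
	})
	fmt.Println(q.Enqueue(&Task{Type: NewRequest, FlowName: "myflow", Payload: []byte("hello")}))
	fmt.Println(q.Enqueue(&Task{Type: Stop, FlowName: "myflow"}))
}
```

An RMQ-backed implementation would satisfy the same interface, so the runtime would not need to import any RMQ types.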

Is there any example to get started with goflow?

I tried to use your library, but I can't get the "Creating More Complex DAG" examples mentioned in the README.md to work.

I tried the following, but the functions are never executed:

package main

import (
	"fmt"
	"github.com/faasflow/goflow"
	flow "github.com/faasflow/lib/goflow"
	"log"
)

// Workload function
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
	log.Println(fmt.Sprintf("you said \"%s\"", string(data)))
	return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
}

// Workload function
func loadProfile(data []byte, option map[string][]string) ([]byte, error) {
	log.Println(fmt.Sprintf("load profile \"%s\"", string(data)))
	return []byte(fmt.Sprintf("load profile \"%s\"", string(data))), nil
}
// Workload function
func getPresignedURLForImage(data []byte, option map[string][]string) ([]byte, error) {
	log.Println(fmt.Sprintf("image url \"%s\"", string(data)))
	return []byte(fmt.Sprintf("image url \"%s\"", string(data))), nil
}
// Workload function
func detectFace(data []byte, option map[string][]string) ([]byte, error) {
	log.Println(fmt.Sprintf("detect face \"%s\"", string(data)))
	return []byte(fmt.Sprintf("detect face \"%s\"", string(data))), nil
}
// Workload function
func markProfileBasedOnStatus(data []byte, option map[string][]string) ([]byte, error) {
	log.Println(fmt.Sprintf("mask profile \"%s\"", string(data)))
	return []byte(fmt.Sprintf("mask profile \"%s\"", string(data))), nil
}

// DefineWorkflow provides the definition of the workflow
func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
	dag := f.Dag()
	dag.Node("get-kyc-image").Apply("load-profile", loadProfile).
		Apply("get-image-url", getPresignedURLForImage).
		Apply("face-detect", detectFace).
		Apply("mark-profile", markProfileBasedOnStatus)
	dag.Edge("get-kyc-image", "face-detect")
	dag.Edge("face-detect", "mark-profile")
	return nil
}

func main() {
	fs := &goflow.FlowService{
		Port:                8080,
		RedisURL:            "localhost:6379",
		OpenTraceUrl:        "localhost:5775",
		WorkerConcurrency:   5,
	}
	fs.Start("myflow", DefineWorkflow)
}

Export workflows graph

Hi,

Is there any way to show the workflows graphically, like the images used in the documentation, or in any other form?

Thank you

The latest release does not match github webpage's README.md that causes "does not contain package" error

Firstly, thank you for offering us a very useful tool to build an efficient workflow architecture.

However, when I followed the introduction in the README.md on your GitHub page, I ran into this problem:

github.com/s8sg/goflow/flow/v1: module github.com/s8sg/goflow@latest found (v0.1.0), but does not contain package github.com/s8sg/goflow/flow/v1
github.com/s8sg/goflow/v1: module github.com/s8sg/goflow@latest found (v0.1.0), but does not contain package github.com/s8sg/goflow/v1

In my Go project's imports, I wrote this:

import (
	goflow "github.com/s8sg/goflow/v1"
	flow "github.com/s8sg/goflow/flow/v1"
)

exactly as the README.md on your GitHub page shows.

So I looked in my local Go path, where your source code was downloaded, to find out what happened. The source code I downloaded is v0.1.0, a version from 2020, and it does not match your latest README instructions. Apparently, the latest release on GitHub is outdated. Could you publish a new release that matches your latest README.md?

[Discussion] Workflow rollback support

In my case, I want to use goflow to create resources on a public cloud, such as a cloud VM with a public IP:

  1. create a VM using the cloud openapi
  2. wait until the VM status is running
  3. create a public IP using the cloud openapi
  4. wait until the public IP status is available
  5. bind the public IP to the VM
  6. other steps

I want:

  1. if I cancel the workflow (for example by calling flow.cancel()), the workflow can free the created resources (VM, public IP, etc.)
  2. if one step fails, the workflow can free the created resources (VM, public IP, etc.)

Installation problems

After installing goflow as explained in the guide, I have the following problem:

$ go build -o goflow
flow.go:6:2: no required module provides package github.com/s8sg/goflow/flow/v1; to add it:
        go get github.com/s8sg/goflow/flow/v1
flow.go:7:2: no required module provides package github.com/s8sg/goflow/v1; to add it:
        go get github.com/s8sg/goflow/v1

I tried to solve it with the suggested commands, but it does not work.

$ go get github.com/s8sg/goflow/flow/v1
go: module github.com/s8sg/goflow@upgrade found (v0.1.0), but does not contain package github.com/s8sg/goflow/flow/v1

Sorry in advance if this is something silly, I am new to learning golang.

Expose DAG validation in workflow layer

Hi there,

When building an online system that lets users create and modify workflows, we found it is necessary to validate whether the DAG created by a user is valid. There is a simple way to implement Validate() in the workflow layer: #71
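
For illustration, the kind of check such a validation typically performs can be sketched as follows: every edge must reference a known node, and the graph must not contain a cycle. The sketch uses Kahn's algorithm on a plain node and edge list; it is not the implementation from #71, and the validate function and its parameters are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// validate checks a user-supplied graph: node names must be unique, every
// edge must reference a known node, and the graph must be acyclic.
func validate(nodes []string, edges [][2]string) error {
	indegree := make(map[string]int, len(nodes))
	for _, n := range nodes {
		if _, dup := indegree[n]; dup {
			return fmt.Errorf("duplicate node %q", n)
		}
		indegree[n] = 0
	}

	next := make(map[string][]string)
	for _, e := range edges {
		from, to := e[0], e[1]
		if _, ok := indegree[from]; !ok {
			return fmt.Errorf("edge %s -> %s: unknown node %q", from, to, from)
		}
		if _, ok := indegree[to]; !ok {
			return fmt.Errorf("edge %s -> %s: unknown node %q", from, to, to)
		}
		next[from] = append(next[from], to)
		indegree[to]++
	}

	// Kahn's algorithm: repeatedly remove nodes that have no incoming edge.
	var queue []string
	for _, n := range nodes {
		if indegree[n] == 0 {
			queue = append(queue, n)
		}
	}
	visited := 0
	for len(queue) > 0 {
		n := queue[0]
		queue = queue[1:]
		visited++
		for _, m := range next[n] {
			indegree[m]--
			if indegree[m] == 0 {
				queue = append(queue, m)
			}
		}
	}
	// Nodes that were never removed are part of a cycle or depend on one.
	if visited != len(nodes) {
		return errors.New("graph contains a cycle")
	}
	return nil
}

func main() {
	nodes := []string{"A", "B", "C"}
	fmt.Println(validate(nodes, [][2]string{{"A", "B"}, {"B", "C"}}))
	fmt.Println(validate(nodes, [][2]string{{"A", "B"}, {"B", "C"}, {"C", "A"}}))
}
```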

Thanks!

Allow to schedule a node as kubernetes job.

When a DAG runs asynchronously, we are creating a monolithic structure. In the distributed ETL use case, it would be nice to have a flag to distribute the nodes of a graph as jobs, effectively creating a pipeline using the Redis queuing mechanism.
Suppose we have a DAG:

  1. A -> B -> C
  2. The user triggers the DAG from A.
  3. The triggering event schedules three types of jobs, A, B, and C, to be executed in the Kubernetes cluster, where each job type has a cardinality (the number of pods associated with it).

This needs to be evaluated in a design meeting and follow-up discussion, and we should check what Kubeflow currently does.

Pause, Resume and Stop don't work fine

Expected:
Pause pauses the specified workflow, and Resume resumes it.
Stop stops an active workflow and sets its status to Finished.

Actual:

  1. After running Pause, the workflow keeps executing until the end.
...
2022/09/09 17:12:45 [request `my_test_flow-1662714755`] intermediate result from node 0_1_Node-1 to 0_2_Node-2 stored as 0_1_Node-1--0_2_Node-2
2022/09/09 17:12:45 [request `my_test_flow-1662714755`] performing request for Node 0_2_Node-2, indegree count is 1
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] request submitted for Node 0_2_Node-2
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] partial request received
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] intermediate result from Node 0_1_Node-1 to Node 0_2_Node-2 retrieved from 0_1_Node-1--0_2_Node-2
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] executing node 0_2_Node-2
Test Node: O_O "TestData"
2022/09/09 17:12:47 Pausing request my_test_flow-1662714755 of flow my_test_flow
Node End
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] completed execution of node 0_2_Node-2
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] intermediate result from node 0_2_Node-2 to 0_3_Node-3 stored as 0_2_Node-2--0_3_Node-3
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] performing request for Node 0_3_Node-3, indegree count is 1
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] Request is paused, storing partial state for node: 0_3_Node-3
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] request submitted for Node 0_3_Node-3
2022/09/09 17:12:57 [request `my_test_flow-1662714755`] partial request received
2022/09/09 17:12:57 [request `my_test_flow-1662714755`] intermediate result from Node 0_2_Node-2 to Node 0_3_Node-3 retrieved from 0_2_Node-2--0_3_Node-3
2022/09/09 17:12:57 [request `my_test_flow-1662714755`] executing node 0_3_Node-3
Test Node: O_O "O_O "TestData""
Node End
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] completed execution of node 0_3_Node-3
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] intermediate result from node 0_3_Node-3 to 0_4_Node-4 stored as 0_3_Node-3--0_4_Node-4
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] performing request for Node 0_4_Node-4, indegree count is 1
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] Request is paused, storing partial state for node: 0_4_Node-4
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] request submitted for Node 0_4_Node-4
2022/09/09 17:13:08 [request `my_test_flow-1662714755`] partial request received
2022/09/09 17:13:08 [request `my_test_flow-1662714755`] intermediate result from Node 0_3_Node-3 to Node 0_4_Node-4 retrieved from 0_3_Node-3--0_4_Node-4
2022/09/09 17:13:08 [request `my_test_flow-1662714755`] executing node 0_4_Node-4
...
  2. Panic when executing Stop:
2022/09/09 17:15:03 Pausing request my_test_flow for flow my_test_flow-1662714890
Node End
2022/09/09 17:15:11 [request `my_test_flow-1662714890`] completed execution of node 0_2_Node-2
2022/09/09 17:15:11 [request `my_test_flow-1662714890`] failed to obtain pipeline state, error failed to get key core.my_test_flow.my_test_flow-1662714890.request-state, nil
2022/09/09 17:15:11 [request `my_test_flow-1662714890`] pipeline is not active
panic: [request `my_test_flow-1662714890`] Pipeline is not active

goroutine 30 [running]:
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).findNextNodeToExecute(0xc0002c7c78)
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/sdk/executor/executor.go:609 +0x614
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).Execute(0xc0002c7c78, 0xc0002c7c58)
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/sdk/executor/executor.go:1266 +0x9c8
github.com/s8sg/goflow/core/runtime/controller/handler.PartialExecuteFlowHandler(0xc0002c7dd0, 0xc00028ecc0?, {0x14771f8, 0xc0002a6410})
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/runtime/controller/handler/partial_execute_flow_handler.go:25 +0xe5
github.com/s8sg/goflow/runtime.(*FlowRuntime).handlePartialRequest(0xc00012af70, 0xc00028ecc0)
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:457 +0x1c6
github.com/s8sg/goflow/runtime.(*FlowRuntime).handleRequest(0xc0002e6380?, 0xc0002e21e0?, {0xc0002a43b0?, 0x13260a0?})
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:414 +0xc8
github.com/s8sg/goflow/runtime.(*FlowRuntime).Consume(0xc00012af70, {0x1473a98, 0xc0002320e0})
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:394 +0x2ae
github.com/adjust/rmq/v4.(*redisQueue).consumerConsume(0xc0001d4000, {0x14701a0, 0xc00012af70})
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/adjust/rmq/v4/queue.go:282 +0x9f
created by github.com/adjust/rmq/v4.(*redisQueue).AddConsumer
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/adjust/rmq/v4/queue.go:260 +0xe5

Here is the test code:

package main

import (
	"flag"
	"fmt"
	"time"

	flow "github.com/s8sg/goflow/flow/v1"
	goflow "github.com/s8sg/goflow/v1"
)

const (
	flowName = "my_test_flow"
)

var (
	server = flag.Bool("s", false, "start server")
	op     = flag.String("o", "exec", "operation")
	rid    = flag.String("i", "", "request id")
)

func workload(data []byte, option map[string][]string) ([]byte, error) {
	fmt.Printf("Test Node: %s\n", string(data))
	time.Sleep(10 * time.Second)
	fmt.Println("Node End")
	return []byte(fmt.Sprintf("O_O \"%s\"", string(data))), nil
}

func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
	dag := workflow.Dag()
	dag.Node("Node-1", workload)
	dag.Node("Node-2", workload)
	dag.Node("Node-3", workload)
	dag.Node("Node-4", workload)
	dag.Node("Node-5", workload)
	dag.Node("Node-6", workload)
	dag.Node("Node-7", workload)
	dag.Node("Node-8", workload)
	dag.Edge("Node-1", "Node-2")
	dag.Edge("Node-2", "Node-3")
	dag.Edge("Node-3", "Node-4")
	dag.Edge("Node-4", "Node-5")
	dag.Edge("Node-5", "Node-6")
	dag.Edge("Node-6", "Node-7")
	dag.Edge("Node-7", "Node-8")
	return nil
}

func startServer() {
	fs := &goflow.FlowService{
		Port:              8088,
		RedisURL:          "127.0.0.1:6379",
		WorkerConcurrency: 2,
		RetryCount:        1,
		DebugEnabled:      true,
	}
	err := fs.Register(flowName, DefineWorkflow)
	if err != nil {
		panic(err)
	}

	err = fs.Start()
	if err != nil {
		panic(err)
	}
}

func ExecOp(op string) string {
	fs := &goflow.FlowService{
		RedisURL: "127.0.0.1:6379",
	}

	reqId := *rid
	if op == "exec" {
		reqId = fmt.Sprintf("%s-%d", flowName, time.Now().Unix())
	}
	if len(reqId) == 0 {
		panic("request id is empty")
	}

	switch op {
	case "exec":
		err := fs.Execute(flowName, &goflow.Request{
			Body:      []byte("TestData"),
			RequestId: reqId,
		})
		if err != nil {
			panic(err)
		}
		return reqId

	case "pause":
		err := fs.Pause(flowName, reqId)
		if err != nil {
			panic(err)
		}
	case "resume":
		err := fs.Resume(flowName, reqId)
		if err != nil {
			panic(err)
		}
	case "stop":
		err := fs.Stop(flowName, reqId)
		if err != nil {
			panic(err)
		}
	}

	return reqId
}

func main() {
	flag.Parse()

	if *server {
		startServer()
		return
	}

	reqId := ExecOp(*op)
	fmt.Println(reqId)
}

Start the server:

./wf -s

Client:

./wf -o exec 
./wf -o pause -i [request_id]
./wf -o resume -i [request_id]
./wf -o stop -i [request_id]

[GoFlow-4] Allow GoFlow workload to be completely independent to the flow

Currently Flow and Workload are tightly coupled, as workloads are defined inside a flow definition.
This has a few drawbacks:

  1. You can't scale only a specific workload
  2. Changing a workload will touch the whole flow definition
  3. Workload can't be shared across other flows

To solve this, GoFlow will support a special kind of operation called ApplyExternal(), similar to Apply().
It will take the unique ID of the workload that needs to be called:

Node("n2").ApplyExternal("task1")

To define a workload, the user needs to use the GoFlow library and register a workload under the unique ID:

fs.Register("task1", PerformTask1)
fs.StartWorkload([]string{"myflow"}) // StartWorkload takes the list of flows

The workload will have the same signature:

func PerformTask1(data []byte, option map[string][]string) ([]byte, error) {
  ...
}

There is no option to pass the redis client into goflow struct

When initializing GoFlow with all the necessary configuration, I ran into an issue with passing the Redis client. In the goflow.FlowService struct, there is only an option to provide the Redis URL:

fs := &goflow.FlowService{
	RedisURL:          redisUrl,
	WorkerConcurrency: 5,
}

However, when attempting to include SSL configuration in the Redis URL, I encountered the following error message:

failed to initiate connection, error dial tcp: address .too many colons in address

The sample Redis URL used was: "rediss://" + options.Username + ":" + options.Password + "@" + options.Addr + "/0?ssl_cert_reqs=None"

Further investigation is needed to resolve this and pass the SSL configuration to GoFlow during initialization.
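
The "too many colons in address" message is what Go's net package reports when a host:port string contains extra colons, which suggests the whole URL was handed to a TCP dial as if it were an address. A possible direction is to split a redis:// or rediss:// URL into its parts first. The sketch below does that with the standard library only; the redisConfig type and parseRedisURL function are hypothetical, the host and credentials in main are made up, and whether FlowService can consume these parts (in particular the TLS flag) is not established here.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// redisConfig holds the pieces a Redis client would need.
type redisConfig struct {
	Addr     string // host:port, suitable for a TCP dial
	Username string
	Password string
	DB       int
	UseTLS   bool // true for the rediss:// scheme
}

// parseRedisURL splits a redis:// or rediss:// URL into its parts.
// Query parameters such as ssl_cert_reqs are ignored in this sketch.
func parseRedisURL(raw string) (redisConfig, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return redisConfig{}, err
	}
	if u.Scheme != "redis" && u.Scheme != "rediss" {
		return redisConfig{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	cfg := redisConfig{Addr: u.Host, UseTLS: u.Scheme == "rediss"}
	if u.User != nil {
		cfg.Username = u.User.Username()
		cfg.Password, _ = u.User.Password()
	}
	// The path, when present, carries the database index, e.g. "/0".
	if p := strings.Trim(u.Path, "/"); p != "" {
		db, err := strconv.Atoi(p)
		if err != nil {
			return redisConfig{}, fmt.Errorf("invalid database index %q", p)
		}
		cfg.DB = db
	}
	return cfg, nil
}

func main() {
	cfg, err := parseRedisURL("rediss://user:secret@redis.example.com:6380/0?ssl_cert_reqs=None")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```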

[Code Quality] Abstract the flow Registry

Currently we are using redis to register

  • Flows
  • Worker
  • Workload (In Progress)

Code is currently scattered across runtime like this

The goal is to abstract the RDB into a flow registry.
We can initialise the FlowRegistry at the same time as the RDB.

Nice framework

I have been contemplating ideas around a workflow library for some time, this looks good. I think one question I'd have is, what would a Flow interface look like? We want to define that abstraction potentially in Micro as a way of doing orchestration. Once you have enough services this makes sense.

[Question] How to set special redis db

Here is FlowService struct

type FlowService struct {
	Port                    int
	RedisURL                string
	RedisPassword           string
	RequestAuthSharedSecret string
	RequestAuthEnabled      bool
	WorkerConcurrency       int
	RetryCount              int
	Flows                   map[string]runtime.FlowDefinitionHandler
	RequestReadTimeout      time.Duration
	RequestWriteTimeout     time.Duration
	OpenTraceUrl            string
	DataStore               sdk.DataStore
	Logger                  sdk.Logger
	EnableMonitoring        bool
	DebugEnabled            bool

	runtime *runtime.FlowRuntime
}

I want to connect to a specific Redis DB, but FlowService doesn't have a field like RedisDB.

[enhancement]: flow events

It would be awesome to expose flow events by registering event hooks with the flow library.

We would like to know when our flow state changes: started, completed, failed, etc.

We would like to add the missing features by contributing to this library, if you accept it 🙏
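
To make the request concrete, an event hook registry could look roughly like the sketch below. Everything in it is hypothetical (the EventType values, the Event fields and the Hooks type with its On and Emit methods); it shows the shape of the idea, not an existing goflow API.

```go
package main

import (
	"fmt"
	"sync"
)

// EventType identifies a flow state change.
type EventType string

const (
	FlowStarted   EventType = "started"
	FlowCompleted EventType = "completed"
	FlowFailed    EventType = "failed"
)

// Event describes one state change of one flow request.
type Event struct {
	Type      EventType
	FlowName  string
	RequestID string
}

// Hooks keeps the registered callbacks per event type. The zero value is
// ready to use and safe for concurrent use.
type Hooks struct {
	mu       sync.RWMutex
	handlers map[EventType][]func(Event)
}

// On registers fn to be called for every event of type t.
func (h *Hooks) On(t EventType, fn func(Event)) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.handlers == nil {
		h.handlers = make(map[EventType][]func(Event))
	}
	h.handlers[t] = append(h.handlers[t], fn)
}

// Emit calls the handlers registered for e.Type in registration order.
func (h *Hooks) Emit(e Event) {
	h.mu.RLock()
	// Copy the slice so handlers run without holding the lock.
	fns := append([]func(Event){}, h.handlers[e.Type]...)
	h.mu.RUnlock()
	for _, fn := range fns {
		fn(e)
	}
}

func main() {
	var hooks Hooks
	hooks.On(FlowCompleted, func(e Event) {
		fmt.Printf("flow %s (request %s) completed\n", e.FlowName, e.RequestID)
	})
	hooks.Emit(Event{Type: FlowStarted, FlowName: "myflow", RequestID: "r1"})
	hooks.Emit(Event{Type: FlowCompleted, FlowName: "myflow", RequestID: "r1"})
}
```

The runtime would call Emit at the points where a request starts, completes or fails, and users would subscribe with On before starting the service.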

[Experimental] GoFlow DSL support

GoFlow DSL would allow users to define a workflow as a DSL. The DSL can be a simple YAML file.

The expectations are:

  1. Aligned with the GoFlow domain
  2. Provides the existing GoFlow constructs and is readable
  3. Doesn't need recompilation of the executor for flow changes
  4. Allows GoFlow workloads to be completely independent of GoFlow (#4)

codahale/hdrhistogram repo url has been transferred under the GitHub HdrHistogram umbrella

Problem

The codahale/hdrhistogram repo was transferred under the GitHub HdrHistogram umbrella with the help of the original author in Sept 2020 (new repo url https://github.com/HdrHistogram/hdrhistogram-go). The main reasons are to group all implementations under the same roof and to enable more active contribution from the community, as the original repository was archived several years ago.

The dependency URL should be modified to point to the new repository URL. The tag "v0.9.0" was applied at the point of transfer and will reflect the exact code that was frozen in the original repository.

If you are using Go modules, you can update to the exact point of transfer using the @v0.9.0 tag in your go get command.

go mod edit -replace github.com/codahale/hdrhistogram=github.com/HdrHistogram/[email protected]

Performance Improvements

From the point of transfer up until now (Mon 16 Aug 2021), we've released 3 versions that aim to support the standard HdrHistogram serialization/exposition formats and greatly improve READ performance.
We recommend updating to the latest version.

[GOFLOW-30] Support for a UI dag builder

Goflow DAG builder: Provide a UI DAG builder

Goal

The goal of this story is to make it easier to build a DAG from a data engineer's perspective. A data engineer should not need to be proficient in Go, so we want to provide a UI where he/she can draw a DAG. We want to leverage the https://github.com/AlexImb/vue-dag library to achieve that.

Requirement of the story:

Functional requirement

As an end user I want to:

  1. Create a graph in the UI.
  2. Select the workflow code for each node and edit it.
  3. Save/Resume the workflow

Technical requirements:

The scope of this story is:

  1. Create a UI that can draw and save a graph and schedule its execution.
  2. Provide a REST API that can accept a transpiled graph.

Example program needs to be updated

When I try to run the sample program provided, it gives the following error:
./main.go:17:14: cannot use doSomething (type func([]byte, map[string][]string) ([]byte, error)) as type flow.BranchOption in argument to dag.Node

I tried to run this on the Go Playground, but it doesn't run there.

We would like to explore goflow for one of our requirements. Could you please help?

[Feature] Allow to register new flow dynamically

[Feature] Support dynamically appending flows after the flow service started.
Currently, it is only possible to register a workflow before starting the flow service. If you need to register a new workflow after starting the flow service, a restart is required. We need to provide a more flexible solution in the future.

[Question] How to run a workflow without the middleware dependency

Triggering a workflow through the API works well. But sometimes I want to run my workflow in command-line mode, while keeping my workflow definition unchanged.

package main

import (
	"fmt"
	flow "github.com/s8sg/goflow/flow/v1"
)

// Workload function
func node1(data []byte, option map[string][]string) ([]byte, error) {
	result := fmt.Sprintf("(Executing node 1 with data (%s))", string(data))
	fmt.Println(result)
	return []byte(result), nil
}

// DefineWorkflow Define provide definition of the workflow
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
	dag := workflow.Dag()
	dag.Node("node1", node1)
	return nil
}

Then run this workflow in main:

workflow.Run("my-workflow")

[enhancement]: support route params and adding middlewares

We would like to add custom middleware to the internal HTTP server. Also, if we could pass query parameters to the endpoints, we could create more generalized flows.

The goflow library could expose its internal server and handlers with sane defaults, so that users can override them.

[bug] When tracing is enabled, concurrent map writes occur

error message

fatal error: concurrent map writes
fatal error: concurrent map writes

goroutine 67 [running]:
runtime.throw(0x14d49a2, 0x15)
        /usr/local/go/src/runtime/panic.go:1116 +0x72 fp=0xc00038b6e0 sp=0xc00038b6b0 pc=0x10343f2
runtime.mapassign_faststr(0x1439380, 0xc0000df950, 0x14cdfa3, 0x4, 0x1)
        /usr/local/go/src/runtime/map_faststr.go:291 +0x3de fp=0xc00038b748 sp=0xc00038b6e0 pc=0x1014f5e
github.com/s8sg/goflow/eventhandler.(*TraceHandler).StartOperationSpan(0xc0002e2b40, 0xc000227090, 0x8, 0xc000376800, 0x14, 0x14cdfa3, 0x4)
        /Users/apple/go/pkg/mod/github.com/s8sg/goflow@v0.0.8/eventhandler/trace_handler.go:124 +0x223 fp=0xc00038b7d8 sp=0xc00038b748 pc=0x13107d3
github.com/s8sg/goflow/eventhandler.(*FaasEventHandler).ReportOperationStart(0xc00009c100, 0x14cdfa3, 0x4, 0xc000227090, 0x8, 0xc000376800, 0x14)
        /Users/apple/go/pkg/mod/github.com/s8sg/goflow@v0.0.8/eventhandler/faas_event_handler.go:66 +0x6b fp=0xc00038b820 sp=0xc00038b7d8 pc=0x130f4bb
github.com/faasflow/sdk/executor.(*FlowExecutor).executeNode(0xc00038bb98, 0xc0002268d0, 0x5, 0x8, 0xc00019a1a8, 0x1841590, 0x1, 0x0, 0x0)
        /Users/apple/go/pkg/mod/github.com/faasflow/sdk@v1.0.0/executor/executor.go:350 +0x702 fp=0xc00038b968 sp=0xc00038b820 pc=0x11338f2
github.com/faasflow/sdk/executor.(*FlowExecutor).Execute(0xc00038bb98, 0xc000380a00, 0x14d1ee9, 0xf, 0x18ac040, 0x203000, 0x203000)
        /Users/apple/go/pkg/mod/github.com/faasflow/sdk@v1.0.0/executor/executor.go:1266 +0xa8a fp=0xc00038bb00 sp=0xc00038b968 pc=0x113cbba
github.com/faasflow/runtime/controller/handler.ExecuteFlowHandler(0xc00038bcc8, 0xc0001f21e0, 0x15781e0, 0xc000128790, 0x0, 0x0)
        /Users/apple/go/pkg/mod/github.com/faasflow/runtime@v0.2.2/controller/handler/execute_flow_handler.go:28 +0x295 fp=0xc00038bc60 sp=0xc00038bb00 pc=0x1388395
github.com/s8sg/goflow/runtime.(*FlowRuntime).handleNewRequest(0xc000168100, 0xc0001f21e0, 0x6, 0xc0001f21e0)
        /Users/apple/go/pkg/mod/github.com/s8sg/goflow@v0.0.8/runtime/flow_runtime.go:204 +0xf2 fp=0xc00038bd08 sp=0xc00038bc60 pc=0x13d1bf2
github.com/s8sg/goflow/runtime.(*FlowRuntime).queueReceiver(0xc000168100, 0xc0000900a0, 0x15, 0xc00037e180, 0x6, 0x6, 0x2554c7bf5, 0x18800a0)
        /Users/apple/go/pkg/mod/github.com/s8sg/goflow@v0.0.8/runtime/flow_runtime.go:178 +0x37f fp=0xc00038bdd8 sp=0xc00038bd08 pc=0x13d195f
github.com/s8sg/goflow/runtime.(*FlowRuntime).queueReceiver-fm(0xc0000900a0, 0x15, 0xc00037e180, 0x6, 0x6, 0xc0001803c0, 0x0)
        /Users/apple/go/pkg/mod/github.com/s8sg/goflow@v0.0.8/runtime/flow_runtime.go:160 +0x5c fp=0xc00038be28 sp=0xc00038bdd8 pc=0x13d391c
github.com/benmanns/goworker.(*worker).run(0xc000208280, 0xc000066100, 0xc0000a2130)
        /Users/apple/go/pkg/mod/github.com/benmanns/goworker@v0.1.3/worker.go:154 +0x29f fp=0xc00038bed8 sp=0xc00038be28 pc=0x13852cf
github.com/benmanns/goworker.(*worker).work.func1(0xc0002260a0, 0xc000208280, 0xc0000a41e0)
        /Users/apple/go/pkg/mod/github.com/benmanns/goworker@v0.1.3/worker.go:108 +0x12a fp=0xc00038bfc8 sp=0xc00038bed8 pc=0x138708a
runtime.goexit()
        /usr/local/go/src/runtime/asm_amd64.s:1373 +0x1 fp=0xc00038bfd0 sp=0xc00038bfc8 pc=0x1064091
created by github.com/benmanns/goworker.(*worker).work
        /Users/apple/go/pkg/mod/github.com/benmanns/goworker@v0.1.3/worker.go:93 +0x1b4


Process finished with exit code 2

test code

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/s8sg/goflow"
	flow "github.com/s8sg/goflow/flow"
)

// Workload function
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
	fmt.Println("doSomething")
	time.Sleep(time.Second)
	return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
}

// Define provide definition of the workflow
func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
	f.SyncNode().Apply("test", doSomething).Apply("test", doSomething).Apply("test", doSomething).Apply("test", doSomething)
	return nil
}

func main() {
	os.Setenv("enable_tracing", "true")
	for i := 0; i < 10; i++ {
		go func() {
			time.Sleep(10*time.Second)
			fmt.Println("start!")
			fs := &goflow.FlowService{
				RedisURL: "127.0.0.1:6379",
			}
			fs.Execute("myflow", &goflow.Request{
				Body: []byte("hallo"),
			})
		}()
	}
	fs := &goflow.FlowService{
		Port:              8080,
		RedisURL:          "127.0.0.1:6379",
		OpenTraceUrl:      "127.0.0.1:5775",
		WorkerConcurrency: 5,
	}
	err := fs.Register("myflow", DefineWorkflow)
	if err != nil {
		panic(err)
	}
	err = fs.Start()
	if err != nil {
		panic(err)
	}
	
	select {}
}

Maybe TraceHandler's operationSpans map is being written concurrently without synchronization.

Replace redis with raft consensus

Redis might become a single point of failure even if redis-sentinel/cluster is used.
Replacement of redis with https://github.com/hashicorp/raft will remove that single point of failure and enable packaging of goflow into a single binary.
Furthermore, you might find storing and manipulating the DAG easier using the raft protocol as it operates on the basis of replicated state machines.

[Feature] Add support in server to stop, pause, resume and list flow

Currently the HTTP server only supports executing a flow, with

POST /`flow-name`
GET /`flow-name`

The proposal is to add support for stopping, pausing, resuming and listing flows.
The APIs would be

Execute

Execute executes the flow; we will change the path to use a /flow prefix

POST /flow/`flow-name` 
GET /flow/`flow-name`

Stop

Stop stops the flow

POST /flow/`flow-name`/stop 

Pause

Pause pauses execution of the flow

POST /flow/`flow-name`/pause

Resume

Resume resumes the execution of the flow

POST /flow/`flow-name`/resume

List

List lists all the registered flows

POST /flow/list
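
The routes proposed above can be sketched as a small routing function. This is only an illustration of the proposal: `parseFlowRoute` is a hypothetical helper, not part of goflow, and the action names simply mirror the headings in this issue.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// parseFlowRoute maps a method and path onto the proposed API.
// It returns the flow name (empty for "list") and the action.
func parseFlowRoute(method, path string) (name, action string, ok bool) {
	parts := strings.Split(strings.Trim(path, "/"), "/")
	if len(parts) < 2 || parts[0] != "flow" || parts[1] == "" {
		return "", "", false
	}
	switch {
	case len(parts) == 2 && parts[1] == "list" && method == http.MethodPost:
		return "", "list", true
	case len(parts) == 2 && (method == http.MethodPost || method == http.MethodGet):
		return parts[1], "execute", true
	case len(parts) == 3 && method == http.MethodPost:
		switch parts[2] {
		case "stop", "pause", "resume":
			return parts[1], parts[2], true
		}
	}
	return "", "", false
}

func main() {
	for _, p := range []string{"/flow/myflow", "/flow/myflow/pause", "/flow/list"} {
		name, action, ok := parseFlowRoute(http.MethodPost, p)
		fmt.Printf("%s -> name=%q action=%q ok=%v\n", p, name, action, ok)
	}
}
```

Writing it out exposes one ambiguity in the proposal: `POST /flow/list` collides with executing a flow that happens to be named `list`, so either that name would need to be reserved or the list endpoint moved to a different path.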

Project Activity?

Hello, I am very very interested in incorporating goflow into one of my projects. I was just wondering if this repo is still active?

Server panics if time.Sleep() is used in a node function

package main

import (
	"fmt"
	"time"

	goflow "github.com/s8sg/goflow"
	flow "github.com/s8sg/goflow/flow"
)

// Workload function
func doSomething1(data []byte, option map[string][]string) ([]byte, error) {
	time.Sleep(10 * time.Second)
	fmt.Println(fmt.Sprintf("you said 1 %s", string(data)))
	return []byte(fmt.Sprintf("you said 1 \"%s\"", string(data))), nil
}

func doSomething2(data []byte, option map[string][]string) ([]byte, error) {
	time.Sleep(10 * time.Second)
	fmt.Println(fmt.Sprintf("you said 2 %s", string(data)))
	return []byte(fmt.Sprintf("you said 2 \"%s\"", string(data))), nil
}

func doSomething3(data []byte, option map[string][]string) ([]byte, error) {
	time.Sleep(10 * time.Second)
	fmt.Println(fmt.Sprintf("you said 3 %s", string(data)))
	return []byte(fmt.Sprintf("you said 3 \"%s\"", string(data))), nil
}

// Define provide definition of the workflow
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
	dag := workflow.Dag()
	dag.Node("test1").Apply("test1", doSomething1)
	dag.Node("test2").Apply("test2", doSomething2)
	dag.Node("test3").Apply("test3", doSomething3)
	dag.Edge("test1", "test2")
	dag.Edge("test2", "test3")
	return nil
}

func main() {
	fs := &goflow.FlowService{
		Port:              8080,
		RedisURL:          "localhost:6379",
		OpenTraceUrl:      "localhost:5775",
		WorkerConcurrency: 5,
	}
	fs.Register("myflow", DefineWorkflow)
	fs.StartWorker()
	err := fs.Execute("myflow", &goflow.Request{
		RequestId: "request1",
		Body:      []byte{'a', 'a', 'a'},
	})
	fmt.Println(err)
	err = fs.Execute("myflow", &goflow.Request{
		RequestId: "request2",
		Body:      []byte{'b', 'b', 'b'},
	})
	fmt.Println(err)
}
