s8sg / goflow

A Golang based high performance, scalable and distributed workflow framework

License: MIT License

Go 41.45% Makefile 0.11% JavaScript 3.63% Dockerfile 0.05% CSS 44.54% SCSS 4.80% HTML 5.43%
workflow-engine workload-automation framework golang workflow distributed-computing

goflow's Introduction

Go-Flow

A Golang based high performance, scalable and distributed workflow framework

It lets you programmatically author a distributed workflow as a Directed Acyclic Graph (DAG) of tasks. GoFlow executes your tasks on an array of workers, distributing the load uniformly across them.

Stability and Compatibility

Status: The library is currently undergoing heavy development with frequent, breaking API changes.

☝️ Important Note: Current major version is zero (v0.x.x) to accommodate rapid development and fast iteration. The public API could change without a major version update before v1.0.0 release.

Install It

Install GoFlow

go mod init myflow
go get github.com/s8sg/goflow@master

Write First Flow

Library to Build Flow github.com/s8sg/goflow/flow/v1

Make a flow.go file

package main

import (
	"fmt"
	goflow "github.com/s8sg/goflow/v1"
	flow "github.com/s8sg/goflow/flow/v1"
)

// Workload function
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
	return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
}

// DefineWorkflow provides the definition of the workflow
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
	dag := workflow.Dag()
	dag.Node("test", doSomething)
	return nil
}

func main() {
	fs := &goflow.FlowService{
		Port:              8080,
		RedisURL:          "localhost:6379",
		OpenTraceUrl:      "localhost:5775",
		WorkerConcurrency: 5,
		EnableMonitoring:  true,
	}
	// Register and Start both return an error; fail fast if either does
	if err := fs.Register("myflow", DefineWorkflow); err != nil {
		panic(err)
	}
	if err := fs.Start(); err != nil {
		panic(err)
	}
}

Start() runs an HTTP server that listens on the provided Port. It also runs a flow worker that handles the workload.

Run It

Start goflow stack

docker-compose up

This will start the required services

  • redis
  • jaeger
  • dashboard

Run the Flow

go build -o goflow
./goflow

Invoke It

Using curl

curl -d hallo localhost:8080/flow/myflow
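
The same request can be sent from Go. The sketch below is a minimal illustration that assumes only the `/flow/<name>` path shown in the curl command; `invokeFlow` and `runDemo` are hypothetical helper names, and the stub server stands in for a running GoFlow instance so the example works without Redis.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// invokeFlow posts body to <baseURL>/flow/<flowName>, the same path the curl
// example uses, and returns whatever the server replies with.
func invokeFlow(baseURL, flowName string, body []byte) (string, error) {
	resp, err := http.Post(baseURL+"/flow/"+flowName, "text/plain", bytes.NewReader(body))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	out, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("flow %q returned %s: %s", flowName, resp.Status, out)
	}
	return string(out), nil
}

// runDemo exercises invokeFlow against a stand-in server (not GoFlow) that
// mimics the doSomething workload from the first example.
func runDemo() (string, error) {
	stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		data, _ := io.ReadAll(r.Body)
		fmt.Fprintf(w, "you said %q", data)
	}))
	defer stub.Close()
	return invokeFlow(stub.URL, "myflow", []byte("hallo"))
}

func main() {
	out, err := runDemo()
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Against a real service, pass `http://localhost:8080` as the base URL instead of the stub's address.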

Using Client

Using the goflow client you can request the flow directly. The requests are always async and get queued for the workers to pick up.

fs := &goflow.FlowService{
    RedisURL: "localhost:6379",
}
fs.Execute("myflow", &goflow.Request{
    Body: []byte("hallo"),
})

Using Dashboard

The dashboard visualizes the flow and provides observability.

Scale It

GoFlow scales horizontally: you can distribute the load by just adding more instances.

Worker Mode

Alternatively, you can start GoFlow in worker mode. As a worker, GoFlow only handles the workload instead of running an HTTP server. If required, you can scale only the workers.

fs := &goflow.FlowService{
    RedisURL:            "localhost:6379",
    OpenTraceUrl:        "localhost:5775",
    WorkerConcurrency:   5,
}
fs.Register("myflow", DefineWorkflow)
fs.StartWorker()

Register Multiple Flow

Register() lets you bind multiple flows to a single flow service. This way one server/worker instance can be used for more than one flow.

fs.Register("createUser", DefineCreateUserFlow)
fs.Register("deleteUser", DefineDeleteUserFlow)

Creating More Complex DAG

The initial example is a single-vertex DAG. Single-vertex DAGs are great for synchronous tasks.

Using GoFlow's DAG construct, one can achieve more complex compositions with multiple vertices connected by edges.

Multi Nodes

A multi-vertex flow is always asynchronous in nature: its nodes get distributed across the workers.

Below is an example of a simple multi-vertex flow that validates a KYC image of a user and marks the user according to the result. This is an asynchronous flow with three steps.

func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
    dag := f.Dag()
    dag.Node("get-kyc-image", getPresignedURLForImage)
    dag.Node("face-detect", detectFace)
    dag.Node("mark-profile", markProfileBasedOnStatus)
    dag.Edge("get-kyc-image", "face-detect")
    dag.Edge("face-detect", "mark-profile")
    return nil
}
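
To see why the edges matter, the sketch below orders the same three nodes with Kahn's algorithm, so that every node comes after its predecessors. It is an illustration only: the `dag` type here is a hypothetical stand-in and says nothing about how GoFlow schedules nodes internally.

```go
package main

import "fmt"

// dag is a minimal adjacency-list graph used for illustration; it is
// unrelated to GoFlow's own Dag type.
type dag struct {
	nodes []string            // declaration order, kept for deterministic output
	edges map[string][]string // from -> to
}

func newDag() *dag { return &dag{edges: map[string][]string{}} }

func (d *dag) node(id string) { d.nodes = append(d.nodes, id) }

func (d *dag) edge(from, to string) { d.edges[from] = append(d.edges[from], to) }

// order returns the nodes so that each one follows all of its predecessors
// (Kahn's algorithm), or an error if the graph contains a cycle.
func (d *dag) order() ([]string, error) {
	indegree := map[string]int{}
	for _, n := range d.nodes {
		indegree[n] = 0
	}
	for _, tos := range d.edges {
		for _, to := range tos {
			indegree[to]++
		}
	}
	var ready, out []string
	for _, n := range d.nodes {
		if indegree[n] == 0 {
			ready = append(ready, n)
		}
	}
	for len(ready) > 0 {
		n := ready[0]
		ready = ready[1:]
		out = append(out, n)
		for _, to := range d.edges[n] {
			indegree[to]--
			if indegree[to] == 0 {
				ready = append(ready, to)
			}
		}
	}
	if len(out) != len(d.nodes) {
		return nil, fmt.Errorf("graph has a cycle")
	}
	return out, nil
}

// kycOrder declares the nodes out of order on purpose; the edges alone
// determine the execution order.
func kycOrder() ([]string, error) {
	d := newDag()
	d.node("mark-profile")
	d.node("face-detect")
	d.node("get-kyc-image")
	d.edge("get-kyc-image", "face-detect")
	d.edge("face-detect", "mark-profile")
	return d.order()
}

func main() {
	order, err := kycOrder()
	if err != nil {
		panic(err)
	}
	fmt.Println(order)
}
```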

Branching

Branching is great for parallelizing independent workloads in separate branches.

Branching can be achieved with simple vertices and edges. GoFlow provides a special operator, Aggregator, to aggregate the results of multiple branches on a converging node.

We are extending our earlier example with a new requirement: match the face against existing data. We perform the two operations in parallel to reduce time.

func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
    dag := f.Dag()
    dag.Node("get-kyc-image", getPresignedURLForImage)
    dag.Node("face-detect", detectFace)
    dag.Node("face-match", matchFace)
    // Here mark-profile depends on the results from face-detect and face-match,
    // so we use an aggregator to create a unified result
    dag.Node("mark-profile", markProfileBasedOnStatus, flow.Aggregator(func(responses map[string][]byte) ([]byte, error) {
       status := validateResults(responses["face-detect"],  responses["face-match"])
       return []byte(status), nil
    }))
    dag.Edge("get-kyc-image", "face-detect")
    dag.Edge("get-kyc-image", "face-match")
    dag.Edge("face-detect", "mark-profile")
    dag.Edge("face-match", "mark-profile")
    return nil
}
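
The aggregator itself is an ordinary function over a map of parent-node outputs, so it can be written and tested on its own. The sketch below assumes the `func(map[string][]byte) ([]byte, error)` shape used in the example; `aggregate` is a hypothetical name, and this `validateResults` is a stand-in that assumes each branch returns the literal payload "pass" on success.

```go
package main

import (
	"errors"
	"fmt"
)

// aggregate has the same shape as the function passed to flow.Aggregator in
// the example above: it receives each parent node's output keyed by node name.
func aggregate(responses map[string][]byte) ([]byte, error) {
	detect, ok := responses["face-detect"]
	if !ok {
		return nil, errors.New("missing result from face-detect")
	}
	match, ok := responses["face-match"]
	if !ok {
		return nil, errors.New("missing result from face-match")
	}
	return []byte(validateResults(detect, match)), nil
}

// validateResults is a hypothetical stand-in for the helper of the same name:
// it reports "success" only when both branches returned "pass".
func validateResults(detect, match []byte) string {
	if string(detect) == "pass" && string(match) == "pass" {
		return "success"
	}
	return "failure"
}

func main() {
	out, err := aggregate(map[string][]byte{
		"face-detect": []byte("pass"),
		"face-match":  []byte("pass"),
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Returning an error when a branch result is missing makes a wiring mistake visible instead of silently reporting a failure status.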

Subdag

SubDag allows an existing DAG to be reused by embedding it into a DAG with wider functionality.

SubDag is available as a GoFlow DAG construct that takes a separate DAG as input and composes it within a vertex, where the vertex's completion depends on the embedded DAG's completion.

func (currentDag *Dag) SubDag(vertex string, dag *Dag)

Say we have a separate flow that needs the same set of steps to validate a user. With our earlier example, we can separate out the validation process into a subdag and put it in a library that can be shared across different flows.

func KycImageValidationDag() *flow.Dag {
    dag := flow.NewDag()
    dag.Node("verify-url", s3DocExists)
    dag.Node("face-detect", detectFace)
    dag.Node("face-match", matchFace)
    dag.Node("generate-result", func(data []byte, option map[string][]string) ([]byte, error) {
                 return data, nil
            },
            flow.Aggregator(func(responses map[string][]byte) ([]byte, error) {
                // validateResults is assumed here to report overall success as a bool
                status := "failure"
                if validateResults(responses["face-detect"], responses["face-match"]) {
                   status = "success"
                }
                return []byte(status), nil
            }))
    dag.Edge("verify-url", "face-detect")
    dag.Edge("verify-url", "face-match")
    dag.Edge("face-detect", "generate-result")
    dag.Edge("face-match", "generate-result")
    return dag
}

Our existing flow embeds the KycImageValidation DAG

func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
    dag := f.Dag()
    dag.Node("get-image", getPresignedURLForImage)
    // SubDag takes a *Dag, so the builder function is called here
    dag.SubDag("verify-image", common.KycImageValidationDag())
    dag.Node("mark-profile", markProfileBasedOnStatus)
    dag.Edge("get-image", "verify-image")
    dag.Edge("verify-image", "mark-profile")
    return nil
}

Conditional Branching

Conditional branching is a great way to choose different execution paths dynamically.

GoFlow provides a DAG component called ConditionalBranch. ConditionalBranch creates a vertex that composes the different conditional branches as individual subdags, each identified by a unique key that represents the condition.

func (currentDag *Dag) ConditionalBranch(vertex string, conditions []string, condition sdk.Condition,
    options ...BranchOption) (conditiondags map[string]*Dag)

Condition is a special handler that lets the user dynamically choose one or more execution paths based on the result from the earlier node by returning a set of condition keys.

The user gets the condition branches as a response, in which each branch-specific DAG is mapped against its condition. The user can further define each branch using the DAG constructs.

Below is the updated example with a conditional branch, where we call face-match only when face-detect passes.

func KycImageValidationDag() *flow.Dag {
    dag := flow.NewDag()
    dag.Node("verify-url", s3DocExists)
    dag.Node("face-detect", detectFace)
    // Here face-match happens only when face-detect succeeds
    branches := dag.ConditionalBranch("handle-face-detect-response", []string{"pass"}, func(response []byte) []string {
        result := ParseFaceDetectResponse(response)
        if result[0] == "pass" { return []string{"pass"}  }
        return []string{}
    })

    // On the pass branch we perform `face-match`. If the condition `pass`
    // is not matched, execution continues with the next node, `generate-result`

    branches["pass"].Node("face-match", matchFace)
    dag.Node("generate-result", generateResult)
    dag.Edge("verify-url", "face-detect")
    dag.Edge("face-detect", "handle-face-detect-response")
    dag.Edge("handle-face-detect-response", "generate-result")
    return dag
}

You can also have multiple conditional branches in a workflow, with different nodes corresponding to each branch.

Below is the updated example with two conditional branches, where we call face-match or create-user based on the response from the previous node.

func KycImageValidationDag() *flow.Dag {
    dag := flow.NewDag()
    dag.Node("verify-url", s3DocExists)
    dag.Node("face-detect", detectFace)
    // Here face-match happens only when face-detect succeeds;
    // otherwise create-user is called
    branches := dag.ConditionalBranch("handle-face-detect-response", []string{"pass", "fail"},
        func(response []byte) []string {
           result := ParseFaceDetectResponse(response)
           if result.isSuccess() { return []string{"pass"}  }
           return []string{"fail"}
    })
    // On the pass branch we are performing the `face-match`
    branches["pass"].Node("face-match", matchFace)
    // on the fail branch we are performing `create-user`
    branches["fail"].Node("create-user", createUser)
  
    dag.Node("generate-result", generateResult)
    dag.Edge("verify-url", "face-detect")
    dag.Edge("face-detect", "handle-face-detect-response")
    dag.Edge("handle-face-detect-response", "generate-result")
    return dag
}
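
A condition handler only has to turn the previous node's bytes into condition keys. The sketch below is one possible way to do that, assuming the `func([]byte) []string` shape used in the examples above and a JSON payload from the previous node; the `faceDetectResult` type, its `faces` field, and the `chooseBranch` name are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// faceDetectResult is a hypothetical payload shape for the face-detect node.
type faceDetectResult struct {
	Faces int `json:"faces"`
}

// chooseBranch has the same shape as the condition handlers above: it parses
// the previous node's response and returns the keys of the branches to run.
func chooseBranch(response []byte) []string {
	var res faceDetectResult
	if err := json.Unmarshal(response, &res); err != nil {
		// An unreadable response is treated as a failed detection.
		return []string{"fail"}
	}
	if res.Faces == 1 {
		return []string{"pass"}
	}
	return []string{"fail"}
}

func main() {
	fmt.Println(chooseBranch([]byte(`{"faces":1}`)))
	fmt.Println(chooseBranch([]byte(`{"faces":0}`)))
}
```

Every key the handler can return should also appear in the conditions list passed to ConditionalBranch, here "pass" and "fail".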

Foreach Branching

Foreach branching allows the user to iteratively perform a certain set of tasks for a range of values.

GoFlow provides a DAG component called ForEachBranch. ForEachBranch creates a vertex composed of a subdag that defines the flow within the iteration.

func (currentDag *Dag) ForEachBranch(vertex string, foreach sdk.ForEach, options ...BranchOption) (dag *Dag)

ForEach is a special handler that lets the user dynamically return a set of keys and values. The user-defined DAG gets executed for each item in the returned set.

The user gets the foreach branch as a response and can define the flow using the DAG constructs.

We are updating our flow to execute over a set of users that have been listed for possible fraud.

func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
    dag := f.Dag()
    dag.Node("get-users", getListedUsers)
    verifyDag := dag.ForEachBranch("for-each-user-verify", func(data []byte) map[string][]byte {
       users := ParseUsersList(data)
       forEachSet := make(map[string][]byte)
       for _, user := range users {
           forEachSet[user.id] = []byte(user.GetKycImageUrl())
       }
       return forEachSet
    })
    verifyDag.SubDag("verify-image", KycImageValidationDag())
    verifyDag.Node("mark-profile", markProfileBasedOnStatus)
    verifyDag.Edge("verify-image", "mark-profile")

    dag.Edge("get-users", "for-each-user-verify")
    return nil
}
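
The foreach handler can likewise be developed in isolation. This is a minimal sketch assuming the `func([]byte) map[string][]byte` shape from the example and a JSON array as the output of get-users; the `listedUser` type, its field names, and the `forEachUser` name are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// listedUser is a hypothetical shape for one entry produced by get-users.
type listedUser struct {
	ID          string `json:"id"`
	KycImageURL string `json:"kyc_image_url"`
}

// forEachUser has the same shape as the handler passed to ForEachBranch
// above: one map entry per iteration, keyed by user ID, with the KYC image
// URL as that iteration's input.
func forEachUser(data []byte) map[string][]byte {
	var users []listedUser
	if err := json.Unmarshal(data, &users); err != nil {
		// Nothing to iterate over if the list cannot be parsed.
		return map[string][]byte{}
	}
	set := make(map[string][]byte, len(users))
	for _, u := range users {
		set[u.ID] = []byte(u.KycImageURL)
	}
	return set
}

func main() {
	input := []byte(`[{"id":"u1","kyc_image_url":"https://example.com/u1.png"}]`)
	for id, url := range forEachUser(input) {
		fmt.Printf("%s -> %s\n", id, url)
	}
}
```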

goflow's People

Contributors

asjdf, dan-ash, danli001, dependabot[bot], giorgiozoppi, khanhchung2k5, s8sg, shradha131, taoxiesz, zaklll

goflow's Issues

There is no option to pass the redis client into goflow struct

When initializing GoFlow with all the necessary configurations, I encountered an issue with passing the Redis client. The FlowService struct only has an option to provide the RedisURL:

fs := &goflow.FlowService{
    RedisURL:          redisUrl,
    WorkerConcurrency: 5,
}
However, when attempting to include SSL configuration in the Redis URL, I encountered the following error message:

failed to initiate connection, error dial tcp: address .too many colons in address

The sample Redis URL used was: "rediss://" + options.Username + ":" + options.Password + "@" + options.Addr + "/0?ssl_cert_reqs=None"

Further investigation is required to resolve this problem and successfully pass the SSL configuration to the go flow during initialization.
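
One plausible reading of the error, sketched below with the standard library only: if the RedisURL value is used as a plain `host:port` dial address (as in the README examples), a full `rediss://` URL fails host/port splitting with exactly this "too many colons" message. The URL, credentials, and helper names in the sketch are made up, and how GoFlow actually consumes RedisURL is an assumption here.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strings"
)

// tooManyColons reports whether addr is rejected by net.SplitHostPort with
// the "too many colons" error seen in the issue.
func tooManyColons(addr string) bool {
	_, _, err := net.SplitHostPort(addr)
	return err != nil && strings.Contains(err.Error(), "too many colons")
}

// addrFromURL extracts the host:port part of a redis:// or rediss:// URL and
// reports whether the scheme asks for TLS.
func addrFromURL(raw string) (addr string, useTLS bool, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", false, err
	}
	return u.Host, u.Scheme == "rediss", nil
}

func main() {
	// Made-up URL in the same form as the one in the report.
	raw := "rediss://user:secret@redis.example.com:6380/0?ssl_cert_reqs=None"

	fmt.Println(tooManyColons(raw))              // a full URL is not a host:port pair
	fmt.Println(tooManyColons("localhost:6379")) // the README form splits cleanly

	addr, useTLS, err := addrFromURL(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(addr, useTLS)
}
```

Extracting the address this way does not by itself enable TLS or authentication; that would still need support for passing a configured client or options.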

Replace redis with raft consensus

Redis might become a single point of failure even if redis-sentinel/cluster is used.
Replacement of redis with https://github.com/hashicorp/raft will remove that single point of failure and enable packaging of goflow into a single binary.
Furthermore, you might find storing and manipulating the DAG easier using the raft protocol as it operates on the basis of replicated state machines.

Pause, Resume and Stop don't work fine

Expect:
Pause can pause the specified workflow, and Resume can resume it.
Stop can stop active workflow and set its status to Finished.

Actually:

  1. After running Pause, the workflow keeps executing until the end.
...
2022/09/09 17:12:45 [request `my_test_flow-1662714755`] intermediate result from node 0_1_Node-1 to 0_2_Node-2 stored as 0_1_Node-1--0_2_Node-2
2022/09/09 17:12:45 [request `my_test_flow-1662714755`] performing request for Node 0_2_Node-2, indegree count is 1
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] request submitted for Node 0_2_Node-2
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] partial request received
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] intermediate result from Node 0_1_Node-1 to Node 0_2_Node-2 retrieved from 0_1_Node-1--0_2_Node-2
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] executing node 0_2_Node-2
Test Node: O_O "TestData"
2022/09/09 17:12:47 Pausing request my_test_flow-1662714755 of flow my_test_flow
Node End
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] completed execution of node 0_2_Node-2
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] intermediate result from node 0_2_Node-2 to 0_3_Node-3 stored as 0_2_Node-2--0_3_Node-3
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] performing request for Node 0_3_Node-3, indegree count is 1
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] Request is paused, storing partial state for node: 0_3_Node-3
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] request submitted for Node 0_3_Node-3
2022/09/09 17:12:57 [request `my_test_flow-1662714755`] partial request received
2022/09/09 17:12:57 [request `my_test_flow-1662714755`] intermediate result from Node 0_2_Node-2 to Node 0_3_Node-3 retrieved from 0_2_Node-2--0_3_Node-3
2022/09/09 17:12:57 [request `my_test_flow-1662714755`] executing node 0_3_Node-3
Test Node: O_O "O_O "TestData""
Node End
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] completed execution of node 0_3_Node-3
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] intermediate result from node 0_3_Node-3 to 0_4_Node-4 stored as 0_3_Node-3--0_4_Node-4
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] performing request for Node 0_4_Node-4, indegree count is 1
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] Request is paused, storing partial state for node: 0_4_Node-4
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] request submitted for Node 0_4_Node-4
2022/09/09 17:13:08 [request `my_test_flow-1662714755`] partial request received
2022/09/09 17:13:08 [request `my_test_flow-1662714755`] intermediate result from Node 0_3_Node-3 to Node 0_4_Node-4 retrieved from 0_3_Node-3--0_4_Node-4
2022/09/09 17:13:08 [request `my_test_flow-1662714755`] executing node 0_4_Node-4
...
  2. Panic when executing Stop:
2022/09/09 17:15:03 Pausing request my_test_flow for flow my_test_flow-1662714890
Node End
2022/09/09 17:15:11 [request `my_test_flow-1662714890`] completed execution of node 0_2_Node-2
2022/09/09 17:15:11 [request `my_test_flow-1662714890`] failed to obtain pipeline state, error failed to get key core.my_test_flow.my_test_flow-1662714890.request-state, nil
2022/09/09 17:15:11 [request `my_test_flow-1662714890`] pipeline is not active
panic: [request `my_test_flow-1662714890`] Pipeline is not active

goroutine 30 [running]:
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).findNextNodeToExecute(0xc0002c7c78)
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/sdk/executor/executor.go:609 +0x614
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).Execute(0xc0002c7c78, 0xc0002c7c58)
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/sdk/executor/executor.go:1266 +0x9c8
github.com/s8sg/goflow/core/runtime/controller/handler.PartialExecuteFlowHandler(0xc0002c7dd0, 0xc00028ecc0?, {0x14771f8, 0xc0002a6410})
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/runtime/controller/handler/partial_execute_flow_handler.go:25 +0xe5
github.com/s8sg/goflow/runtime.(*FlowRuntime).handlePartialRequest(0xc00012af70, 0xc00028ecc0)
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:457 +0x1c6
github.com/s8sg/goflow/runtime.(*FlowRuntime).handleRequest(0xc0002e6380?, 0xc0002e21e0?, {0xc0002a43b0?, 0x13260a0?})
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:414 +0xc8
github.com/s8sg/goflow/runtime.(*FlowRuntime).Consume(0xc00012af70, {0x1473a98, 0xc0002320e0})
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:394 +0x2ae
github.com/adjust/rmq/v4.(*redisQueue).consumerConsume(0xc0001d4000, {0x14701a0, 0xc00012af70})
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/adjust/rmq/v4/queue.go:282 +0x9f
created by github.com/adjust/rmq/v4.(*redisQueue).AddConsumer
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/adjust/rmq/v4/queue.go:260 +0xe5

Here is the test code:

package main

import (
	"flag"
	"fmt"
	"time"

	flow "github.com/s8sg/goflow/flow/v1"
	goflow "github.com/s8sg/goflow/v1"
)

const (
	flowName = "my_test_flow"
)

var (
	server = flag.Bool("s", false, "start server")
	op     = flag.String("o", "exec", "operation")
	rid    = flag.String("i", "", "request id")
)

func workload(data []byte, option map[string][]string) ([]byte, error) {
	fmt.Printf("Test Node: %s\n", string(data))
	time.Sleep(10 * time.Second)
	fmt.Println("Node End")
	return []byte(fmt.Sprintf("O_O \"%s\"", string(data))), nil
}

func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
	dag := workflow.Dag()
	dag.Node("Node-1", workload)
	dag.Node("Node-2", workload)
	dag.Node("Node-3", workload)
	dag.Node("Node-4", workload)
	dag.Node("Node-5", workload)
	dag.Node("Node-6", workload)
	dag.Node("Node-7", workload)
	dag.Node("Node-8", workload)
	dag.Edge("Node-1", "Node-2")
	dag.Edge("Node-2", "Node-3")
	dag.Edge("Node-3", "Node-4")
	dag.Edge("Node-4", "Node-5")
	dag.Edge("Node-5", "Node-6")
	dag.Edge("Node-6", "Node-7")
	dag.Edge("Node-7", "Node-8")
	return nil
}

func startServer() {
	fs := &goflow.FlowService{
		Port:              8088,
		RedisURL:          "127.0.0.1:6379",
		WorkerConcurrency: 2,
		RetryCount:        1,
		DebugEnabled:      true,
	}
	err := fs.Register(flowName, DefineWorkflow)
	if err != nil {
		panic(err)
	}

	err = fs.Start()
	if err != nil {
		panic(err)
	}
}

func ExecOp(op string) string {
	fs := &goflow.FlowService{
		RedisURL: "127.0.0.1:6379",
	}

	reqId := *rid
	if op == "exec" {
		reqId = fmt.Sprintf("%s-%d", flowName, time.Now().Unix())
	}
	if len(reqId) == 0 {
		panic("request id is empty")
	}

	switch op {
	case "exec":
		err := fs.Execute(flowName, &goflow.Request{
			Body:      []byte("TestData"),
			RequestId: reqId,
		})
		if err != nil {
			panic(err)
		}
		return reqId

	case "pause":
		err := fs.Pause(flowName, reqId)
		if err != nil {
			panic(err)
		}
	case "resume":
		err := fs.Resume(flowName, reqId)
		if err != nil {
			panic(err)
		}
	case "stop":
		err := fs.Stop(flowName, reqId)
		if err != nil {
			panic(err)
		}
	}

	return reqId
}

func main() {
	flag.Parse()

	if *server {
		startServer()
		return
	}

	reqId := ExecOp(*op)
	fmt.Println(reqId)
}

Start the server:

./wf -s

Client:

./wf -o exec 
./wf -o pause -i [request_id]
./wf -o resume -i [request_id]
./wf -o stop -i [request_id]

Example program needs to be updated

When I try to run the sample program provided, it gives the following errors
./main.go:17:14: cannot use doSomething (type func([]byte, map[string][]string) ([]byte, error)) as type flow.BranchOption in argument to dag.Node

I tried to execute this on "The Go Playground", but it doesn't execute there.

We would like to explore goflow for one of our requirements. Could you please help here?

[Code Quality] Abstract the flow Registry

Currently we are using redis to register

  • Flows
  • Worker
  • Workload (In Progress)

Code is currently scattered across runtime like this

The goal is to abstract the RDB into a flow registry.
We can initialise the FlowRegistry at the same time as we initialise the RDB.
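
As a starting point for discussion, the abstraction could look roughly like the sketch below. Every name in it (`FlowRegistry`, its methods, `memoryRegistry`) is hypothetical and not part of GoFlow; the in-memory implementation only shows that the runtime could depend on the interface rather than on Redis calls.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// FlowRegistry is a hypothetical interface covering the registrations listed
// above; a Redis-backed type would implement it in the real runtime.
type FlowRegistry interface {
	RegisterFlow(name string) error
	RegisterWorker(id string, flows []string) error
	Flows() ([]string, error)
}

// memoryRegistry is an in-memory implementation used for illustration.
type memoryRegistry struct {
	mu      sync.Mutex
	workers map[string]map[string]struct{} // flow name -> worker ids
}

// Compile-time check that memoryRegistry satisfies the interface.
var _ FlowRegistry = (*memoryRegistry)(nil)

func newMemoryRegistry() *memoryRegistry {
	return &memoryRegistry{workers: map[string]map[string]struct{}{}}
}

func (r *memoryRegistry) RegisterFlow(name string) error {
	if name == "" {
		return fmt.Errorf("flow name is empty")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.workers[name]; !ok {
		r.workers[name] = map[string]struct{}{}
	}
	return nil
}

func (r *memoryRegistry) RegisterWorker(id string, flows []string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	// Validate first so a bad flow name leaves the registry unchanged.
	for _, f := range flows {
		if _, ok := r.workers[f]; !ok {
			return fmt.Errorf("unknown flow %q", f)
		}
	}
	for _, f := range flows {
		r.workers[f][id] = struct{}{}
	}
	return nil
}

func (r *memoryRegistry) Flows() ([]string, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	names := make([]string, 0, len(r.workers))
	for n := range r.workers {
		names = append(names, n)
	}
	sort.Strings(names)
	return names, nil
}

func main() {
	var reg FlowRegistry = newMemoryRegistry()
	if err := reg.RegisterFlow("myflow"); err != nil {
		panic(err)
	}
	if err := reg.RegisterWorker("worker-1", []string{"myflow"}); err != nil {
		panic(err)
	}
	flows, _ := reg.Flows()
	fmt.Println(flows)
}
```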

Nice framework

I have been contemplating ideas around a workflow library for some time, this looks good. I think one question I'd have is, what would a Flow interface look like? We want to define that abstraction potentially in Micro as a way of doing orchestration. Once you have enough services this makes sense.

[Experimental] GoFlow DSL support

GoFlow DSL would allow users to define a workflow in a DSL. The DSL can be a simple YAML file.

The expectation is:

  1. Aligned to the GoFlow domain
  2. Provides the existing GoFlow constructs and is readable
  3. Doesn't need compilation of the executor for flow changes
  4. Allows GoFlow workloads to be completely independent of GoFlow (#4)

[GOFLOW-30] Support for a UI dag builder

Goflow DAG builder: Provide a UI DAG builder

Goal

The goal of this story is to make it easier to build a DAG from a data engineer's perspective. A data engineer should not need to be proficient in Go, so we want to provide a UI where he/she is able to draw a DAG. We want to leverage the https://github.com/AlexImb/vue-dag library for that purpose.

Requirement of the story:

Functional requirement

As an end user I want to:

  1. Create a graph by the UI.
  2. Select for each node the workflow code and edit that.
  3. Save/Resume the workflow

Technical requirements:

So the scope of this story is:

  1. Create a UI that is able to draw/save a graph and schedule its execution.
  2. Provide a REST API that is able to submit a transpiled graph.

[Code Quality] Abstract Queue Worker

Queue Worker is mainly used to handle async operations.
Queue Worker internally uses RMQ, which in turn uses Redis.

The goal is to abstract the queue worker so that all RMQ-specific details are hidden behind a Queue Worker interface.
In theory, QueueWorker should abstract:

  1. Enqueue Workload Tasks (WorkloadRequest)
  2. Enqueue Flow Operations Tasks (NewRequest, PartialRequest, Stop, Pause, Resume)
  3. Consume handler func(task *Task) error for NewRequest, PartialRequest, Stop, Pause, Resume, WorkloadResponse
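
The three responsibilities above could be captured in an interface along these lines. This is a sketch for discussion, not GoFlow code: only the task names and the `func(task *Task) error` handler signature come from the list; `TaskKind`, the `Task` fields, `QueueWorker`, and `memQueue` are hypothetical, and the in-memory queue stands in for an RMQ-backed implementation.

```go
package main

import "fmt"

// TaskKind names the operations listed above.
type TaskKind string

const (
	NewRequest     TaskKind = "NewRequest"
	PartialRequest TaskKind = "PartialRequest"
	Pause          TaskKind = "Pause"
	Resume         TaskKind = "Resume"
	Stop           TaskKind = "Stop"
)

// Task is a hypothetical envelope for anything that goes through the queue.
type Task struct {
	Kind      TaskKind
	Flow      string
	RequestID string
	Payload   []byte
}

// Handler matches the consume handler signature from the list above.
type Handler func(task *Task) error

// QueueWorker hides the queue technology behind enqueue and consume.
type QueueWorker interface {
	Enqueue(task *Task) error
	Consume(kind TaskKind, h Handler)
}

// memQueue is an in-memory stand-in that delivers tasks when Drain is called.
type memQueue struct {
	handlers map[TaskKind]Handler
	pending  []*Task
}

var _ QueueWorker = (*memQueue)(nil)

func newMemQueue() *memQueue { return &memQueue{handlers: map[TaskKind]Handler{}} }

func (q *memQueue) Enqueue(task *Task) error {
	q.pending = append(q.pending, task)
	return nil
}

func (q *memQueue) Consume(kind TaskKind, h Handler) { q.handlers[kind] = h }

// Drain delivers pending tasks in FIFO order and stops at the first error.
func (q *memQueue) Drain() error {
	for len(q.pending) > 0 {
		task := q.pending[0]
		q.pending = q.pending[1:]
		h, ok := q.handlers[task.Kind]
		if !ok {
			return fmt.Errorf("no handler registered for %s", task.Kind)
		}
		if err := h(task); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	q := newMemQueue()
	q.Consume(Pause, func(task *Task) error {
		fmt.Printf("pausing request %s of flow %s\n", task.RequestID, task.Flow)
		return nil
	})
	_ = q.Enqueue(&Task{Kind: Pause, Flow: "myflow", RequestID: "req-1"})
	if err := q.Drain(); err != nil {
		panic(err)
	}
}
```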

[Question] How to run a workflow without a middleware dependency

Triggering a workflow through the API works well. But sometimes I want to run my workflow in command mode, while keeping my workflow definition.

package main

import (
	"fmt"
	flow "github.com/s8sg/goflow/flow/v1"
)

// Workload function
func node1(data []byte, option map[string][]string) ([]byte, error) {
	result := fmt.Sprintf("(Executing node 1 with data (%s))", string(data))
	fmt.Println(result)
	return []byte(result), nil
}

// DefineWorkflow provides the definition of the workflow
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
	dag := workflow.Dag()
	dag.Node("node1", node1)
	return nil
}

Then run this workflow in main:

workflow.Run("my-workflow")
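
For a purely linear flow, the workload functions can already be chained in-process without any GoFlow machinery. The sketch below shows that idea using the workload signature from the definition above; `runChain` is a hypothetical helper, not a GoFlow API, and it covers neither branching nor the requested `workflow.Run`.

```go
package main

import "fmt"

// workload matches the signature of the node functions used in flow definitions.
type workload func(data []byte, option map[string][]string) ([]byte, error)

// runChain feeds the output of each step into the next one, in order, and
// stops at the first error. It only models a linear flow.
func runChain(data []byte, steps ...workload) ([]byte, error) {
	for i, step := range steps {
		out, err := step(data, nil)
		if err != nil {
			return nil, fmt.Errorf("step %d: %w", i, err)
		}
		data = out
	}
	return data, nil
}

// node1 is the workload from the question above.
func node1(data []byte, option map[string][]string) ([]byte, error) {
	result := fmt.Sprintf("(Executing node 1 with data (%s))", string(data))
	fmt.Println(result)
	return []byte(result), nil
}

func main() {
	out, err := runChain([]byte("input"), node1)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```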

[QUERY] Examples for conditionalBranch

Hey @s8sg
I was checking the framework to build an aggregator service that calls multiple services and combines the results received from each of them. I was wondering if I can use this framework to achieve the use case below.
Let's say I have three nodes arranged in series. I want to store the result of the intermediate nodes so that, on the basis of that result, I can decide whether to proceed to the next node or not. I saw that dag.ConditionalBranch can achieve this by adding a list of conditions. My question is: how would you parse the response of the intermediate nodes? How can we check that the response from the previous node is one of those defined in my condition list?
I would appreciate it if you could give a more detailed example of ConditionalBranch.

[Feature] Allow to register new flow dynamically

[Feature] Support dynamically appending flows after the flow service started.
Currently, it is only possible to register a workflow before starting the flow service. If you need to register a new workflow after starting the flow service, a restart is required. We need to provide a more flexible solution in the future.

Export workflows graph

Hi,

Is there any way to show the workflows graphically? same as the photos used in the documentation, or anything else...

Thank you

[bug] When tracing support is enabled, there are concurrent operations on a map

error message

fatal error: concurrent map writes
fatal error: concurrent map writes

goroutine 67 [running]:
runtime.throw(0x14d49a2, 0x15)
        /usr/local/go/src/runtime/panic.go:1116 +0x72 fp=0xc00038b6e0 sp=0xc00038b6b0 pc=0x10343f2
runtime.mapassign_faststr(0x1439380, 0xc0000df950, 0x14cdfa3, 0x4, 0x1)
        /usr/local/go/src/runtime/map_faststr.go:291 +0x3de fp=0xc00038b748 sp=0xc00038b6e0 pc=0x1014f5e
github.com/s8sg/goflow/eventhandler.(*TraceHandler).StartOperationSpan(0xc0002e2b40, 0xc000227090, 0x8, 0xc000376800, 0x14, 0x14cdfa3, 0x4)
        /Users/apple/go/pkg/mod/github.com/s8sg/goflow@v0.0.8/eventhandler/trace_handler.go:124 +0x223 fp=0xc00038b7d8 sp=0xc00038b748 pc=0x13107d3
github.com/s8sg/goflow/eventhandler.(*FaasEventHandler).ReportOperationStart(0xc00009c100, 0x14cdfa3, 0x4, 0xc000227090, 0x8, 0xc000376800, 0x14)
        /Users/apple/go/pkg/mod/github.com/s8sg/goflow@v0.0.8/eventhandler/faas_event_handler.go:66 +0x6b fp=0xc00038b820 sp=0xc00038b7d8 pc=0x130f4bb
github.com/faasflow/sdk/executor.(*FlowExecutor).executeNode(0xc00038bb98, 0xc0002268d0, 0x5, 0x8, 0xc00019a1a8, 0x1841590, 0x1, 0x0, 0x0)
        /Users/apple/go/pkg/mod/github.com/faasflow/sdk@v1.0.0/executor/executor.go:350 +0x702 fp=0xc00038b968 sp=0xc00038b820 pc=0x11338f2
github.com/faasflow/sdk/executor.(*FlowExecutor).Execute(0xc00038bb98, 0xc000380a00, 0x14d1ee9, 0xf, 0x18ac040, 0x203000, 0x203000)
        /Users/apple/go/pkg/mod/github.com/faasflow/sdk@v1.0.0/executor/executor.go:1266 +0xa8a fp=0xc00038bb00 sp=0xc00038b968 pc=0x113cbba
github.com/faasflow/runtime/controller/handler.ExecuteFlowHandler(0xc00038bcc8, 0xc0001f21e0, 0x15781e0, 0xc000128790, 0x0, 0x0)
        /Users/apple/go/pkg/mod/github.com/faasflow/runtime@v0.2.2/controller/handler/execute_flow_handler.go:28 +0x295 fp=0xc00038bc60 sp=0xc00038bb00 pc=0x1388395
github.com/s8sg/goflow/runtime.(*FlowRuntime).handleNewRequest(0xc000168100, 0xc0001f21e0, 0x6, 0xc0001f21e0)
        /Users/apple/go/pkg/mod/github.com/s8sg/goflow@v0.0.8/runtime/flow_runtime.go:204 +0xf2 fp=0xc00038bd08 sp=0xc00038bc60 pc=0x13d1bf2
github.com/s8sg/goflow/runtime.(*FlowRuntime).queueReceiver(0xc000168100, 0xc0000900a0, 0x15, 0xc00037e180, 0x6, 0x6, 0x2554c7bf5, 0x18800a0)
        /Users/apple/go/pkg/mod/github.com/s8sg/goflow@v0.0.8/runtime/flow_runtime.go:178 +0x37f fp=0xc00038bdd8 sp=0xc00038bd08 pc=0x13d195f
github.com/s8sg/goflow/runtime.(*FlowRuntime).queueReceiver-fm(0xc0000900a0, 0x15, 0xc00037e180, 0x6, 0x6, 0xc0001803c0, 0x0)
        /Users/apple/go/pkg/mod/github.com/s8sg/goflow@v0.0.8/runtime/flow_runtime.go:160 +0x5c fp=0xc00038be28 sp=0xc00038bdd8 pc=0x13d391c
github.com/benmanns/goworker.(*worker).run(0xc000208280, 0xc000066100, 0xc0000a2130)
        /Users/apple/go/pkg/mod/github.com/benmanns/goworker@v0.1.3/worker.go:154 +0x29f fp=0xc00038bed8 sp=0xc00038be28 pc=0x13852cf
github.com/benmanns/goworker.(*worker).work.func1(0xc0002260a0, 0xc000208280, 0xc0000a41e0)
        /Users/apple/go/pkg/mod/github.com/benmanns/goworker@v0.1.3/worker.go:108 +0x12a fp=0xc00038bfc8 sp=0xc00038bed8 pc=0x138708a
runtime.goexit()
        /usr/local/go/src/runtime/asm_amd64.s:1373 +0x1 fp=0xc00038bfd0 sp=0xc00038bfc8 pc=0x1064091
created by github.com/benmanns/goworker.(*worker).work
        /Users/apple/go/pkg/mod/github.com/benmanns/goworker@v0.1.3/worker.go:93 +0x1b4


Process finished with exit code 2

test code

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/s8sg/goflow"
	flow "github.com/s8sg/goflow/flow"
)

// Workload function
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
	fmt.Println("doSomething")
	time.Sleep(time.Second)
	return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
}

// Define provide definition of the workflow
func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
	f.SyncNode().Apply("test", doSomething).Apply("test", doSomething).Apply("test", doSomething).Apply("test", doSomething)
	return nil
}

func main() {
	os.Setenv("enable_tracing", "true")
	for i := 0; i < 10; i++ {
		go func() {
			time.Sleep(10*time.Second)
			fmt.Println("start!")
			fs := &goflow.FlowService{
				RedisURL: "127.0.0.1:6379",
			}
			fs.Execute("myflow", &goflow.Request{
				Body: []byte("hallo"),
			})
		}()
	}
	fs := &goflow.FlowService{
		Port:              8080,
		RedisURL:          "127.0.0.1:6379",
		OpenTraceUrl:      "127.0.0.1:5775",
		WorkerConcurrency: 5,
	}
	err := fs.Register("myflow", DefineWorkflow)
	if err != nil {
	    panic(err)
	}
	err = fs.Start()
	if err != nil {
		panic(err)
	}
	
	select {}
}

Maybe TraceHandler's operationSpans map is being written to concurrently.
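
If that is the cause, the usual remedy is to guard the map with a mutex. The sketch below shows the pattern with the standard library only; `spanStore` and its methods are hypothetical stand-ins, not the actual TraceHandler code, and the string values take the place of real span handles.

```go
package main

import (
	"fmt"
	"sync"
)

// spanStore guards a map with a mutex so that concurrent workers can record
// and remove spans safely.
type spanStore struct {
	mu    sync.Mutex
	spans map[string]string // operation id -> span label
}

func newSpanStore() *spanStore { return &spanStore{spans: make(map[string]string)} }

func (s *spanStore) start(id, label string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.spans[id] = label
}

// stop removes the span and reports whether it existed, so callers can skip
// finishing a span that was never started.
func (s *spanStore) stop(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, ok := s.spans[id]
	delete(s.spans, id)
	return ok
}

func (s *spanStore) size() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.spans)
}

// startConcurrently records n spans from n goroutines and returns how many
// ended up in the store.
func startConcurrently(n int) int {
	store := newSpanStore()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			store.start(fmt.Sprintf("op-%d", i), "test")
		}(i)
	}
	wg.Wait()
	return store.size()
}

func main() {
	fmt.Println(startConcurrently(100))
}
```

Without the lock, the same loop can trigger the runtime's "concurrent map writes" fatal error shown in the trace above.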

Http requests get dropped when trying to send multiple requests

How to reproduce

  1. Running the basic example from the README.
  2. Send a single request using curl -d hello http://localhost:8080/myflow - Everything works
  3. Send multiple requests using for i in {1..50}; do curl -d hello http://localhost:8080/myflow >> output.log & done

the following is happening:

2023/04/24 13:39:29 http: panic serving 127.0.0.1:51880: runtime error: invalid memory address or nil pointer dereference                                                                                                                     
goroutine 622 [running]:                                                                                                                                                                                                                      
net/http.(*conn).serve.func1()                                                                                         
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:1854 +0xbf
panic({0x765840, 0xaa66e0})                                
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/runtime/panic.go:890 +0x263
github.com/s8sg/goflow/eventhandler.(*TraceHandler).StopNodeSpan(...)                  
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/eventhandler/trace_handler.go:110
github.com/s8sg/goflow/eventhandler.(*GoFlowEventHandler).ReportNodeEnd(0x79c520?, {0xc00088c910?, 0x7f6d9e1e69a8?}, {0x7f6dc6d6e3c8?, 0xc000051400?})
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/eventhandler/goflow_event_handler.go:57 +0x3d
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).findNextNodeToExecute(0xc000177740)
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/sdk/executor/executor.go:627 +0x4f8
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).Execute(0xc000177740, 0xc0010bd720)
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/sdk/executor/executor.go:1265 +0x9d8
github.com/s8sg/goflow/core/runtime/controller/handler.ExecuteFlowHandler(0xc0011484b0, 0xc0007fc600, {0x885bb8, 0xc000902680})
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/runtime/controller/handler/execute_flow_handler.go:33 +0x265
github.com/s8sg/goflow/runtime.newRequestHandlerWrapper.func1({0x882080, 0xc000d382a0}, 0xc00114c000, {0xc000882d80, 0x1, 0xc000c30b70?})
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/runtime/new_request_handler_wrapper.go:52 +0x6ec
github.com/julienschmidt/httprouter.(*Router).ServeHTTP(0xc00010e1e0, {0x882080, 0xc000d382a0}, 0xc00114c000)
        /home/danash/go/pkg/mod/github.com/julienschmidt/[email protected]/router.go:387 +0x81c
net/http.serverHandler.ServeHTTP({0x880de8?}, {0x882080, 0xc000d382a0}, 0xc00114c000)  
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:2936 +0x316
net/http.(*conn).serve(0xc0002a8e10, {0x8822d8, 0xc0001181e0})              
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:1995 +0x612
created by net/http.(*Server).Serve                                                                                    
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:3089 +0x5ed
2023/04/24 13:39:29 Reporting span 338343586c9e519f:62cdaa3f404a5f84:6cad4d79616ee4e9:1
2023/04/24 13:39:29 Reporting span 338343586c9e519f:6cad4d79616ee4e9:338343586c9e519f:1
2023/04/24 13:39:29 Reporting span 338343586c9e519f:7818de4473a991b2:338343586c9e519f:1
2023/04/24 13:39:29 Reporting span 338343586c9e519f:354240b2e50c0ada:6cad4d79616ee4e9:1
2023/04/24 13:39:29 Reporting span 338343586c9e519f:6cad4d79616ee4e9:338343586c9e519f:1
2023/04/24 13:39:29 Reporting span 338343586c9e519f:6cad4d79616ee4e9:338343586c9e519f:1
2023/04/24 13:39:29 Executing flow myflow                                                                              
2023/04/24 13:39:29 Reporting span 16ba1e6c69d0d230:38cef6c7b9bc1643:16ba1e6c69d0d230:1
2023/04/24 13:39:29 Reporting span 16ba1e6c69d0d230:06ac2ac89bb22560:2cec2b7378dfba32:1
2023/04/24 13:39:29 http: panic serving 127.0.0.1:51896: [request `ch35ooasig6serk31o8g`] Pipeline is not active
goroutine 639 [running]:                                                                                               
net/http.(*conn).serve.func1()                                                                                         
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:1854 +0xbf
panic({0x747500, 0xc000292900})                                                                                        
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/runtime/panic.go:890 +0x263
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).executeNode(0xc0008f5740, {0xc0011c0200, 0x5, 0x200})
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/sdk/executor/executor.go:346 +0x645
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).Execute(0xc0008f5740, 0xc000a49720)
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/sdk/executor/executor.go:1257 +0x8cb
github.com/s8sg/goflow/core/runtime/controller/handler.ExecuteFlowHandler(0xc00047c2d0, 0xc000181c80, {0x885bb8, 0xc0009b8000})
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/core/runtime/controller/handler/execute_flow_handler.go:33 +0x265
github.com/s8sg/goflow/runtime.newRequestHandlerWrapper.func1({0x882080, 0xc0011bc0e0}, 0xc000454000, {0xc000cfe080, 0x1, 0xc000a49b70?})
        /home/danash/go/pkg/mod/github.com/s8sg/[email protected]/runtime/new_request_handler_wrapper.go:52 +0x6ec
github.com/julienschmidt/httprouter.(*Router).ServeHTTP(0xc00010e1e0, {0x882080, 0xc0011bc0e0}, 0xc000454000)
        /home/danash/go/pkg/mod/github.com/julienschmidt/[email protected]/router.go:387 +0x81c
net/http.serverHandler.ServeHTTP({0x880de8?}, {0x882080, 0xc0011bc0e0}, 0xc000454000)
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:2936 +0x316
net/http.(*conn).serve(0xc0003223f0, {0x8822d8, 0xc0001181e0})
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:1995 +0x612
created by net/http.(*Server).Serve
        /nix/store/i6a9j315dl95az9a9vjpgya90y6j9z66-go-1.20.2/share/go/src/net/http/server.go:3089 +0x5ed
2023/04/24 13:39:29 debug logging disabled
2023/04/24 13:39:29 Initializing logging reporter
2023/04/24 13:39:29 Reporting span 16ba1e6c69d0d230:2cec2b7378dfba32:16ba1e6c69d0d230:1
2023/04/24 13:39:29 Reporting span 16ba1e6c69d0d230:37ed544c17db1cb0:2cec2b7378dfba32:1
2023/04/24 13:39:29 Reporting span 16ba1e6c69d0d230:2cec2b7378dfba32:16ba1e6c69d0d230:1
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:1b7dd13c2c4c0da7:0000000000000000:1
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:027da292581982b2:0d43717ff3b76e9f:1
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:1b7dd13c2c4c0da7:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:0d43717ff3b76e9f:1b7dd13c2c4c0da7:1
2023/04/24 13:39:29 debug logging disabled
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:1b7dd13c2c4c0da7:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
2023/04/24 13:39:29 Reporting span 1b7dd13c2c4c0da7:1b7dd13c2c4c0da7:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:13e86d6139141d0a:79fe1508dadda560:1
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:79fe1508dadda560:5983b20c54d0ee71:1
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:5983b20c54d0ee71:0000000000000000:1
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:5983b20c54d0ee71:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:5983b20c54d0ee71:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored
2023/04/24 13:39:29 Reporting span 5983b20c54d0ee71:5983b20c54d0ee71:0000000000000000:1
2023/04/24 13:39:29 ERROR: Repeated attempt to close the reporter is ignored

By implementing #46 we might achieve more robust handling of requests.

The latest release does not match the README.md on GitHub, which causes a "does not contain package" error

Firstly, thank you for offering us a very useful tool to build an efficient workflow architecture.

However, when I followed the introduction in the README.md on your GitHub page, I ran into this problem:

github.com/s8sg/goflow/flow/v1: module github.com/s8sg/goflow@latest found (v0.1.0), but does not contain package github.com/s8sg/goflow/flow/v1
github.com/s8sg/goflow/v1: module github.com/s8sg/goflow@latest found (v0.1.0), but does not contain package github.com/s8sg/goflow/v1

In my Go project's imports, I wrote this:

import (
	goflow "github.com/s8sg/goflow/v1"
	flow "github.com/s8sg/goflow/flow/v1"
)

exactly as the README.md on your GitHub page shows.

So I went into my local Go path, where your source code was downloaded, to find out what happened. The source code I downloaded is v0.1.0, which dates from 2020 and does not match your latest README instructions. Apparently, the latest release on GitHub is outdated. Could you publish a new release that matches the latest README.md?

codahale/hdrhistogram repo URL has been transferred under the GitHub HdrHistogram umbrella

Problem

The codahale/hdrhistogram repo was transferred under the GitHub HdrHistogram umbrella, with help from the original author, in Sept 2020 (new repo URL: https://github.com/HdrHistogram/hdrhistogram-go). The main reasons are to group all implementations under the same roof and to enable more active contribution from the community, as the original repository was archived several years ago.

The dependency URL should be modified to point to the new repository URL. The tag "v0.9.0" was applied at the point of transfer and will reflect the exact code that was frozen in the original repository.

If you are using Go modules, you can update to the exact point of transfer using the @v0.9.0 tag in your go get command.

go mod edit -replace github.com/codahale/hdrhistogram=github.com/HdrHistogram/hdrhistogram-go@v0.9.0

Performance Improvements

From the point of transfer until now (Mon 16 Aug 2021), we've released 3 versions that aim to support the standard HdrHistogram serialization/exposition formats and substantially improve READ performance.
We recommend updating to the latest version.

[enhancement]: flow events

It would be awesome to expose flow events by letting users register event hooks with the flow library.

We would like to know when our flow's state changes: started, completed, failed, etc.

We would like to add missing features by contributing to this library if you accept it 🙏
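
To make the request concrete, here is a minimal sketch of what a hook registry could look like. Everything in it (`FlowEvent`, `Hook`, `HookRegistry`, `On`, `Emit`) is a hypothetical name chosen for illustration; goflow does not expose such an API as far as this issue describes, and the sketch is not safe for concurrent registration.

```go
package main

import "fmt"

// FlowEvent names a lifecycle transition of a flow (hypothetical type).
type FlowEvent string

const (
	FlowStarted   FlowEvent = "started"
	FlowCompleted FlowEvent = "completed"
	FlowFailed    FlowEvent = "failed"
)

// Hook is called with the flow name, the request ID, and the event that occurred.
type Hook func(flowName, requestID string, event FlowEvent)

// HookRegistry keeps the hooks registered for each event (hypothetical type).
type HookRegistry struct {
	hooks map[FlowEvent][]Hook
}

func NewHookRegistry() *HookRegistry {
	return &HookRegistry{hooks: make(map[FlowEvent][]Hook)}
}

// On registers a hook for one event type.
func (r *HookRegistry) On(event FlowEvent, h Hook) {
	r.hooks[event] = append(r.hooks[event], h)
}

// Emit calls every hook registered for the event, in registration order.
func (r *HookRegistry) Emit(flowName, requestID string, event FlowEvent) {
	for _, h := range r.hooks[event] {
		h(flowName, requestID, event)
	}
}

func main() {
	reg := NewHookRegistry()
	reg.On(FlowFailed, func(flowName, requestID string, event FlowEvent) {
		fmt.Printf("flow %s (request %s) is now %s\n", flowName, requestID, event)
	})
	reg.Emit("myflow", "request1", FlowStarted) // no hook registered for this event
	reg.Emit("myflow", "request1", FlowFailed)
}
```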

Allow scheduling a node as a Kubernetes job

When a DAG runs asynchronously we currently create a monolithic structure. In the distributed ETL use case it would be nice to have a flag that distributes the nodes of a graph as jobs, effectively producing a pipeline on top of the Redis queuing mechanism.
Suppose we have a DAG:

  1. A -> B -> C
  2. The user triggers the DAG from A.
  3. The triggering event schedules three job types, A, B, and C, to be executed in the Kubernetes cluster, where each job type has a cardinality (the number of pods associated with it).

This needs to be evaluated in a design meeting and follow-up discussion, and we should check what Kubeflow currently does.

Expose DAG validation in workflow layer

Hi there,

We are building an online system that lets users create and modify workflows, and we found it necessary to validate whether a DAG created by a user is valid. There is a simple way to implement Validate() in the workflow layer: #71

Thanks!
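
For readers wondering what such a validation involves, the following is a minimal, standalone sketch using Kahn's algorithm. It is not the implementation from #71; `validateDAG` and `ErrCycle` are hypothetical names, and the graph is represented as plain node names and edge pairs rather than goflow's own DAG types.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrCycle is returned when the edges form at least one cycle.
var ErrCycle = errors.New("graph contains a cycle")

// validateDAG checks that every edge refers to a known node and that the
// graph is acyclic, using Kahn's algorithm (repeatedly removing nodes
// that have no incoming edges).
func validateDAG(nodes []string, edges [][2]string) error {
	indegree := make(map[string]int, len(nodes))
	for _, n := range nodes {
		indegree[n] = 0
	}
	next := make(map[string][]string)
	for _, e := range edges {
		from, to := e[0], e[1]
		if _, ok := indegree[from]; !ok {
			return fmt.Errorf("edge references unknown node %q", from)
		}
		if _, ok := indegree[to]; !ok {
			return fmt.Errorf("edge references unknown node %q", to)
		}
		next[from] = append(next[from], to)
		indegree[to]++
	}

	// Start from every node without incoming edges.
	var queue []string
	for n, d := range indegree {
		if d == 0 {
			queue = append(queue, n)
		}
	}
	visited := 0
	for len(queue) > 0 {
		n := queue[0]
		queue = queue[1:]
		visited++
		for _, m := range next[n] {
			indegree[m]--
			if indegree[m] == 0 {
				queue = append(queue, m)
			}
		}
	}
	// Nodes on a cycle never reach indegree zero, so they are never visited.
	if visited != len(indegree) {
		return ErrCycle
	}
	return nil
}

func main() {
	nodes := []string{"A", "B", "C"}
	fmt.Println(validateDAG(nodes, [][2]string{{"A", "B"}, {"B", "C"}}))
	fmt.Println(validateDAG(nodes, [][2]string{{"A", "B"}, {"B", "A"}}))
}
```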

branch permissions.

jozoppi@DESKTOP-T9826C8:/projects/goflow$ git push --set-upstream origin sample_flows
remote: Permission to s8sg/goflow.git denied to giorgiozoppi.
fatal: unable to access 'https://github.com/s8sg/goflow/': The requested URL returned error: 403
jozoppi@DESKTOP-T9826C8:/projects/goflow$

[Question] How to select a specific Redis DB

Here is the FlowService struct:

type FlowService struct {
	Port                    int
	RedisURL                string
	RedisPassword           string
	RequestAuthSharedSecret string
	RequestAuthEnabled      bool
	WorkerConcurrency       int
	RetryCount              int
	Flows                   map[string]runtime.FlowDefinitionHandler
	RequestReadTimeout      time.Duration
	RequestWriteTimeout     time.Duration
	OpenTraceUrl            string
	DataStore               sdk.DataStore
	Logger                  sdk.Logger
	EnableMonitoring        bool
	DebugEnabled            bool

	runtime *runtime.FlowRuntime
}

I want to connect to a specific Redis DB, but FlowService doesn't have a field like RedisDB.

Is there any example to get started with goflow?

I tried to use your library, but I can't get the "Creating More Complex DAG" examples mentioned in the README.md to work.

I tried the following, but the functions are never executed:

package main

import (
	"fmt"
	"github.com/faasflow/goflow"
	flow "github.com/faasflow/lib/goflow"
	"log"
)

// Workload function
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
	log.Println(fmt.Sprintf("you said \"%s\"", string(data)))
	return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
}

// Workload function
func loadProfile(data []byte, option map[string][]string) ([]byte, error) {
	log.Println(fmt.Sprintf("load profile \"%s\"", string(data)))
	return []byte(fmt.Sprintf("load profile \"%s\"", string(data))), nil
}
// Workload function
func getPresignedURLForImage(data []byte, option map[string][]string) ([]byte, error) {
	log.Println(fmt.Sprintf("image url \"%s\"", string(data)))
	return []byte(fmt.Sprintf("image url \"%s\"", string(data))), nil
}
// Workload function
func detectFace(data []byte, option map[string][]string) ([]byte, error) {
	log.Println(fmt.Sprintf("detect face \"%s\"", string(data)))
	return []byte(fmt.Sprintf("detect face \"%s\"", string(data))), nil
}
// Workload function
func markProfileBasedOnStatus(data []byte, option map[string][]string) ([]byte, error) {
	log.Println(fmt.Sprintf("mask profile \"%s\"", string(data)))
	return []byte(fmt.Sprintf("mask profile \"%s\"", string(data))), nil
}

// Define provide definition of the workflow
func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
	dag := f.Dag()
	dag.Node("get-kyc-image").Apply("load-profile", loadProfile).
		Apply("get-image-url", getPresignedURLForImage).
		Apply("face-detect", detectFace).
		Apply("mark-profile", markProfileBasedOnStatus)
	dag.Edge("get-kyc-image", "face-detect")
	dag.Edge("face-detect", "mark-profile")
	return nil
}

func main() {
	fs := &goflow.FlowService{
		Port:                8080,
		RedisURL:            "localhost:6379",
		OpenTraceUrl:        "localhost:5775",
		WorkerConcurrency:   5,
	}
	fs.Start("myflow", DefineWorkflow)
}

Project Activity?

Hello, I am very very interested in incorporating goflow into one of my projects. I was just wondering if this repo is still active?

Server panics if "time.Sleep()" is used in a node func

package main

import (
	"fmt"
	"time"

	goflow "github.com/s8sg/goflow"
	flow "github.com/s8sg/goflow/flow"
)

// Workload function
func doSomething1(data []byte, option map[string][]string) ([]byte, error) {
	time.Sleep(10 * time.Second)
	fmt.Println(fmt.Sprintf("you said 1 %s", string(data)))
	return []byte(fmt.Sprintf("you said 1 \"%s\"", string(data))), nil
}

func doSomething2(data []byte, option map[string][]string) ([]byte, error) {
	time.Sleep(10 * time.Second)
	fmt.Println(fmt.Sprintf("you said 2 %s", string(data)))
	return []byte(fmt.Sprintf("you said 2 \"%s\"", string(data))), nil
}

func doSomething3(data []byte, option map[string][]string) ([]byte, error) {
	time.Sleep(10 * time.Second)
	fmt.Println(fmt.Sprintf("you said 3 %s", string(data)))
	return []byte(fmt.Sprintf("you said 3 \"%s\"", string(data))), nil
}

// Define provide definition of the workflow
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
	dag := workflow.Dag()
	dag.Node("test1").Apply("test1", doSomething1)
	dag.Node("test2").Apply("test2", doSomething2)
	dag.Node("test3").Apply("test3", doSomething3)
	dag.Edge("test1", "test2")
	dag.Edge("test2", "test3")
	return nil
}

func main() {
	fs := &goflow.FlowService{
		Port:              8080,
		RedisURL:          "localhost:6379",
		OpenTraceUrl:      "localhost:5775",
		WorkerConcurrency: 5,
	}
	fs.Register("myflow", DefineWorkflow)
	fs.StartWorker()
	err := fs.Execute("myflow", &goflow.Request{
		RequestId: "request1",
		Body:      []byte{'a', 'a', 'a'},
	})
	fmt.Println(err)
	err = fs.Execute("myflow", &goflow.Request{
		RequestId: "request2",
		Body:      []byte{'b', 'b', 'b'},
	})
	fmt.Println(err)
}

[GoFlow-4] Allow GoFlow workload to be completely independent to the flow

Currently Flow and Workload are tightly coupled, because workloads are defined inside a flow definition.
This has a few drawbacks:

  1. You can't scale only a specific workload
  2. Changing a workload will touch the whole flow definition
  3. Workload can't be shared across other flows

To solve this issue, GoFlow will support a special kind of operation called ApplyExternal(), similar to Apply().
It will take the unique ID of the workload that needs to be called:

Node('n2').ApplyExternal('task1')

To define a workload, the user needs to use the GoFlow library and register the workload under its unique ID:

fs.Register('task1', PerformTask1)
fs.StartWorkload(['myflow']) // start workload will take the list of flows

The workload will have the same signature:

func PerformTask1(data []byte, option map[string][]string) ([]byte, error) {
  ...
}

[Feature] Dynamically update workflows

Hi @s8sg ,

We are using goflow to build an online system. The main use case is to orchestrate different tasks together like workflows.
For example, the user initially has a workflow like

Node-A -> Node-B

But the user now wants to change it to

Node-A -> Node-C -> Node-B

Currently, however, after registering workflows, the DAG cannot be changed.

(Of course, we can restart the flow service to force a reload.)

So I am trying to contribute this in PR #64.

Installation problems

After installing goflow as explained in the guide, I have the following problem:

$ go build -o goflow
flow.go:6:2: no required module provides package github.com/s8sg/goflow/flow/v1; to add it:
        go get github.com/s8sg/goflow/flow/v1
flow.go:7:2: no required module provides package github.com/s8sg/goflow/v1; to add it:
        go get github.com/s8sg/goflow/v1

I tried to solve it with the suggested commands, but that does not work.

$ go get github.com/s8sg/goflow/flow/v1
go: module github.com/s8sg/goflow@upgrade found (v0.1.0), but does not contain package github.com/s8sg/goflow/flow/v1

Sorry in advance if this is something silly, I am new to learning golang.

[Discussion] Workflow rollback support

In my case, I want to use goflow to create resources on a public cloud, for example a cloud VM with a public IP:

  1. Create a VM using the cloud OpenAPI
  2. Wait until the status of the VM is running
  3. Create a public IP using the cloud OpenAPI
  4. Wait until the status of the public IP is available
  5. Bind the public IP to the VM
  6. Other steps

I want:

  1. If I cancel the workflow (e.g. by running flow.cancel()), the workflow frees the created resources (VM, public IP, etc.)
  2. If one step fails, the workflow frees the created resources (VM, public IP, etc.)
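
The behaviour described above is often implemented as a compensation stack: each step carries an undo action, and on failure the undo actions of the steps that already completed run in reverse order. The sketch below illustrates that pattern in plain Go. It is not a goflow feature; `step`, `runWithRollback`, and the step names are hypothetical, and the cloud calls are replaced by print statements.

```go
package main

import (
	"errors"
	"fmt"
)

// errBindFailed simulates a failing cloud call in the example.
var errBindFailed = errors.New("bind failed")

// step pairs an action with the compensation that releases what it created.
type step struct {
	name string
	do   func() error
	undo func() error
}

// runWithRollback runs the steps in order. If one fails, it runs the undo
// functions of the steps that already completed, in reverse order, and
// returns the names of the steps it undid together with the original error.
func runWithRollback(steps []step) ([]string, error) {
	var done []step
	for _, s := range steps {
		if err := s.do(); err != nil {
			var undone []string
			for i := len(done) - 1; i >= 0; i-- {
				if uerr := done[i].undo(); uerr != nil {
					fmt.Printf("rollback of %s failed: %v\n", done[i].name, uerr)
					continue
				}
				undone = append(undone, done[i].name)
			}
			return undone, fmt.Errorf("step %s failed: %w", s.name, err)
		}
		done = append(done, s)
	}
	return nil, nil
}

func main() {
	steps := []step{
		{"create-vm",
			func() error { fmt.Println("vm created"); return nil },
			func() error { fmt.Println("vm deleted"); return nil }},
		{"create-public-ip",
			func() error { fmt.Println("ip created"); return nil },
			func() error { fmt.Println("ip released"); return nil }},
		{"bind-ip",
			func() error { return errBindFailed },
			func() error { return nil }},
	}
	undone, err := runWithRollback(steps)
	fmt.Println("undone:", undone, "error:", err)
}
```

Cancellation could reuse the same reverse walk over the completed steps; how that would hook into goflow's executor is left open here.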

Allow redis to be configured with password

package main

import (
	"fmt"

	flow "github.com/s8sg/goflow/flow/v1"
	goflow "github.com/s8sg/goflow/v1"
)

// Workload function
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
	fmt.Printf("hello world\n")
	return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
}

// Define provide definition of the workflow
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
	dag := workflow.Dag()
	dag.Node("test", doSomething)
	return nil
}

func main() {
	fs := &goflow.FlowService{
		Port:              8080,
		RedisURL:          "192.168.1.65:16379", // need redis password 123
		OpenTraceUrl:      "192.168.1.65:5775",
		WorkerConcurrency: 5,
		EnableMonitoring:  true,
	}

	fs.Register("myflow", DefineWorkflow)
	fs.Start()
}

[Question] How to restart a flow from where it stopped

I have a flow with multiple nodes: 1 -> 2 -> 3 -> 4. When this flow is executed, Node 1 done, Node 2 done, Node 3 runs for a long time but eventually forces a stop due to the application shutting down. However, when I restart the application, I want the flow to restart from Node 3, but it does not. How can I achieve this?
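
One general way to get this behaviour is to checkpoint the index of the next node after each node completes, and to start from that index on the next run. The sketch below shows the idea with an in-memory map standing in for a persistent store such as Redis. `checkpointStore` and `runFrom` are hypothetical names, and this is not how goflow currently tracks execution state.

```go
package main

import (
	"errors"
	"fmt"
)

// errInterrupted simulates the application shutting down mid-flow.
var errInterrupted = errors.New("interrupted")

// checkpointStore maps a request ID to the index of the next node to run.
// A real implementation would persist this outside the process.
type checkpointStore map[string]int

// runFrom executes nodes starting at the saved checkpoint for requestID and
// advances the checkpoint after each node that completes successfully.
func runFrom(store checkpointStore, requestID string, nodes []func() error) error {
	for i := store[requestID]; i < len(nodes); i++ {
		if err := nodes[i](); err != nil {
			return err
		}
		store[requestID] = i + 1
	}
	return nil
}

func main() {
	store := checkpointStore{}
	shuttingDown := true
	nodes := []func() error{
		func() error { fmt.Println("node 1"); return nil },
		func() error { fmt.Println("node 2"); return nil },
		func() error {
			if shuttingDown {
				return errInterrupted
			}
			fmt.Println("node 3")
			return nil
		},
		func() error { fmt.Println("node 4"); return nil },
	}
	fmt.Println("first run:", runFrom(store, "request1", nodes))
	shuttingDown = false
	fmt.Println("second run:", runFrom(store, "request1", nodes))
}
```

Note that the interrupted node runs again from its beginning, so node functions would need to be idempotent for this approach to be safe.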

[Feature] Add support in server to stop, pause, resume and list flow

Currently the HTTP server only supports executing a flow, with

POST /`flow-name`
GET /`flow-name`

The proposal is to add support for stopping, pausing, resuming, and listing flows.
The APIs would be

Execute

Execute executes the flow; we will move the path under the /flow prefix

POST /flow/`flow-name` 
GET /flow/`flow-name`

Stop

Stop stops the flow

POST /flow/`flow-name`/stop 

Pause

Pause pauses execution of the flow

POST /flow/`flow-name`/pause

Resume

Resume resumes the execution of the flow

POST /flow/`flow-name`/resume

List

List lists all the registered flows

POST /flow/list
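
As a rough illustration of the proposed routes, the sketch below maps a method and path to a flow name and an action. The `route` function and the action strings are hypothetical and only mirror the paths listed above; this is not goflow's router.

```go
package main

import (
	"fmt"
	"strings"
)

// route maps an HTTP method and path to a flow name and an action,
// following the proposed /flow/... layout. ok is false for anything else.
func route(method, path string) (flow, action string, ok bool) {
	if !strings.HasPrefix(path, "/flow/") {
		return "", "", false
	}
	parts := strings.Split(strings.Trim(strings.TrimPrefix(path, "/flow/"), "/"), "/")
	switch {
	case len(parts) == 1 && parts[0] == "list" && method == "POST":
		// Checked first, so POST can never execute a flow that is named "list".
		return "", "list", true
	case len(parts) == 1 && parts[0] != "" && (method == "POST" || method == "GET"):
		return parts[0], "execute", true
	case len(parts) == 2 && method == "POST":
		switch parts[1] {
		case "stop", "pause", "resume":
			return parts[0], parts[1], true
		}
	}
	return "", "", false
}

func main() {
	for _, req := range [][2]string{
		{"POST", "/flow/myflow"},
		{"POST", "/flow/myflow/pause"},
		{"POST", "/flow/list"},
		{"GET", "/flow/myflow/stop"},
	} {
		flow, action, ok := route(req[0], req[1])
		fmt.Printf("%s %s -> flow=%q action=%q ok=%v\n", req[0], req[1], flow, action, ok)
	}
}
```

One design point the sketch surfaces: with this layout, `POST /flow/list` shares its shape with `POST /flow/<flow-name>`, so a flow named "list" could not be executed via POST unless listing moves to a different path or method.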

[enhancement]: support route params and adding middlewares

We would like to add custom middleware to the internal HTTP server. Also, if we could pass query parameters to the endpoints, we could create more generalized flows.

The goflow library could expose its internal server and handlers with sane defaults, so that users can override them.
