gammazero / workerpool
Concurrency limiting goroutine pool
License: MIT License
Running the example from the README with Go 1.11 on macOS 10.13.6, I see that in many runs it misses many of the requests. Why is that?
$ ./foo
Handling request: beta
Handling request: alpha
$ ./foo
Handling request: beta
Handling request: alpha
$ ./foo
Handling request: beta
Handling request: gamma
Handling request: delta
Handling request: epsilon
Handling request: alpha
$ ./foo
Handling request: beta
Handling request: alpha
$ ./foo
Handling request: beta
Handling request: alpha
$ ./foo
Handling request: beta
Handling request: alpha
Are there any plans to publish a release with the new Pause method?
I use this package in my application and I observe that memory consumption increases over time.
I added goleak to search for leaks in one of the existing tests and found the following:
Goroutine 6 in state select, with github.com/gammazero/workerpool.(*WorkerPool).dispatch on top of the stack:
goroutine 6 [select]:
github.com/gammazero/workerpool.(*WorkerPool).dispatch(0x14000108080)
/Users/myuser/ws/workerpool/workerpool.go:184 +0x180
created by github.com/gammazero/workerpool.New
/Users/myuser/ws/workerpool/workerpool.go:37 +0x108
Got this error:
../go/pkg/mod/github.com/gammazero/[email protected]/workerpool.go:50:15: cannot use generic type deque.Deque[T any] without instantiation
FYI
I noticed that the following tests are flaky (they fail intermittently):
TestWaitingQueueSizeRace
TestStopRace
# fails when count > ~30
go test -race -run ^TestWaitingQueueSizeRace$ -count=100
# this fails sometimes (run it several times back to back)
go test -race -run ^TestWaitingQueueSizeRace$ -count=30
# without `-race` you need a high count, and run over and over
go test -run ^TestStopRace$ -count=2000
# this fails every time (I have yet to get it to pass)
go test -race -run ^TestStopRace$ -count=100
# this passes sometimes
go test -race -run ^TestStopRace$ -count=40
Output:
MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
PASS
ok github.com/gammazero/workerpool 0.180s
MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
PASS
ok github.com/gammazero/workerpool 0.177s
MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
--- FAIL: TestStopRace (0.00s)
workerpool_test.go:331: Stop should not return in any goroutine
FAIL
exit status 1
FAIL github.com/gammazero/workerpool 0.172s
MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
PASS
ok github.com/gammazero/workerpool 0.168s
MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
--- FAIL: TestStopRace (0.00s)
workerpool_test.go:331: Stop should not return in any goroutine
--- FAIL: TestStopRace (0.00s)
workerpool_test.go:331: Stop should not return in any goroutine
FAIL
exit status 1
FAIL github.com/gammazero/workerpool 0.169s
Hello, first of all nice package!
I have a question regarding usage: I'm using RabbitMQ as a task queue, and I would like to run multiple workers that fetch from that task queue.
What I would like to achieve is to ack a message (and pull a new one) whenever a worker finishes its task and is ready for more.
What I learned from the code is that Submit() builds up a queue in memory, which is exactly the part I want RabbitMQ to handle, with persisted messages.
Now, when using SubmitWait(), it seems that the whole pool stops and waits. Is this intended?
I expected only the call for the currently running task to block, while all the other workers keep running (and pull messages when free).
I tried a pool of 8 workers, but with SubmitWait() everything still runs serially.
Thanks for the great project! It's really useful. I have a suggestion: you might want to start versioning this package. This makes it easier for other Go projects to pin their dependencies explicitly, and it provides more information about changes to the code.
Submit() sends the task to taskQueue:
func (p *WorkerPool) Submit(task func()) {
	if task != nil {
		p.taskQueue <- task
	}
}
https://github.com/gammazero/workerpool/blob/master/workerpool.go#L109
stop() closes the taskQueue:
func (p *WorkerPool) stop(wait bool) {
	p.stopOnce.Do(func() {
		// Signal that workerpool is stopping, to unpause any paused workers.
		close(p.stopSignal)
		// Acquire stopLock to wait for any pause in progress to complete. All
		// in-progress pauses will complete because the stopSignal unpauses the
		// workers.
		p.stopLock.Lock()
		// The stopped flag prevents any additional paused workers. This makes
		// it safe to close the taskQueue.
		p.stopped = true
		p.stopLock.Unlock()
		p.wait = wait
		// Close task queue and wait for currently running tasks to finish.
		close(p.taskQueue)
	})
	<-p.stoppedChan
}
These two can run in separate goroutines, so a Submit that races with stop() can panic with "send on closed channel".
We really want the ability to change maxWorkers dynamically. Are there any plans to support this? I tried to implement a SetMaxWorkers method for WorkerPool, but found that the implementation of Pause relies on a static maxWorkers value, which complicates things.
Putting this here for visibility...
An example of how to record each job's execution duration and run it with an auto-cancelling timeout.
I understand it is not mint code, so please feel free to suggest changes, or supply your own improved version.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gammazero/workerpool"
)

// JobResult holds a job's result
type JobResult struct {
	// Public
	Data interface{}
	// Private
	err     error
	runtime time.Duration
	name    string
}

// Name returns name. It is written like this so the consumer
// cannot change the name outside of supplying one via the Job
func (jr *JobResult) Name() string {
	return jr.name
}

// Runtime returns job execution runtime
func (jr *JobResult) Runtime() time.Duration {
	return jr.runtime
}

// Error holds job errors, if any
func (jr *JobResult) Error() error {
	return jr.err
}

// SetError sets an error on our result
func (jr *JobResult) SetError(e error) {
	jr.err = e
}

// Job holds job data
type Job struct {
	Name string
	Task func() JobResult
}
func wrapJob(timeout time.Duration, resultsChan chan JobResult, job Job) func() {
	return func() {
		// Create the timeout context when a worker starts the job, so time
		// spent waiting in the pool's queue does not count against it.
		timeoutContext, timeoutCancel := context.WithTimeout(context.Background(), timeout)
		defer timeoutCancel()
		timerStart := time.Now()
		// Buffered so the job goroutine can always deliver its result and
		// exit, even after we have stopped waiting for it.
		jobDone := make(chan JobResult, 1)
		// We cannot cancel an in-flight task (an HTTP request, or simply
		// `time.Sleep(time.Hour*999999)`), so the job runs in its own
		// goroutine and a late result is just ignored.
		go func() {
			result := job.Task()
			// Set name & execution time after job completion
			result.runtime = time.Since(timerStart)
			result.name = job.Name
			jobDone <- result
		}()
		// Exactly one result is sent per job: the job's own result or a
		// timeout error, whichever comes first.
		select {
		case result := <-jobDone:
			resultsChan <- result
		case <-timeoutContext.Done():
			// Place a result on our results channel that contains
			// an error, which we can check for later.
			resultsChan <- JobResult{
				err:     timeoutContext.Err(), // context.DeadlineExceeded
				name:    job.Name,
				runtime: time.Since(timerStart),
			}
		}
	}
}
var jobs = []Job{{
	Name: "job1",
	Task: func() JobResult {
		// THIS JOB WILL ERROR ON PURPOSE
		// This will exceed our timeout, so it is reported as a timeout error
		// ...you can do whatever you want in these jobs
		time.Sleep(time.Second * 3)
		// Don't have to set the name here
		return JobResult{Data: map[string]string{"Whatever": "You want"}}
	}}, {
	Name: "job2",
	Task: func() JobResult {
		// THIS JOB WILL SUCCEED
		time.Sleep(time.Millisecond * 300)
		resultFromCurl := "i am a result"
		return JobResult{Data: resultFromCurl}
	}},
}

func main() {
	// Set timeout here (or per job)
	jobTimeout := time.Second // 1 second
	// Create results channel with T type where T is whatever type you need
	jobResultsChannel := make(chan JobResult, len(jobs))
	// Create workerpool
	numWorkers := 10
	pool := workerpool.New(numWorkers)
	// Submit jobs to workerpool using our wrapper func
	for _, job := range jobs {
		pool.Submit(wrapJob(jobTimeout, jobResultsChannel, job))
	}
	// Wait for jobs to finish and close results channel
	pool.StopWait()
	close(jobResultsChannel)
	// Do whatever you want with results
	for jobResult := range jobResultsChannel {
		runTime := int(jobResult.Runtime() / time.Millisecond)
		str := "[%dms] : '%s' : JobSuccess : %s\n"
		data := jobResult.Data
		if jobResult.Error() != nil { // You should always check for errors
			str = "[%dms] : '%s' : JobError : %s\n"
			data = jobResult.Error()
		}
		fmt.Printf(str, runTime, jobResult.Name(), data)
	}
}

//// Output:
// [303ms] : 'job2' : JobSuccess : i am a result
// [1001ms] : 'job1' : JobError : context deadline exceeded
Concurrent puts to the same DynamoDB resource fail in aws-sdk-go, so I was thinking that if I could route put calls to specific workers (one worker per table, and there can be a lot of tables), I would be able to solve this problem.
Hello dear developer, I'm looking for a way to pause a workerpool by pressing a key ("P", for example) and resume work with the same key. I also want to stop the workerpool completely with another key (maybe "S"). My code is structured like your example.
Is there a way to stop jobs in mid-execution?
I switched workerpoolxt to use context, but the job still runs even though the context has been cancelled... I'm not really sure how to fix this, or if it is even possible.
I have created a POC that reproduces this issue (code below), which you can also view/run on the Go Playground.
Any help would be greatly appreciated!!
0 from job a
1 from job a
Job 'a' should have stopped here
2 from job a
3 from job a
4 from job a
[{a context deadline exceeded <nil>} {b <nil> from b}]
0 from job a
1 from job a
Job 'a' should have stopped here
[{a context deadline exceeded <nil>} {b <nil> from b}]
POC Code:
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gammazero/workerpool"
)

func main() {
	runner := newRunner(context.Background(), 10)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()
	runner.do(job{
		Name:    "a",
		Context: ctx,
		Task: func() jobResult {
			for i := 0; i < 10000; i++ {
				time.Sleep(time.Second * 1)
				fmt.Println(i, "from job a")
			}
			return jobResult{Data: "from a"}
		},
	})
	runner.do(job{
		Name: "b",
		Task: func() jobResult {
			time.Sleep(time.Second * 6)
			return jobResult{Data: "from b"}
		},
	})
	results := runner.getjobResults()
	fmt.Println(results)
}

type runner struct {
	*workerpool.WorkerPool
	defaultCtx context.Context
	kill       chan struct{}
	result     chan jobResult
	results    []jobResult
}

// processResults collects results until r.result is closed, then waits for
// getjobResults to signal on r.kill.
func (r *runner) processResults() {
	for res := range r.result {
		r.results = append(r.results, res)
	}
	<-r.kill
}

func newRunner(ctx context.Context, numRunners int) *runner {
	r := &runner{
		WorkerPool: workerpool.New(numRunners),
		kill:       make(chan struct{}),
		result:     make(chan jobResult),
		defaultCtx: ctx,
	}
	go r.processResults()
	return r
}

func (r *runner) do(j job) {
	r.Submit(r.wrap(&j))
}

func (r *runner) getjobResults() []jobResult {
	r.StopWait()
	close(r.result)
	r.kill <- struct{}{}
	return r.results
}

func (r *runner) wrap(job *job) func() {
	return func() {
		if job.Context == nil {
			job.Context = r.defaultCtx
		}
		job.childContext, job.done = context.WithCancel(job.Context)
		// Buffered so Run can still deliver its result (and exit) after
		// getResult has given up on the job.
		job.result = make(chan jobResult, 1)
		go job.Run()
		r.result <- job.getResult()
	}
}

type job struct {
	Name         string
	Task         func() jobResult
	Context      context.Context
	result       chan jobResult
	childContext context.Context
	done         context.CancelFunc
}

func (j *job) Run() {
	result := j.Task()
	result.name = j.Name
	j.result <- result
	j.done()
}

// getResult returns the job's result, or an error as soon as its context
// ends. It cannot stop Task itself, which keeps running in the background.
func (j *job) getResult() jobResult {
	select {
	case r := <-j.result:
		return r
	case <-j.childContext.Done():
		fmt.Printf("Job '%s' should have stopped here\n", j.Name)
		return jobResult{name: j.Name, Error: j.childContext.Err()}
	}
}

type jobResult struct {
	name  string
	Error error
	Data  interface{}
}
Hi,
I want to run multiple requests and get one response (a struct or similar) once they are all done.
For example, if I have 10 requests and the longest takes 10 seconds, I want to get all the responses after 10 seconds, each with an ID or similar so I know which succeeded and which failed. Is that possible?
thanks
Hi,
I need a blocking Submit that waits until the queue of tasks is small enough.
I imagine something like SubmitBlock(ctx context.Context, minQueueDepth uint, f func()) that waits for the queue to be small enough, inserts the func, and immediately returns.
Would you be interested in adding something like that, or in accepting a PR?
Hi there,
I'm using your library in every project I have, but there is a line in your example that I don't understand. I'm not a Go expert, so maybe it's something very basic; forgive me if it's a silly question.
for _, r := range requests {
	r := r
	wp.Submit(func() {
		fmt.Println("Handling request:", r)
	})
}
Why are we redefining r there with r := r? We already have the value of r. The code doesn't work without that line, and it has become a mystery to me.
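Before Go 1.22, a range loop reused one variable r for every iteration, and the closure passed to Submit captures that variable rather than its current value; by the time a worker runs the closure, the loop may already have moved r on. `r := r` declares a new variable per iteration that each closure captures instead. Since Go 1.22 (for modules declaring go 1.22 or later) each iteration gets its own variable, so the line is redundant but harmless. A minimal stdlib sketch of the pattern, using plain goroutines in place of wp.Submit; collect is a hypothetical name.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collect launches one goroutine per item, mirroring wp.Submit in the README,
// and returns what each goroutine saw, sorted.
func collect(items []string) []string {
	var (
		mu  sync.Mutex
		out []string
		wg  sync.WaitGroup
	)
	for _, r := range items {
		r := r // per-iteration copy captured by this closure only
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			out = append(out, r)
			mu.Unlock()
		}()
	}
	wg.Wait()
	sort.Strings(out)
	return out
}

func main() {
	fmt.Println(collect([]string{"alpha", "beta", "gamma"}))
}
```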
Currently, the idle timeout is hardcoded, but I have a use case where it would be useful to set it higher. Any chance you would accept a PR for this @gammazero?
Hello!
Why are you using startWorker() and not worker()?
Lines 192 to 200 in 85cc841
Do you have any examples of how I can pass a DB connection to a worker, rather than to a specific task?
The main idea: start some workers with one connection per worker, and run SQL requests (tasks) using that worker's connection.
Ubuntu 20.04 LTS
go version go1.19.3 linux/amd64
github.com/gammazero/workerpool v1.1.3
package main

import (
	"fmt"

	"github.com/gammazero/workerpool"
)

type JobAccount struct {
	Id       int
	Username string
}

func main() {
	list := []string{
		"demo1", "demo2", "demo3",
	}
	var accounts []*JobAccount
	for k, v := range list {
		accounts = append(accounts, &JobAccount{
			Id:       k,
			Username: v,
		})
	}
	wp := workerpool.New(2)
	for _, account := range accounts {
		fmt.Printf("loop %+v\n", account.Username)
		wp.Submit(func() {
			fmt.Printf("go rountine %+v\n", account.Username)
		})
	}
	wp.StopWait()
}
Here is the actual output:
loop demo1
go rountine demo1
loop demo2
loop demo3
go rountine demo3
go rountine demo3
I was expecting:
loop demo1
go rountine demo1
loop demo2
loop demo3
go rountine demo2
go rountine demo3
How would I track progress, i.e. which jobs are done and what percentage of all jobs is done? For example, displaying completed, queued, and waiting-to-be-queued jobs.
Good day,
The https://codecov.io/bash uploader was recently affected by a security issue. Details can be found at https://about.codecov.io/security-update/
The following file and code are impacted. Please follow the recommended actions in the link above to secure your environment and code integrity.
file: .travis.yml
Code:
after_success:
bash <(curl -s https://codecov.io/bash)
I was looking at your worker pool implementation in Go and I really like the way you approached it, in that you can submit at any time and the task is added to the queue. Most implementations seem to operate on a set of tasks known a priori.
Secondly, it doesn't delay the submission channel, and this is very important because in my case, if I lag even a few milliseconds (or tens of milliseconds) behind the event firing, I will miss the next event. So far so good.
However, what I am struggling to figure out is where to put a delay so that I can set the time between task executions without interfering with the incoming events. The delay is especially needed for the first tasks added to the pool, because they all get fired off simultaneously. I need at least some spacing so the web server can breathe a bit before I lay siege to it. ;)
Any suggestions on where I can add this delay?
Can this thread pool run in the background?
Hi Gammazero,
I'm using your great workerpool, and I have a problem. I use the Submit() function to add jobs that are processed by the workers, but these jobs need to perform database tasks. When I submit my worker function with db.Open and db.Close inside, I end up with too many connections.
My question is: is it possible to let the workers themselves open and close the connections, so that only a limited number of connections is ever open? That would be awesome.
Thanks in advance
Adrian
I stumbled upon this lib and it looks good, but I think the Stop() method could be named better.
When I read the example code, I had the impression that Stop() would wait for the currently running tasks to complete, clear the task queue, and quit. But in reality it seems to wait until all tasks are completed.
Shouldn't it be named something like Close() or CloseAndWait()?
It might be even better to have two methods: Close() to signal there are no more tasks, and Wait() to wait for the tasks to complete.
First, many thanks for your workerpool module; it is exactly what I've been looking for!
In my projects I tend to use several worker pools and I use them for transient work only, so these are not worker pools kept around all the time a service is up. Instead, I create worker pools as needed for individual requests (and there are some reasons for not sharing certain workers across different service requests).
When unit testing my service handlers for goroutine leaks (using Gomega's gleak goroutine leak tester, still brand new at the time of writing), I noticed that it would be quite helpful for diagnosis if workerpools and their workers could be individually identified by a name. A worker's name could simply be that of its pool. Would you be interested in a PR?
In a use case where I'd like to use this workerpool, I need more granular control over the queue (e.g. setting priorities for tasks, which may change repeatedly while a task is waiting in the queue).
I can implement the queueing logic myself, outside of this library, but I don't see how I could prevent the library from queueing the tasks internally.
The only way seems to be checking whether the queue is non-empty and holding back new tasks, e.g.:
if wp.WaitingQueueSize() > 0 {
	// custom queue logic
} else {
	wp.Submit(task)
}
That would, however, still potentially leave one task stuck in the pool's queue, whose priority may change before it is actually executed, delaying higher-priority tasks.
Would there be interest in adding a method to check the number of active workers?
Just as we have a StopWait() API that
a. submits all tasks in the queue to the worker goroutines,
b. waits until all of the existing and newly submitted tasks have run to completion,
and then shuts down the worker pool, I wonder whether others have requested a Flush()/FlushWait() API that does the same as Stop()/StopWait() but doesn't shut down the pool, instead letting the worker goroutines keep running and processing any tasks submitted after the call to Flush()/FlushWait().
The idea is that it may be useful to ensure that all currently executing and queued tasks in a workerpool have completed, reaching a known "sync" state, and then continue running normally.
Something like this:
package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"github.com/gammazero/workerpool"
)

// isPaused is read and written from several goroutines, so it is accessed
// atomically to avoid a data race.
var isPaused int32 = 1

func main() {
	go func() {
		time.Sleep(time.Second * 10)
		atomic.StoreInt32(&isPaused, 0)
	}()
	wp := workerpool.New(2)
	requests := []string{"alpha", "beta", "gamma", "delta", "epsilon"}
	for _, r := range requests {
		r := r
		wp.Submit(func() {
			pause()
			fmt.Println("Handling request:", r)
		})
	}
	wp.StopWait()
}

// pause polls until isPaused is cleared.
func pause() {
	for atomic.LoadInt32(&isPaused) == 1 {
		time.Sleep(time.Millisecond * 100)
	}
}
How do you surface errors in your tasks?