workerpool's Issues

Losing tasks?

Running the example from the README with Go 1.11 on macOS 10.13.6, I see that most runs miss many of the requests. Why?

$ ./foo
Handling request: beta
Handling request: alpha
$ ./foo
Handling request: beta
Handling request: alpha
$ ./foo
Handling request: beta
Handling request: gamma
Handling request: delta
Handling request: epsilon
Handling request: alpha
$ ./foo
Handling request: beta
Handling request: alpha
$ ./foo
Handling request: beta
Handling request: alpha
$ ./foo
Handling request: beta
Handling request: alpha

Possible memory leak

I use this package in my application, and I observe that memory consumption increases over time.
I added goleak to one of the existing tests to search for leaks and found the following:

Goroutine 6 in state select, with github.com/gammazero/workerpool.(*WorkerPool).dispatch on top of the stack:
        goroutine 6 [select]:
        github.com/gammazero/workerpool.(*WorkerPool).dispatch(0x14000108080)
        	/Users/myuser/ws/workerpool/workerpool.go:184 +0x180
        created by github.com/gammazero/workerpool.New
        	/Users/myuser/ws/workerpool/workerpool.go:37 +0x108
        ]

Race condition testing

FYI

I noticed that the following tests are inconsistent or broken:

  • TestWaitingQueueSizeRace
  • TestStopRace
# fails when count > ~30
go test -race -run ^TestWaitingQueueSizeRace$ -count=100

# this fails sometimes (run back to back to back, etc..)
go test -race -run ^TestWaitingQueueSizeRace$ -count=30

# without `-race` you need a high count, and run over and over
go test -run ^TestStopRace$ -count=2000

# this fails every time (I have yet to get it to pass)
go test -race -run ^TestStopRace$ -count=100

# this passes sometimes
go test -race -run ^TestStopRace$ -count=40

Output:

  • Example of how you need to run over and over to reveal inconsistencies
MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
PASS
ok      github.com/gammazero/workerpool 0.180s
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
PASS
ok      github.com/gammazero/workerpool 0.177s
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
--- FAIL: TestStopRace (0.00s)
    workerpool_test.go:331: Stop should not return in any goroutine
FAIL
exit status 1
FAIL    github.com/gammazero/workerpool 0.172s
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
PASS
ok      github.com/gammazero/workerpool 0.168s
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
--- FAIL: TestStopRace (0.00s)
    workerpool_test.go:331: Stop should not return in any goroutine
--- FAIL: TestStopRace (0.00s)
    workerpool_test.go:331: Stop should not return in any goroutine
FAIL
exit status 1
FAIL    github.com/gammazero/workerpool 0.169s
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$
MY-COMPUTER:workerpool user$

SubmitWait on a worker level

Hello, first of all nice package!

I have a question regarding usage: I'm using RabbitMQ as a task queue, and I would like to run multiple workers that fetch from that task queue.

What I would like to achieve is to ack a message (and pull a new one) as soon as a single worker has finished its task and is ready for the next.

What I learned from the code is that when using Submit() you are building up a queue in memory - this is where I want to use RabbitMQ for persisted messages.

Now when using SubmitWait() it seems that the whole pool is stopping and waiting. Is this intended?

I expected that the worker that is currently running the task is blocking the function call, but all other workers would keep running (and pulling messages when free).

I tried a pool of 8 workers, but when using SubmitWait() everything still gets triggered in a serial fashion.

Start versioning project

Thanks for the great project! It's really useful. I have a suggestion: you might want to start versioning this package. That makes it easier to declare dependencies explicitly in other Go projects, and it provides more information about changes to the code.

Race between Submit and Stop

Submit() tries to send a task to taskQueue:

func (p *WorkerPool) Submit(task func()) {
	if task != nil {
		p.taskQueue <- task
	}
}

https://github.com/gammazero/workerpool/blob/master/workerpool.go#L109

stop() closes the taskQueue.

func (p *WorkerPool) stop(wait bool) {
	p.stopOnce.Do(func() {
		// Signal that workerpool is stopping, to unpause any paused workers.
		close(p.stopSignal)
		// Acquire stopLock to wait for any pause in progress to complete. All
		// in-progress pauses will complete because the stopSignal unpauses the
		// workers.
		p.stopLock.Lock()
		// The stopped flag prevents any additional paused workers. This makes
		// it safe to close the taskQueue.
		p.stopped = true
		p.stopLock.Unlock()
		p.wait = wait
		// Close task queue and wait for currently running tasks to finish.
		close(p.taskQueue)
	})
	<-p.stoppedChan
}

Both of these can run in separate goroutines, which can trigger a panic from sending on a closed channel.
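One common way to avoid this class of panic is to guard both the send and the close with the same lock and a "closed" flag. The sketch below only illustrates that pattern with the standard library; `safeQueue`, `newSafeQueue`, and `errStopped` are hypothetical names and this is not how workerpool itself is implemented.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errStopped = errors.New("pool stopped")

// safeQueue is a hypothetical wrapper around a task channel.
type safeQueue struct {
	mu     sync.RWMutex
	closed bool
	tasks  chan func()
}

func newSafeQueue(size int) *safeQueue {
	return &safeQueue{tasks: make(chan func(), size)}
}

// Submit returns errStopped instead of panicking once Stop has run.
// Many Submit calls may hold the read lock at the same time.
func (q *safeQueue) Submit(task func()) error {
	q.mu.RLock()
	defer q.mu.RUnlock()
	if q.closed {
		return errStopped
	}
	q.tasks <- task
	return nil
}

// Stop closes the channel at most once. Taking the write lock guarantees
// that no Submit is in the middle of a send when close is called.
func (q *safeQueue) Stop() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.closed {
		q.closed = true
		close(q.tasks)
	}
}

func main() {
	q := newSafeQueue(4)
	done := make(chan struct{})
	go func() {
		for task := range q.tasks {
			task()
		}
		close(done)
	}()

	fmt.Println("before stop:", q.Submit(func() {}))
	q.Stop()
	<-done
	fmt.Println("after stop:", q.Submit(func() {}))
}
```

The trade-off is that a Submit blocked on a full channel holds the read lock, so Stop waits until a consumer makes room. That is acceptable only while something keeps draining the channel.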

Support changing maxWorkers dynamically

We would really like to be able to change maxWorkers dynamically. Is there any plan to support this? I tried to implement a SetMaxWorkers method for WorkerPool, but found that the implementation of Pause relies on a static maxWorkers value, which complicates things.

How to get execution time of each job & run each job with a timeout?

Putting this here for visibility...

Example of how to record "execution duration" + run each job with an "auto cancelling" timeout.

I understand it is not mint code, so please feel free to suggest changes, or supply your own improved version.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gammazero/workerpool"
)

// JobResult holds a jobs result
type JobResult struct {
	// Public
	Data interface{}

	// Private
	err     error
	runtime time.Duration
	name    string
}

// Name returns name. It is written like this so the consumer
// cannot change the name outside of supplying one via the Job
func (jr *JobResult) Name() string {
	return jr.name
}

// Runtime returns job execution runtime
func (jr *JobResult) Runtime() time.Duration {
	return jr.runtime
}

// Error holds job errors, if any
func (jr *JobResult) Error() error {
	return jr.err
}

// SetError sets an error on our result
func (jr *JobResult) SetError(e error) {
	jr.err = e
}

// Job holds job data
type Job struct {
	Name string
	Task func() JobResult
}

func wrapJob(timeout time.Duration, resultsChan chan JobResult, job Job) func() {
	// Create our context with timeout per job
	timeoutContext, timeoutCancel := context.WithTimeout(context.Background(), timeout)

	return func() {
		timerStart := time.Now()
		// Start goroutine using our context, which contains our timeout.
		go func(ctx context.Context, done context.CancelFunc, resChan chan JobResult, todo Job, startTime time.Time) {
			// Get result from job
			result := todo.Task()

			// Set name & execution time after job completion
			result.runtime = time.Since(startTime)
			result.name = todo.Name

			// If the timeout has been hit then `timeoutContext.Err()`
			// will be != nil and we should not send it on our results chan.
			//
			// Without this check we would send this job twice due to the fact
			// we cannot cancel in-flight requests.
			//
			// Lets say we have a long running task, how would we cancel it
			// in-flight? Whether http request or simply running `time.Sleep(time.Hour*999999)`?
			//
			// Instead we just don't do anything with the return, hence this check.
			if timeoutContext.Err() == nil {
				resChan <- result
			}

			// Forcefully cancel our context.
			// Cancelling forcefully is not bad, essentially it means success
			done()
		}(timeoutContext, timeoutCancel, resultsChan, job, timerStart)

		select {
		// If our timeout is hit *or* cancelled forcefully, we wind up here...
		case <-timeoutContext.Done():
			// ...that is why we check for error
			switch timeoutContext.Err() {
			// ...specifically the timeout error.
			case context.DeadlineExceeded:
				// Place a result on our results channel that contains
				// an error, which we can check for later.
				resultsChan <- JobResult{
					err:     context.DeadlineExceeded,
					name:    job.Name,
					runtime: time.Since(timerStart),
				}
			}
		}
	}
}

var jobs = []Job{{
	Name: "job1",
	Task: func() JobResult {
		// THIS JOB WILL ERROR ON PURPOSE
		// This will surpass our timeout and should get cancelled
		// ...you can do whatever you want in these jobs
		time.Sleep(time.Second * 3)
		// Don't have to set the name here
		return JobResult{Data: map[string]string{"Whatever": "You want"}}
	}}, {
	Name: "job2",
	Task: func() JobResult {
		// THIS JOB WILL SUCCEED
		time.Sleep(time.Millisecond * 300)
		resultFromCurl := "i am a result"
		return JobResult{Data: resultFromCurl}
	}},
}

func main() {
	// Set timeout here (or per job)
	jobTimeout := time.Duration(time.Second * 1) // 1 second

	// Create results channel with T type where T is whatever type you need
	jobResultsChannel := make(chan JobResult, len(jobs))

	// Create workerpool
	numWorkers := 10
	pool := workerpool.New(numWorkers)

	// Submit jobs to workerpool using our wrapper func
	for _, job := range jobs {
		pool.Submit(wrapJob(jobTimeout, jobResultsChannel, job))
	}

	// Wait for jobs to finish and close results channel
	pool.StopWait()
	close(jobResultsChannel)

	// Do whatever you want with results
	for jobResult := range jobResultsChannel {
		runTime := int(jobResult.Runtime() / time.Millisecond)
		str := "[%dms] : '%s' : JobSuccess : %s\n"
		data := jobResult.Data

		if jobResult.Error() != nil { // You should always check for errors
			str = "[%dms] : '%s' : JobError : %s\n"
			data = jobResult.Error()
		}

		fmt.Printf(str, runTime, jobResult.Name(), data)
	}
}
//// Output:
// [303ms] : 'job2' : JobSuccess : i am a result
// [1001ms] : 'job1' : JobError : context deadline exceeded

Submitting a task to a specific worker

Concurrent puts to the same DynamoDB resource fail in aws-sdk-go, so I was thinking that if I could route put calls to specific workers (one worker per table, and there can be a lot of tables), I would be able to solve this problem.

Pause a workerpool through keyboard

Hello dear developer, I'm looking for a way to pause a workerpool by pressing a key ("P", for example) and to resume work with the same key. I also want to stop the workerpool completely with another key (maybe "S"). My code follows the structure of your example.

Stop job while running

Is there a way to stop jobs in mid-execution?

I switched workerpoolxt to use context but the job still runs even though the context has been cancelled... I'm not really sure how to fix this, or if it is even possible.

I have created a POC that reproduces this issue (code below) which you can also view/run on The Go Playground

Any help would be greatly appreciated!!

  • Current output
0 from job a
1 from job a
Job 'a' should have stopped here
2 from job a
3 from job a
4 from job a
[{a context deadline exceeded <nil>} {b <nil> from b}]
  • Expected output
0 from job a
1 from job a
Job 'a' should have stopped here
[{a context deadline exceeded <nil>} {b <nil> from b}]

POC Code:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gammazero/workerpool"
)

func main() {
	runner := newRunner(context.Background(), 10)

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second*3))
	defer cancel()

	runner.do(job{
		Name:    "a",
		Context: ctx,
		Task: func() jobResult {
			for i := 0; i < 10000; i++ {
				time.Sleep(time.Second * 1)
				fmt.Println(i, "from job a")
			}
			return jobResult{Data: "from a"}
		},
	})

	runner.do(job{
		Name: "b",
		Task: func() jobResult {
			time.Sleep(time.Duration(time.Second * 6))
			return jobResult{Data: "from b"}
		},
	})

	results := runner.getjobResults()
	fmt.Println(results)
}

type runner struct {
	*workerpool.WorkerPool
	defaultCtx context.Context
	kill       chan struct{}
	result     chan jobResult
	results    []jobResult
}

func (r *runner) processResults() {
	for {
		select {
		case res, ok := <-r.result:
			if !ok {
				goto Done
			}
			r.results = append(r.results, res)
		}
	}
Done:
	<-r.kill
}

func newRunner(ctx context.Context, numRunners int) *runner {
	r := &runner{
		WorkerPool: workerpool.New(numRunners),
		kill:       make(chan struct{}),
		result:     make(chan jobResult),
		defaultCtx: ctx,
	}
	go r.processResults()
	return r
}

func (r *runner) do(j job) {
	r.Submit(r.wrap(&j))
}

func (r *runner) getjobResults() []jobResult {
	r.StopWait()
	close(r.result)
	r.kill <- struct{}{}
	return r.results
}

func (r *runner) wrap(job *job) func() {
	return func() {
		if job.Context == nil {
			job.Context = r.defaultCtx
		}
		job.childContext, job.done = context.WithCancel(job.Context)
		job.result = make(chan jobResult)
		go job.Run()
		r.result <- job.getResult()
	}
}

type job struct {
	Name         string
	Task         func() jobResult
	Context      context.Context
	result       chan jobResult
	childContext context.Context
	stopped      chan struct{}
	done         context.CancelFunc
}

func (j *job) Run() {
	result := j.Task()
	result.name = j.Name
	j.result <- result
	j.done()
}

func (j *job) getResult() jobResult {
	select {
	case r := <-j.result:
		return r
	case <-j.childContext.Done():
		fmt.Printf("Job '%s' should have stopped here\n", j.Name)
		switch j.childContext.Err() {
		default:
			return jobResult{name: j.Name, Error: j.childContext.Err()}
		}
	}
}

type jobResult struct {
	name  string
	Error error
	Data  interface{}
}

Get response from all worker pulls

Hi,

I want to run multiple requests and get a single response (a struct or something similar) once they are all done.
For example, if I have 10 requests and the longest takes 10 seconds, then after 10 seconds I want to get all the responses, each with an ID or similar, so that I know which requests succeeded and which failed. Is that possible?

thanks

Wait on Submit until queue has

Hi,
I need Submit to block until the queue of tasks is small enough.
I imagine something like SubmitBlock( ctx context.Context, minQueueDepth uint, f func()) that waits for the queue to be small enough, inserts the func and immediately returns.

Would you like to add something like that or accept a PR?

What is the purpose of "r := r"?

Hi there,

I'm using your library on every project that I have. But there is a line in your example that I don't understand. I'm not an expert on Go, maybe it's something very basic. So forgive me if it's a silly question.

for _, r := range requests {
		r := r
		wp.Submit(func() {
			fmt.Println("Handling request:", r)
		})
	}

Why are we redefining r with r := r? We already have the value of r, yet the code doesn't work without that line. It has become a mystery to me.
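A closure captures the variable itself, not its value at the time the closure is created. Before Go 1.22, a range loop reused one variable for all iterations, so tasks that ran later could all see the last element. Writing r := r declares a fresh variable per iteration for the closure to capture. Since Go 1.22 each iteration already gets its own variable, so the line is redundant there but harmless. A small stdlib-only illustration, where `collect` is a hypothetical helper and plain goroutines stand in for pool workers:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collect runs one goroutine per request and returns, sorted, the value
// that each closure observed.
func collect(requests []string) []string {
	var (
		mu   sync.Mutex
		seen []string
		wg   sync.WaitGroup
	)
	for _, r := range requests {
		r := r // per-iteration copy for the closure to capture
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			seen = append(seen, r)
			mu.Unlock()
		}()
	}
	wg.Wait()
	sort.Strings(seen)
	return seen
}

func main() {
	fmt.Println(collect([]string{"alpha", "beta", "gamma"})) // [alpha beta gamma]
}
```

An equivalent fix is to pass the value as an argument to a function that builds the closure, which also gives every task its own copy.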

Ability to configure idle timeout

Currently, idle timeout is hardcoded, but I encountered a use case where it will be useful to change it to a higher number. Any chance you can accept a PR for this @gammazero?

Possible memory leak?

When I use something like this

package main

import (
	"fmt"
	"time"
	"github.com/gammazero/workerpool"
)

func main() {
	wp := workerpool.New(2)
	z := 0
	for {
		r := z
		wp.Submit(func() {
			fmt.Println("Handling request:", r)
			time.Sleep(time.Second * 3)
		})
		z++
	}
	wp.StopWait()
}

leak

Why startWorker() and not worker()?

Hello!

Why are you using startWorker() and not worker()?

workerpool/workerpool.go

Lines 192 to 200 in 85cc841

	case p.workerQueue <- task:
	default:
		// Create a new worker, if not at max.
		if workerCount < p.maxWorkers {
			wg.Add(1)
			go startWorker(task, p.workerQueue, &wg)
			workerCount++
		} else {
			// Enqueue task to be executed by next available worker.
Do you have any examples of how I can pass a DB connection to a worker, rather than to a specific task?
The main idea is to start some workers with one connection per worker and run SQL requests (tasks) using the worker's own connection.

Bug - strange behavior

Ubuntu 20.04 LTS
go version go1.19.3 linux/amd64
github.com/gammazero/workerpool v1.1.3

package main

import (
	"fmt"
	"github.com/gammazero/workerpool"
)

type JobAccount struct {
	Id       int
	Username string
}

func main() {
	list := []string{
		"demo1", "demo2", "demo3",
	}

	var accounts []*JobAccount

	for k, v := range list {
		accounts = append(accounts, &JobAccount{
			Id:       k,
			Username: v,
		})
	}

	wp := workerpool.New(2)

	for _, account := range accounts {
		fmt.Printf("loop %+v\n", account.Username)

		wp.Submit(func() {
			fmt.Printf("go rountine %+v\n", account.Username)
		})
	}

	wp.StopWait()
}

Here is the actual output:

loop demo1
go rountine demo1
loop demo2
loop demo3
go rountine demo3
go rountine demo3

I was expecting

loop demo1
go rountine demo1
loop demo2
loop demo3
go rountine demo2
go rountine demo3

How to monitor progression on pool?

How can I track progress, i.e. which jobs are done and what percentage of all jobs is complete? For example, displaying the completed, queued, and waiting-to-be-queued jobs.
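A simple way to get these numbers without touching the pool is to wrap every task so that it updates atomic counters. Below is a minimal sketch using only the standard library; `progress`, `track`, and `percent` are hypothetical names. The number of queued tasks could additionally be read from the pool's WaitingQueueSize(), which another issue on this page uses.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// progress counts submitted and completed tasks.
type progress struct {
	submitted int64
	completed int64
}

// track counts the task as submitted and returns a wrapper that counts it
// as completed once it has run.
func (p *progress) track(task func()) func() {
	atomic.AddInt64(&p.submitted, 1)
	return func() {
		defer atomic.AddInt64(&p.completed, 1)
		task()
	}
}

// percent reports completed tasks as a percentage of submitted ones.
func (p *progress) percent() float64 {
	s := atomic.LoadInt64(&p.submitted)
	if s == 0 {
		return 0
	}
	return 100 * float64(atomic.LoadInt64(&p.completed)) / float64(s)
}

func main() {
	p := &progress{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		task := p.track(func() {})
		go func() { // stand-in for submitting the wrapped task to a pool
			defer wg.Done()
			task()
		}()
	}
	wg.Wait()
	fmt.Printf("%.0f%% of %d tasks done\n", p.percent(), atomic.LoadInt64(&p.submitted))
	// prints "100% of 4 tasks done"
}
```

A separate goroutine can call percent() on a timer to render a progress display while tasks are still running.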

Delay between workers?

I was looking at your implementation for a worker pool in Go and I really like the way you approached it; in that you can submit at any time and it adds it to the queue. Most implementations seem to operate off of a set of known tasks a priori.

Secondly, it doesn't delay the submission channel and this is very important because in my case if I lag even just a few milliseconds (or 10s of milliseconds) from the event firing, I will miss the next event. So far so good.

However, what I am struggling to figure out is how to put a delay in somewhere so that I can set the time between executions for each task without interfering with the incoming events. This delay is especially needed for the first set of workers that get added to the pool because they get fired off simultaneously. I need at least some spacing for the web server to breathe a bit before I lay siege on it. ;)

Any suggestions on where I can add this delay?

Make Workers open and close a database connection

Hi Gammazero,

I'm using your great workerpool, and I have a problem. I use the Submit() function to add jobs that are processed by the workers, but these workers need to perform database tasks. When I put db.Open and db.Close inside the function I submit, I end up with too many connections.

My question is: is it possible to let the workers themselves open and close the connections, so that only a limited number of connections is ever open? That would be awesome.

Thanks in advance
Adrian

Should Stop() be renamed or split into Close() and Wait()?

I stumbled upon this lib and it looks good, but I think the Stop() method could be named better.

When I read the example code, I had the impression that Stop() would wait for the currently running tasks to complete, clear the queue of pending tasks, and quit. In reality, it seems to wait until all tasks have completed.

Shouldn't it be named something like Close() or CloseAndWait()?

It might be even better to have two methods: Close() to signal that there are no more tasks, and Wait() to wait for the tasks to complete.

[feature request] naming workerpools

First, so many thanks for your workerpool module that is exactly what I've been looking for!

In my projects I tend to use several worker pools and I use them for transient work only, so these are not worker pools kept around all the time a service is up. Instead, I create worker pools as needed for individual requests (and there are some reasons for not sharing certain workers across different service requests).

When unit testing my service handlers for goroutine leaks (using Gomega's at this time still brand new gleak goroutine leak tester) I noticed that it would be quite nice for diagnosing if workerpools and their workers could be individually identified by some name. The name of a worker could simply be the one of its pool for simplicity. Would you be interested in a PR?

How to avoid task queueing (handling priority)

In a use case where I'd like to use this workerpool, I need more granular control over the queue (e.g. setting priorities for tasks, which may also change repeatedly while a task is waiting in the queue).

I can implement the queueing logic myself, outside of this library, but I don't see how I could prevent the library from queueing the tasks within itself.

The only way seems to be to check whether the queue is non-empty and, if so, hold back new tasks, e.g.

if wp.WaitingQueueSize() > 0 {
  // custom queue logic
} else {
  wp.Submit(task)
}

That would however still result in potentially 1 task stuck in the WP queue, which may change priority until it is actually executed and therefore delay execution of higher priority tasks.

Would there be interest in adding a method to check the number of active workers?

Consider providing a `FlushWait()` API

Just as we have a StopWait() API that waits until all tasks in the queue are
a. submitted to the worker goroutines, and
b. run to completion, along with the tasks that were already running,
and then shuts down the worker pool, I wonder whether others have requested a

Flush|FlushWait() API that does the same as Stop|StopWait() but doesn't shut down the worker pool, instead allowing the worker goroutines to keep running and processing any tasks submitted after the call to Flush|FlushWait().

The idea being that it may be useful to ensure that all currently executing tasks and queued tasks in a workerpool have completed, to ensure a known "sync" state, but then continue to run normally after that.

What is the best way to pause the pool?

Something like this?

package main

import (
	"fmt"
	"github.com/gammazero/workerpool"
	"time"
)

var isPaused = 1

func main() {
	go func() {
		time.Sleep(time.Second * 10)
		isPaused = 0
	}()
	wp := workerpool.New(2)
	requests := []string{"alpha", "beta", "gamma", "delta", "epsilon"}

	for _, r := range requests {
		r := r
		wp.Submit(func() {
			pause()
			fmt.Println("Handling request:", r)
		})
	}

	wp.StopWait()
}

func pause() int {
	if isPaused == 1 {
		time.Sleep(time.Millisecond * 100)
		return pause()
	} else {
		return 0
	}
}
