See here for an excellent response!
Also, here is the fixed version on the Go Playground.
@gammazero I think this is great! Your code definitely makes way more sense (getting rid of my crappy defaults, and your use of context is cleaner), but the issue is still happening.
From what I make of it, it just boils down to the fact that we can't reach into a goroutine and stop it from running (from the outside). We have to provide code inside the goroutine with a mechanism to exit (in the Job.Task func).
Either that or the foundation of my design is flawed.
Here is the same playground reproducing the error I have been facing, but with job "b" using a different context than job "a".
Let's say I wanted job "b" to keep going and not be canceled because of a separate, unaffiliated job.
Just one of many like it.
It's possible I am way off here, but I take that to mean: once I give workerpool a func() (essentially the Job.Task), it will run said func() until one of two things happens:
- the "wp" or "main" thread I created the new workerpool object on ends, or
- the func() eventually runs to completion.
So the context is canceled, but how on earth would workerpool know to kill that goroutine?
It makes sense that something like this is impossible (without giving the code inside Job.Task a way to stop itself)... how could we just reach over to goroutine X and kill it from goroutine Y?
I could be wrong, but that's what I've made of this after hours and hours on Google lol
@gammazero Could you please advise?
Once a goroutine is started, there is no way to kill it unless there is something inside that goroutine that is looking for a signal, waiting on a context or channel, etc. In the case of workerpool and workerpoolxt, the pool is providing the goroutine and calling someone else's function in that goroutine. There is no telling if that function will ever return and allow your goroutine to finish or run another task. The only thing that workerpoolxt can do is to run the task function in another goroutine and wait for the task to finish, for a timer to expire, or for some other signal to give up waiting (such as the context being canceled). When this signal to give up happens, the worker goroutine can return and report an error, but the task goroutine is still running somewhere in the background and may or may not ever finish. All you have done is given up waiting for it to do so.
The danger of letting this happen is that you could build up a number of abandoned goroutines that are still running, which defeats the purpose of workerpool (to limit the amount of concurrency). A way to deal with this is to wait for the task function to finish, and when the signal to give up waiting happens, return an error on a result channel and then go back to waiting (possibly forever) on the task function. This way a pool goroutine is still in use and not available to run more tasks until the task actually completes.
Another idea is to expose the result channel directly to the caller. This way they can wait for a result for as much or as little time as they want, and can wait in a select along with any other channels. They can give up waiting whenever they want, and even go back and check the result channel later if they want. The caller can certainly just put their own result channel into their task function, but the added value of workerpoolxt is that when the result appears, they can also get other information from it, like the task execution time.
@gammazero thank you! So I was on the right track and I did understand things correctly.
I think forcing all jobs to cancel if one times out is the easiest, most straightforward way to handle it (like your playground showed).
The only qualm I have is, if every job will be using the same context, as the fixed playground showed, wouldn't it make more sense to provide the context within the constructor?
@gammazero this is what I mean by just providing the context via the constructor.
The end result is identical to passing a ctx along with each Job. On second thought... I am way off here... because passing the context along with each job will at least give each job a chance at running for X time (in the case of using timeouts). Otherwise it's like saying "sucks to be job 200 because if it doesn't start within X time it won't run at all".
Please correct me if I'm wrong, but providing context at the "pool" level versus at the "job" level is not an apples-to-apples comparison (like I thought). Your idea of providing context with each job, albeit the same context, is actually structurally sound.
It's the difference between:
- Using context at the pool level: 48 jobs timed out
- ...and using context at the job level, albeit the same context: 29 jobs timed out
Or does it make more sense to do something like you did with pacer? Like you said, treat jobs as their own thing, so a job just runs on a workerpool (job.run(wp))? I have been attempting to separate Job from WorkerPoolXT over the past week or so, and I'm beginning to think WorkerPoolXT is the limiting factor here.
In getting rid of WorkerPoolXT, there wouldn't be as much abstraction for consumers, but in this case that isn't necessarily a bad thing. Less abstraction may actually be what's needed.
Thoughts? What would you do?
@oze4 Yes, something like pacer is what I think would work best. Pacer is concerned with spacing job execution in time. Workerpool is concerned with limiting concurrency. These two things do not depend on each other, but can be used together.
```go
pacedTask := pacer.Pace(func() {
	fmt.Println("Hello World")
})

// I can run a paced task using a workerpool
wp := workerpool.New(5)
wp.Submit(pacedTask)

// Or, run it using a goroutine
go pacedTask()
```
I think it would be nice to do something similar with a monitored task. Also, it would be nice if the caller could choose to have the jobs send back responses on separate channels, or on the same channel.
```go
wp3 := workerpool.New(3)
resultsChan := make(chan Result)

// Create a small job
smallJob := NewJob("smallfry", resultsChan, func() (interface{}, error) {
	fmt.Println("Hello World")
	return nil, nil
})

// Run as many small jobs as we want concurrently
go smallJob.Run()

// Create a big job that will time out after an hour, or can be canceled.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute)
defer cancel()
bigJob := NewJob("bigfish", resultsChan, func() (interface{}, error) {
	mp, err := findUnknownMersennePrime(ctx)
	if err != nil {
		return nil, err
	}
	fmt.Println("found one:", mp)
	return mp, nil
})

// Never run more than 3 big jobs at once
wp3.Submit(bigJob.Run)
```
And then you can wait for all the jobs the same way, no matter how they were run.
```go
timeout := time.After(time.Minute)
wait:
for i := 0; i < 2; i++ {
	select {
	case result := <-resultsChan:
		if result.Err() != nil {
			return fmt.Errorf("job %q failed: %v", result.Name(), result.Err())
		}
		fmt.Println("job finished:", result.Name())
	case <-timeout:
		fmt.Println("tired of waiting - going to do some other stuff, will check back later")
		break wait // a plain break would only leave the select, not the loop
	}
}
...
```
The caller could also have chosen to use different channels for the different jobs.
This is only my opinion on one way to implement a flexible API, and is certainly not the only way to do this. It really depends on what kind of problem you are trying to solve. Here is another approach that emulates JS Promises to give more ideas: https://github.com/chebyrash/promise
@gammazero this is so clean!...and makes much more sense. Everything about it just feels way better.
You've prob been like "wtf is this guy doing?" haha
I've learned a ton from you/this package. Thank you.
@gammazero I wanted to get your thoughts on something, whenever you have some free time.
In the fixed Playground you provided, I discovered that it is also broken. This is the same playground (with extra logging), and this is the same playground (no extra logging, but the point is it shows how all you have to do is sleep at the end and it breaks it).
I also found this gist from #34 which also has the same issue, as this playground shows.
I have been troubleshooting this, but I can't seem to find a way to stop a func once submitted.
Is it even possible to stop a func from running after it has been submitted?
Should workerpool accept a func with a ctx param? I have no idea if that is correct, but from what I've been learning that may be one resolution?
@gammazero Understood.
I guess what I meant was, can workerpool expose a way for me to stop it? Similar to how promise uses reject/resolve?
Like a SubmitWithContext(func(ctx context.Context)) method? And internally listen for ctx.Done in order to exit early if needed?
Something like this is what I had in mind (from the perspective of the caller):
```go
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// Calling done will kill the goroutine
// (SubmitDone is a proposed method; it does not exist in workerpool)
wp.SubmitDone(func(done func()) {
	go func(c context.Context, done func()) {
		<-c.Done()
		done() // Calling done will kill the goroutine
	}(ctx, done)
	// do whatever
	done() // Calling done will kill the goroutine
})
```
I apologize if this is a stupid question/idea.
Edit: I don't want to continue to do dumb stuff, so if this is dumb, you can tell me it's dumb. Otherwise I'll keep being dumb and it'll be all your fault 😉
Edit 2: I got this idea from playing around with Promise. This Playground (code below) is what I was doing (not sure if this is wrong as well)...
```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"time"

	"github.com/chebyrash/promise"
)

type someStruct struct {
	Foo string
}

// getresults rejects the promise once the context is done.
func getresults(c context.Context, reject func(error)) {
	<-c.Done()
	reject(c.Err())
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	jobA := promise.New(func(resolve func(interface{}), reject func(error)) {
		go getresults(ctx, reject)
		time.Sleep(time.Second)
		if true {
			someval := someStruct{Foo: "bar"}
			resolve(someval)
		} else {
			reject(errors.New("false"))
		}
	}).Then(func(somevalGetsPassedHere interface{}) interface{} {
		// Do something with the value before returning if you want
		//
		// Type assertion
		finalsomeval := somevalGetsPassedHere.(someStruct)
		fmt.Println("Shows how we can access/manipulate fields BEFORE returning : ", finalsomeval.Foo)
		return finalsomeval
	}).Catch(func(err error) error {
		// Do something with the error before returning if you want
		fmt.Println("-There was an error-")
		return err
	})

	// Blocks
	r, e := jobA.Await()
	if e != nil {
		fmt.Println("Error : ", e.Error())
		os.Exit(1)
	}

	result, ok := r.(someStruct)
	if !ok {
		fmt.Println("type assertion failed")
		os.Exit(1)
	}
	fmt.Println(result)
	os.Exit(0)
}
```
I promise I'm not trying to be a pain in the butt lol... I can't seem to wrap my head around one thing, though.
Since the goroutine my func() will run in is created by workerpool, is it not possible for workerpool to then provide an "escape hatch" for the caller to end that goroutine?
In my head, from the caller's perspective, it would look like this (workerpool provides a done param, much like Promise does with resolve; the only difference is we don't care about capturing a return value, but capturing a response could easily be abstracted):
```go
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// Calling done will kill the goroutine
// (SubmitDone is a proposed method; it does not exist in workerpool)
wp := workerpool.New(10)
wp.SubmitDone(func(done func()) {
	go func(c context.Context, done func()) {
		<-c.Done()
		done() // Calling done will kill the goroutine
	}(ctx, done)
	// do whatever
	done() // Calling done will kill the goroutine
})
```
I plan on continuing to test code/etc until I understand, though. Not sure if this makes it worse, but I've been testing different ideas and I'm not sure if my implementation is wrong or if it's just not possible (hence these questions lol).
Have a good weekend, and don't feel obligated to respond! Thanks for everything... again.
Disregard my (now deleted) comment. I need to do more testing/learning for myself before bugging you. I apologize. Have a good weekend!