
Stop job while running (workerpool issue, closed)

gammazero commented on July 20, 2024
Stop job while running

from workerpool.

Comments (20)

oze4 commented on July 20, 2024

See here for an excellent response!

gammazero commented on July 20, 2024

Also, here is the fixed version on the Go Playground.

oze4 commented on July 20, 2024

@gammazero I think this is great! Your code def makes way more sense (get rid of my crappy defaults, and your use of context is cleaner), but the issue is still happening..

From what I make of it, it just boils down to the fact that we can't reach into a goroutine and stop it from running (from the outside). We have to provide code inside the goroutine with a mechanism to exit (in the Job.Task func).

Either that or the foundation of my design is flawed.

Here is the same playground with the error I have been facing, but with job "b" using a different context than job "a".

Let's say I wanted job "b" to keep going and not cancel due to a separate, unaffiliated, job.

oze4 commented on July 20, 2024

Just one of many like it.

It's possible I am way off here, but I take that to mean: once I give workerpool a func() (the Job.Task, essentially), it will run said func() until one of two things happens:

  1. the "wp" or "main" thread on which I created the new workerpool object ends
  2. the func() eventually runs until completion

So, the context is canceled, but how on earth would workerpool know to kill that goroutine?

It makes sense that something like this is impossible (without giving the code inside Job.Task a way to stop itself)... how could we just reach over to goroutine X and kill it from goroutine Y?

I could be wrong, but that's what I've made of this after hours and hours on Google lol

BredSt commented on July 20, 2024

@gammazero - Could you please advise?

gammazero commented on July 20, 2024

Once a goroutine is started, there is no way to kill it unless there is something inside that goroutine that is looking for a signal, waiting on a context or channel, etc. In the case of workerpool and workerpoolxt, the pool provides the goroutine and calls someone else's function in that goroutine. There is no telling if that function will ever return and allow your goroutine to finish or run another task. The only thing that workerpoolxt can do is run the task function in another goroutine and wait for the task to finish, or for a timer to expire, or for some other signal to give up waiting (such as the context being canceled). When this signal to give up happens, the worker goroutine can return and report an error, but the task goroutine is still running somewhere in the background and may or may not ever finish. All you have done is given up waiting for it to do so.

The danger of letting this happen is that you could build up a number of abandoned goroutines that are still running, which defeats the purpose of workerpool (to limit the amount of concurrency). A way to deal with this is to wait for the task function to finish, and when the signal to give up waiting happens, return an error on a result channel and then go back to waiting (possibly forever) on the task function. This way a pool goroutine is still in use and not available to run more tasks until the task actually completes.

Another idea is to expose the result channel directly to the caller. This way they can wait for a result for as much or as little time as they want, and can wait in a select along with many other channels. They can give up waiting whenever they want, and even go back and check the result channel later. The caller can certainly just put their own result channel into their task function, but the added value of workerpoolxt is that when the result appears, they can also get other information from it, such as task execution time.

oze4 commented on July 20, 2024

@gammazero thank you! So I was on the right track and I did understand things correctly.

I think forcing all jobs to cancel if one times out is the easiest, most straightforward way to handle it (like your playground showed).

The only qualm I have is, if every job will be using the same context, as the fixed playground showed, wouldn't it make more sense to provide the context within the constructor?

oze4 commented on July 20, 2024

@gammazero this is what I mean by just providing the context via the constructor.

The end result is identical to passing a ctx along with each Job. On second thought... I am way off here... because passing the context along with each job will at least give each job a chance at running for X time (in the case of using timeouts). Otherwise it's like saying "sucks to be job 200 because if it doesn't start within X time it won't run at all".

Please correct me if I'm wrong, but providing context at the "pool" level versus at the "job" level is not an apples:apples comparison (like I thought). Your idea of providing context with each job, albeit the same context, is actually structurally sound.

It's the difference between:

oze4 commented on July 20, 2024

Or does it make more sense to do something like you did with pacer?

Like you said, treat jobs as their own thing? So a job just runs on a workerpool (job.run(wp))? I have been attempting to separate Job from WorkerPoolXT over the past week or so, and I'm beginning to think WorkerPoolXT is the limiting factor here.

In getting rid of WorkerPoolXT, there wouldn't be as much abstraction for consumers, but in this case that isn't necessarily a bad thing. Less abstraction may actually be what's needed.

Thoughts? What would you do?

gammazero commented on July 20, 2024

@oze4 Yes, something like pacer is what I think would work best. Pacer is concerned with spacing job execution in time. Workerpool is concerned with limiting concurrency. These two things do not depend on each other, but can be used together.

pacedTask := pacer.Pace(func() {
    fmt.Println("Hello World")
})

// I can run a paced task using a workerpool
wp := workerpool.New(5)
wp.Submit(pacedTask)

// Or, run it using a goroutine
go pacedTask()

I think it would be nice to do something similar with a monitored task. Also, it would be nice if the caller could choose to have the jobs send back responses on separate channels, or on the same channel.

wp3 := workerpool.New(3)
resultsChan := make(chan Result)

// Create a small job
smallJob := NewJob("smallfry", resultsChan, func() (interface{}, error) {
    fmt.Println("Hello World")
    return nil, nil
})
// Run as many small jobs as we want concurrently
go smallJob.Run()

// Create a big job that will timeout after an hour, or can be canceled.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute)
defer cancel()
bigJob := NewJob("bigfish", resultsChan, func() (interface{}, error) {
    mp, err := findUnknownMersennePrime(ctx)
    if err != nil {
        return nil, err
    }
    fmt.Println("found one:", mp)
    return mp, nil
})
// Never run more than 3 big jobs at once
wp3.Submit(bigJob.Run)

And then you can wait for all the jobs the same way, no matter how they were run.

timeout := time.After(time.Minute)
wait:
for i := 0; i < 2; i++ {
    select {
    case result := <-resultsChan:
        if result.Err() != nil {
            return fmt.Errorf("job %q failed: %v", result.Name(), result.Err())
        }
        fmt.Println("job finished:", result.Name())
    case <-timeout:
        fmt.Println("tired of waiting - going to do some other stuff, will check back later")
        break wait // a bare break would only exit the select, not the loop
    }
}
...

The caller could also have chosen to use different channels for the different jobs.
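`NewJob`, `Run`, and the `Result` methods used in the example above are not defined in the thread. Here is one hypothetical way to define them so that the calls type-check. `Run` has the signature `func()`, which is what lets the same job be started with `go job.Run()` or passed to `wp.Submit(job.Run)`.

```go
package main

import (
	"fmt"
	"time"
)

// Result is a hypothetical type matching the Name()/Err() calls above.
type Result struct {
	name     string
	value    interface{}
	err      error
	duration time.Duration
}

func (r Result) Name() string            { return r.name }
func (r Result) Value() interface{}      { return r.value }
func (r Result) Err() error              { return r.err }
func (r Result) Duration() time.Duration { return r.duration }

// Job pairs a task with the channel its Result is sent on.
type Job struct {
	name    string
	results chan<- Result
	task    func() (interface{}, error)
}

// NewJob is hypothetical; it mirrors the call shape used in the comment.
func NewJob(name string, results chan<- Result, task func() (interface{}, error)) *Job {
	return &Job{name: name, results: results, task: task}
}

// Run executes the task and sends exactly one Result. Its signature is
// func(), so both "go job.Run()" and "wp.Submit(job.Run)" work.
func (j *Job) Run() {
	start := time.Now()
	v, err := j.task()
	j.results <- Result{name: j.name, value: v, err: err, duration: time.Since(start)}
}

// runTwo runs two jobs that share one results channel and collects
// their errors by name.
func runTwo() map[string]error {
	results := make(chan Result, 2) // one slot per job, so Run never blocks
	small := NewJob("smallfry", results, func() (interface{}, error) {
		return nil, nil
	})
	big := NewJob("bigfish", results, func() (interface{}, error) {
		return nil, fmt.Errorf("no prime found")
	})
	go small.Run()
	go big.Run()

	errs := make(map[string]error)
	for i := 0; i < 2; i++ {
		r := <-results
		errs[r.Name()] = r.Err()
	}
	return errs
}

func main() {
	for name, err := range runTwo() {
		fmt.Println(name, "err:", err)
	}
}
```

One design note: with an unbuffered `make(chan Result)`, as in the example above, `Run` blocks on its send until someone reads. If the caller takes the timeout branch and never comes back, that goroutine (or pool worker) stays stuck, so a buffer sized to the number of jobs is the safer default.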

This is only my opinion on one way to implement a flexible API, and is certainly not the only way to do this. It really depends on what kind of problem you are trying to solve. Here is another approach that emulates JS Promises, to give more ideas: https://github.com/chebyrash/promise

oze4 commented on July 20, 2024

@gammazero this is so clean!...and makes much more sense. Everything about it just feels way better.

You've prob been like "wtf is this guy doing?" haha

I've learned a ton from you/this package. Thank you.

oze4 commented on July 20, 2024

@gammazero I wanted to get your thoughts on something, whenever you have some free time.

In the fixed Playground you provided, I discovered that it is also broken. This is the same playground (with extra logging), and this is the same playground without the extra logging; the point is that all you have to do is sleep at the end and it breaks.

I also found this gist from #34 which also has the same issue, as this playground shows.

I have been troubleshooting this, but I can't seem to find a way to stop a func once submitted.

Is it even possible to stop a func from running after it has been submitted?

oze4 commented on July 20, 2024

Should workerpool accept a func with a ctx param? I have no idea if that is correct, but from what I've been learning that may be one resolution?

gammazero commented on July 20, 2024

oze4 commented on July 20, 2024

@gammazero Understood.

I guess what I meant was, can workerpool expose a way for me to stop it? Similar to how promise uses reject/resolve?

Like a SubmitWithContext(func(ctx context.Context)) method? And internally listen for ctx.Done in order to exit early if needed?
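A sketch of what a context-aware submit could look like as a caller-side helper. `submitWithContext`, `Submitter`, and `goPool` are hypothetical and not part of workerpool. The `Submitter` interface only needs `Submit(func())`, which workerpool's `WorkerPool` provides. The helper can skip a task whose context is already done when a worker picks it up. Once the task is running, it still only stops early if the task itself watches `ctx.Done()`.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Submitter is the single method this sketch needs from a pool.
// workerpool's WorkerPool has a Submit(func()) method, so it should fit.
type Submitter interface {
	Submit(task func())
}

// submitWithContext is a hypothetical helper, not part of workerpool. It
// passes ctx to the task and skips the task if ctx is already done when a
// worker picks it up. Once the task is running, stopping early is still up
// to the task itself checking ctx.Done().
func submitWithContext(ctx context.Context, p Submitter, task func(context.Context)) {
	p.Submit(func() {
		if ctx.Err() != nil {
			return // canceled while waiting in the queue
		}
		task(ctx)
	})
}

// goPool is a stand-in pool for this example: each task gets a new goroutine.
type goPool struct{ wg sync.WaitGroup }

func (g *goPool) Submit(task func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		task()
	}()
}

func demo() (skipped bool, stoppedEarly bool) {
	pool := &goPool{}

	// 1. Context canceled before the task runs: the body never executes.
	canceled, cancel := context.WithCancel(context.Background())
	cancel()
	ran := false
	submitWithContext(canceled, pool, func(context.Context) { ran = true })

	// 2. Context times out mid-task: a task that selects on Done returns early.
	ctx, cancel2 := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel2()
	start := time.Now()
	var elapsed time.Duration
	submitWithContext(ctx, pool, func(c context.Context) {
		select {
		case <-c.Done(): // cooperative exit
		case <-time.After(time.Second): // stands in for the real work
		}
		elapsed = time.Since(start)
	})

	pool.wg.Wait() // the WaitGroup also makes ran/elapsed safe to read here
	return !ran, elapsed < 500*time.Millisecond
}

func main() {
	skipped, stoppedEarly := demo()
	fmt.Println("canceled task skipped:", skipped)
	fmt.Println("running task stopped early:", stoppedEarly)
}
```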

oze4 commented on July 20, 2024

Something like this, is what I had in mind (from the perspective of the caller):

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    // Calling done will kill the goroutine
    wp.SubmitDone(func(done func()) {
        go func(c context.Context, done func()) {
            select {
            case <-c.Done():
                switch c.Err() {
                default:
                    done() // Calling done will kill the goroutine
                }
            }
        }(ctx, done)
        
        // do whatever

        done() // Calling done will kill the goroutine
    })

I apologize if this is a stupid question/idea.

Edit: I don't want to continue to do dumb stuff, so if this is dumb, you can tell me it's dumb. Otherwise I'll keep being dumb and it'll be all your fault 😉

Edit 2: I got this idea from playing around with Promise. This Playground (code below) is what I was doing (not sure if this is wrong as well)...

package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"time"

	"github.com/chebyrash/promise"
)

type someStruct struct {
	Foo string
}

func getresults(c context.Context, reject func(error)) {
	// Block until the context is canceled or times out, then reject with its error.
	<-c.Done()
	reject(c.Err())
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	jobA := promise.New(func(resolve func(interface{}), reject func(error)) {
		go getresults(ctx, reject)

		time.Sleep(time.Second)

		if true {
			someval := someStruct{Foo: "bar"}
			resolve(someval)
		} else {
			reject(errors.New("false"))
		}

	}).Then(func(somevalGetsPassedHere interface{}) interface{} {
		// Do something with the value before returning if you want
		//
		// Type assertion
		finalsomeval := somevalGetsPassedHere.(someStruct)
		fmt.Println("Shows how we can access/manipulate fields BEFORE returning : ", finalsomeval.Foo)
		return finalsomeval
	}).Catch(func(err error) error {
		// Do something with the error before returning if you want
		fmt.Println("-There was an error-")
		return err
	})

	// Blocks
	r, e := jobA.Await()
	if e != nil {
		fmt.Println("Error : ", e.Error())
		os.Exit(1)
	}

	result, ok := r.(someStruct)
	if !ok {
		fmt.Println("type assertion failed")
		os.Exit(1)
	}

	fmt.Println(result)
	os.Exit(0)
}
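Why a `done()` callback cannot end the task goroutine can be shown with the strongest tool Go offers, `runtime.Goexit`, which terminates only the goroutine that calls it (running its deferred calls). Using Goexit as the body of `done` is an assumption made for illustration. Because the watcher goroutine is the one calling `done()`, only the watcher exits, and the task body keeps going. The same reasoning applies to the Promise example: `getresults` calling `reject` from its own goroutine can at most mark the promise as rejected, while the executor function keeps running its `time.Sleep` regardless.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)

// demo models the SubmitDone idea with the strongest possible done():
// runtime.Goexit, which terminates the goroutine that calls it. Because the
// watcher goroutine is the one calling done(), only the watcher ends; the
// task goroutine keeps running to completion.
func demo() (codeAfterDoneRan, taskFinished bool) {
	var afterDone, finished int32
	var wg sync.WaitGroup
	done := func() { runtime.Goexit() } // ends whichever goroutine calls it

	wg.Add(2)
	go func() { // the task goroutine, as a pool worker would run it
		defer wg.Done()
		go func() { // watcher, like the one inside SubmitDone's func
			defer wg.Done()                   // deferred calls still run on Goexit
			time.Sleep(10 * time.Millisecond) // pretend ctx timed out here
			done()
			atomic.StoreInt32(&afterDone, 1) // never reached
		}()
		time.Sleep(50 * time.Millisecond) // "do whatever": unaffected by done()
		atomic.StoreInt32(&finished, 1)
	}()
	wg.Wait()
	return atomic.LoadInt32(&afterDone) == 1, atomic.LoadInt32(&finished) == 1
}

func main() {
	afterDone, finished := demo()
	fmt.Println("watcher continued after done():", afterDone)
	fmt.Println("task still ran to completion:", finished)
}
```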

gammazero commented on July 20, 2024

oze4 commented on July 20, 2024

I promise I'm not trying to be a pain in the butt lol... I can't seem to wrap my head around one thing, though..

Since the goroutine which my func() will run in is created by workerpool, is it not possible for workerpool to then provide an "escape hatch" for the caller to end that goroutine?

In my head, from the caller's perspective, it would look like:

(workerpool provides a done param much like Promise does with resolve; the only difference is that we don't care about capturing a return value, but capturing a response could easily be abstracted)

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    // Calling done will kill the goroutine
    wp := workerpool.New(10)
    wp.SubmitDone(func(done func()) {
        go func(c context.Context, done func()) {
            select {
            case <-c.Done():
                switch c.Err() {
                default:
                    done() // Calling done will kill the goroutine
                }
            }
        }(ctx, done)
        
        // do whatever

        done() // Calling done will kill the goroutine
    })

I plan on continuing to test code/etc until I understand, though. Not sure if this makes it worse, but I've been testing different ideas and I'm not sure if my implementation is wrong or if it's just not possible (hence these questions lol).

Have a good weekend - don't feel obligated to respond! Thanks for everything....again..

gammazero commented on July 20, 2024

oze4 commented on July 20, 2024

Disregard my (now deleted) comment. I need to do more testing/learning for myself before bugging you. I apologize. Have a good weekend!
