
Async task on subscribe (iodine issue, closed)

EugeneIstomin commented on May 23, 2024
Async task on subscribe

from iodine.

Comments (3)

boazsegev commented on May 23, 2024

Hi @EugeneIstomin ,

I am not sure what you mean by async in your specific case.

Iodine.run will schedule a task / proc to be performed later. However, that task is still performed within an iodine worker thread, so when you call sleep 5 that task will block the worker thread. If you have only one thread, that task will block iodine completely.

Consider the following as an example:

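module AsyncMe
  @thread = nil
  @queue = Queue.new
  # Locked once and never released, so the setup below runs only one time.
  @performed_once_and_only_once = Mutex.new

  def self.validate_subscription
    return unless @performed_once_and_only_once.try_lock
    Iodine.subscribe(:events, &AsyncMe.method(:async_task))
    # A single background thread runs the queued tasks one at a time.
    @thread = Thread.new do
      loop do
        # Queue#shift blocks until a task is available.
        @queue.shift.call
      end
    end
  end

  # Subscription callback: it only enqueues the slow work and returns.
  def self.async_task(ch, msg)
    # Outside the master process, drop the subscription and do nothing.
    unless Iodine.master?
      Iodine.unsubscribe(:events)
      return
    end
    @queue << Proc.new do
      p Process.pid, 1111
      sleep 5 # blocks the background thread, not an iodine worker thread
      p 5555
      # Hand the short follow-up back to the iodine reactor.
      Iodine.run { p "done" }
    end
    p "scheduled"
  end
end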

# forking mode
Iodine.on_state(:enter_master, &AsyncMe.method(:validate_subscription))
# non-forking mode
Iodine.on_state(:on_start, &AsyncMe.method(:validate_subscription))

As you can see, there's no need to spawn a separate thread per task (which is a security risk). Instead, spawn one or more additional worker threads and manage a queue. (Cleanup code is missing from the example.)
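To see the queue-plus-worker idea in isolation, here is a minimal sketch that runs without iodine. The BackgroundJobs module, its method names, and the :stop sentinel are hypothetical and are not part of iodine or of the example above; the sketch only shows that enqueueing returns immediately while one background thread runs the jobs in order.

```ruby
# Hypothetical stand-in for the AsyncMe module above, with no iodine dependency.
module BackgroundJobs
  @queue = Queue.new
  @results = Queue.new
  @thread = nil

  # Start the single background worker (does nothing if already started).
  def self.start
    @thread ||= Thread.new do
      # Queue#pop blocks while the queue is empty, so the loop does not spin.
      while (job = @queue.pop) != :stop
        @results << job.call
      end
    end
  end

  # Called from the caller's thread: pushes the job and returns immediately.
  def self.enqueue(&job)
    @queue << job
    nil
  end

  # Ask the worker to exit after the jobs already queued, then wait for it.
  def self.stop
    return unless @thread
    @queue << :stop
    @thread.join
    @thread = nil
  end

  # Collect the return values of finished jobs, in completion order.
  def self.results
    out = []
    out << @results.pop until @results.empty?
    out
  end
end

BackgroundJobs.start
BackgroundJobs.enqueue { sleep 0.2; :slow } # does not block the caller
BackgroundJobs.enqueue { :fast }            # waits its turn behind :slow
BackgroundJobs.stop
p BackgroundJobs.results
```

With a single worker, jobs finish in the order they were queued, so a slow job delays later jobs but never the thread that enqueued it.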

Iodine.run is used to defer a task within the iodine reactor, which might be useful for small tasks or for breaking CPU-heavy tasks into smaller chunks (preventing a thread from being blocked for long)... but it's not designed to run CPU-heavy tasks.
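The chunking idea can be sketched as follows. TinyReactor is a hypothetical, single-threaded stand-in for the reactor so that the snippet runs on its own; with iodine loaded, the assumption is that TinyReactor.run would be replaced by Iodine.run and drain would not be needed. sum_in_chunks is likewise an illustrative name.

```ruby
# Hypothetical stand-in for the reactor: a list of deferred tasks.
module TinyReactor
  @pending = []

  # Defer a task (the role Iodine.run plays in the discussion above).
  def self.run(&task)
    @pending << task
  end

  # Run deferred tasks until none are left.
  def self.drain
    @pending.shift.call until @pending.empty?
  end
end

# Sum an array a slice at a time, deferring each following slice so that
# other deferred tasks can run in between.
def sum_in_chunks(numbers, chunk_size, total = 0, offset = 0, &on_done)
  slice = numbers[offset, chunk_size]
  if slice.nil? || slice.empty?
    on_done.call(total) # no slices left: report the result
    return
  end
  total += slice.sum
  TinyReactor.run do
    sum_in_chunks(numbers, chunk_size, total, offset + chunk_size, &on_done)
  end
end

result = nil
sum_in_chunks((1..10).to_a, 3) { |total| result = total }
TinyReactor.run { puts "another task gets a turn between slices" }
TinyReactor.drain
puts result
```

Each slice still runs on a reactor thread, so this only shortens how long any single task holds that thread; it does not make the work itself cheaper.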

Good Luck!


boazsegev commented on May 23, 2024

Yes, the USR1 signal is used by Iodine to signal a hot-restart (the master process shuts down and respawns the workers).

If you're trying to run cleanup code, consider using Iodine.on_state(:on_finish) { ... } or Iodine.on_state(:start_shutdown) { ... }.

My example might become:

module AsyncMe
  @thread = nil
  @queue = Queue.new
  @performed_once_and_only_once = Mutex.new
  @flag = true
  def self.validate_subscription
    return unless @performed_once_and_only_once.try_lock
    Iodine.subscribe(to: "*", match: :redis, &AsyncMe.method(:async_task))
    @thread = Thread.new do
      while(@flag) do
        (@queue.shift).call
      end
    end
  end

  def self.async_task(ch, msg)
    unless Iodine.master?
      Iodine.unsubscribe(to: "*", match: :redis)
      return
    end
    @queue << Proc.new do
      p Process.pid, 1111
      sleep 5
      p 5555
      Iodine.run { p "done" }
    end
    p "scheduled"
  end

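  def self.cleanup
    return unless @thread
    # Tell the worker loop to exit after its current iteration.
    @flag = false
    # The worker may be blocked in @queue.shift; this final task wakes it up
    # so that it re-checks @flag.
    @queue << Proc.new { puts "Async worker done." }
    @thread.join
    puts "Async worker finished."
  end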
end

# forking mode
Iodine.on_state(:enter_master, &AsyncMe.method(:validate_subscription))
# non-forking mode
Iodine.on_state(:on_start, &AsyncMe.method(:validate_subscription))
# cleanup
Iodine.on_state(:on_finish, &AsyncMe.method(:cleanup))
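As an alternative to the @flag plus wake-up task, Ruby's Queue#close can end the worker loop on its own. The sketch below is a variation under that assumption, not the approach used above; ClosableWorker and its methods are hypothetical names, and it runs without iodine.

```ruby
# Hypothetical variation of the cleanup logic, built on Queue#close.
module ClosableWorker
  @queue = nil
  @thread = nil

  def self.start
    return if @thread
    @queue = Queue.new
    @thread = Thread.new do
      # Queue#pop returns nil once the queue is closed and empty,
      # which ends the loop without a separate flag.
      while (job = @queue.pop)
        job.call
      end
    end
  end

  def self.enqueue(&job)
    @queue << job # raises ClosedQueueError after cleanup
  end

  def self.cleanup
    return unless @thread
    @queue.close  # already queued jobs still run; new pushes are rejected
    @thread.join
    @thread = nil
  end
end

log = Queue.new
ClosableWorker.start
ClosableWorker.enqueue { log << :first }
ClosableWorker.enqueue { log << :second }
ClosableWorker.cleanup # returns after both jobs have run
puts log.size
```

In the iodine setup above, such a cleanup method would be registered the same way, through Iodine.on_state(:on_finish, ...).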


EugeneIstomin commented on May 23, 2024

Thanks, works great.
One problem I discovered is with USR1 trapping:

The current handler ( old_sigusr1 = Signal.trap("SIGUSR1") { old_sigusr1.call if old_sigusr1.respond_to?(:call) }) does not kill the older threads, and trapping directly in the thread does not run old_sigusr1:

    def subscriptionCaller
      Iodine.subscribe("#{ENV['memexDomain']}::calls", &Messagebus.method(:asyncTask))
      @thread = Thread.new do
        Signal.trap("USR1") {@thread.kill}
        while(true) do
          (@queue.shift).call
        end
      end
    end
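One thing that may be relevant here: in Ruby, Signal.trap replaces the handler for the whole process even when it is called inside a thread, and it returns whatever handler was installed before. The sketch below shows this with two plain Ruby handlers on a POSIX system. SIGNAL_LOG and the first handler are illustrative stand-ins, and the sketch makes no claim about how a handler installed by native code (such as iodine's) is exposed to Ruby.

```ruby
SIGNAL_LOG = []

# Stand-in for a handler installed earlier by some other Ruby code.
Signal.trap("USR1") { SIGNAL_LOG << :original }

installer = Thread.new do
  # This replaces the process-wide handler, not a per-thread one, and
  # returns the previously installed handler so that it can be chained.
  previous = Signal.trap("USR1") do
    SIGNAL_LOG << :from_thread
    previous.call if previous.respond_to?(:call)
  end
end
installer.join # the trap stays installed after the thread has exited

Process.kill("USR1", Process.pid)

# Handlers run asynchronously; wait briefly for delivery.
deadline = Time.now + 2
sleep 0.01 until SIGNAL_LOG.size >= 2 || Time.now > deadline

Signal.trap("USR1", "DEFAULT") # restore the default handler
p SIGNAL_LOG
```

If the previous handler is not a callable object (for example the string "DEFAULT"), the respond_to?(:call) guard simply skips it, so chaining only works for handlers that were installed as Ruby procs.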
