class Dynflow::Executors::Parallel::Pool

Public Class Methods

new(core, pool_size, transaction_adapter) click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 38
def initialize(core, pool_size, transaction_adapter)
  @executor_core = core
  @pool_size     = pool_size
  @free_workers  = Array.new(pool_size) { |i| Worker.spawn("worker-#{i}", reference, transaction_adapter) }
  @jobs          = JobStorage.new
end

Public Instance Methods

handle_persistence_error(error) click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 56
def handle_persistence_error(error)
  @executor_core.tell([:handle_persistence_error, error])
end
schedule_work(work) click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 45
def schedule_work(work)
  @jobs.add work
  distribute_jobs
end
start_termination(*args) click to toggle source
Calls superclass method Dynflow::Actor#start_termination
# File lib/dynflow/executors/parallel/pool.rb, line 60
def start_termination(*args)
  super
  try_to_terminate
end
worker_done(worker, work) click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 50
def worker_done(worker, work)
  @executor_core.tell([:work_finished, work])
  @free_workers << worker
  distribute_jobs
end

Private Instance Methods

distribute_jobs() click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 75
def distribute_jobs
  try_to_terminate
  @free_workers.pop << @jobs.pop until @free_workers.empty? || @jobs.empty?
end
try_to_terminate() click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 67
def try_to_terminate
  if terminating? && @free_workers.size == @pool_size
    @free_workers.map { |worker| worker.ask(:terminate!) }.map(&:wait)
    @executor_core.tell(:finish_termination)
    finish_termination
  end
end