class Dynflow::Executors::Parallel::Core

Attributes

logger[R]

Public Class Methods

new(world, pool_size) click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 8
# Sets up the executor core for +world+: stores the world and its logger,
# spawns the pool and creates the director.
#
# @param world [World] the world this core belongs to; checked with Type!
# @param pool_size forwarded as-is to Pool.spawn
def initialize(world, pool_size)
  # Type-check before touching the argument, so a wrong value is reported by
  # the check itself rather than by a NoMethodError from world.logger.
  @world      = Type! world, World
  @logger     = world.logger
  @pool       = Pool.spawn('pool', reference, pool_size, world.transaction_adapter)
  @terminated = nil
  @director   = Director.new(@world)
end

Public Instance Methods

dead_letter_routing() click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 56
# Exposes the dead letter handler of the world this core was created for.
#
# @return whatever +@world.dead_letter_handler+ returns
def dead_letter_routing
  @world.dead_letter_handler
end
finish_termination() click to toggle source
Calls superclass method Dynflow::Actor#finish_termination
# File lib/dynflow/executors/parallel/core.rb, line 50
# Completes shutdown of the core: terminates the director, logs that the core
# is down and then hands over to Dynflow::Actor#finish_termination.
def finish_termination
  @director.terminate
  # A completed shutdown is routine, so it is reported at the same level as the
  # 'shutting down' message logged by start_termination, not as an error.
  logger.info '... Dynflow Core has shut down.'
  super
end
handle_event(event) click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 25
# Passes +event+ to the director and schedules whatever work comes back.
#
# @param event [Director::Event] checked with Type!
# @raise [Dynflow::Error] when the core is terminating
def handle_event(event)
  Type! event, Director::Event
  raise Dynflow::Error, "cannot accept event: #{event} core is terminating" if terminating?

  produced_work = @director.handle_event(event)
  feed_pool(produced_work)
end
handle_execution(execution_plan_id, finished) click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 16
# Asks the director to start the given execution plan and schedules the work
# it returns.
#
# @param execution_plan_id id of the plan to start
# @param finished forwarded untouched to Director#start_execution
# @raise [Dynflow::Error] when the core is terminating
def handle_execution(execution_plan_id, finished)
  raise Dynflow::Error, "cannot accept execution_plan_id:#{execution_plan_id} core is terminating" if terminating?

  initial_work = @director.start_execution(execution_plan_id, finished)
  feed_pool(initial_work)
end
handle_persistence_error(error) click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 38
# Reacts to a persistence failure: logs a fixed notice and then the error
# itself, both at fatal level, and tells the world to terminate.
#
# @param error the error that was raised; logged as given
def handle_persistence_error(error)
  ["PersistenceError in executor: terminating", error].each { |entry| logger.fatal entry }
  @world.terminate
end
start_termination(*args) click to toggle source
Calls superclass method Dynflow::Actor#start_termination
# File lib/dynflow/executors/parallel/core.rb, line 44
# Begins shutdown: Dynflow::Actor#start_termination runs first (with the same
# arguments), then the shutdown is logged and the pool is sent a
# :start_termination message carrying a new Concurrent.future.
def start_termination(*args)
  super
  logger.info 'shutting down Dynflow Core ...'
  future = Concurrent.future
  @pool.tell([:start_termination, future])
end
work_finished(work) click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 34
# Reports finished +work+ to the director and schedules any follow-up work it
# returns.
#
# @param work forwarded untouched to Director#work_finished
def work_finished(work)
  follow_up = @director.work_finished(work)
  feed_pool(follow_up)
end

Private Instance Methods

feed_pool(work_items) click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 68
# Sends work to the pool, one :schedule_work message per item.
#
# Does nothing while the core is terminating or when +work_items+ is nil.
#
# @param work_items [Director::WorkItem, Enumerable<Director::WorkItem>, nil]
#   a single item is wrapped into a one-element array
def feed_pool(work_items)
  return if terminating?
  return if work_items.nil?

  work_items = [work_items] if work_items.is_a? Director::WorkItem
  # Check every item before scheduling any of them. `each` (rather than `all?`)
  # keeps the check independent of what Type! returns: `all?` stops at the
  # first falsy block result and would silently skip the remaining items.
  work_items.each { |item| Type! item, Director::WorkItem }
  work_items.each { |item| @pool.tell([:schedule_work, item]) }
end
on_message(message) click to toggle source
Calls superclass method Dynflow::MethodicActor#on_message
# File lib/dynflow/executors/parallel/core.rb, line 62
# Delegates message handling to Dynflow::MethodicActor#on_message. When that
# raises Errors::PersistenceError, the error is sent to this actor as a
# :handle_persistence_error message instead of propagating.
def on_message(message)
  super
rescue Errors::PersistenceError => error
  tell([:handle_persistence_error, error])
end