class Dynflow::Director

Director is responsible for telling what to do next when:

* new execution starts
* an event occurs
* some work is finished

Its public methods (except terminate) return work items that the executor should understand.

Constants

Event
UnprocessableEvent

Attributes

logger[R]

Public Class Methods

new(world) click to toggle source
# File lib/dynflow/director.rb, line 79
# Sets up a director bound to +world+.
#
# Keeps a reference to the world and to its logger, and starts with no
# tracked execution plan managers and an empty set of plan ids that are
# currently being rescued.
#
# @param world object responding to +logger+
def initialize(world)
  @world  = world
  @logger = @world.logger

  # execution plan id => manager of the plan being executed
  @execution_plan_managers = {}
  # ids of the plans for which a rescue has already been triggered
  @plan_ids_in_rescue = Set.new
end

Public Instance Methods

handle_event(event) click to toggle source
# File lib/dynflow/director.rb, line 92
def handle_event(event)
  Type! event, Event
  execution_plan_manager = @execution_plan_managers[event.execution_plan_id]
  if execution_plan_manager
    execution_plan_manager.event(event)
  else
    raise Dynflow::Error, "no manager for #{event.inspect}"
  end
rescue Dynflow::Error => e
  event.result.fail e.message
  raise e
end
start_execution(execution_plan_id, finished) click to toggle source
# File lib/dynflow/director.rb, line 86
# Begins tracking the execution plan and asks its manager for the first
# work items.
#
# @param execution_plan_id id of the plan to execute
# @param finished object passed on to +track_execution_plan+
# @return [Array] empty when the plan could not be tracked (no manager was
#   returned); otherwise the result of +unless_done+ for the manager's
#   +start+ output
def start_execution(execution_plan_id, finished)
  manager = track_execution_plan(execution_plan_id, finished)
  if manager
    unless_done(manager, manager.start)
  else
    []
  end
end
terminate() click to toggle source
# File lib/dynflow/director.rb, line 110
# Terminates every execution plan manager that is still tracked and then
# finishes each of them.
#
# Does nothing when no manager is tracked. A persistence error raised while
# terminating managers is logged and stops the termination pass (managers
# after the failing one are not terminated), but every tracked manager is
# still passed to +finish_manager+ afterwards.
def terminate
  return if @execution_plan_managers.empty?

  logger.error "... cleaning #{@execution_plan_managers.size} execution plans ..."
  begin
    @execution_plan_managers.values.each(&:terminate)
  rescue Errors::PersistenceError
    logger.error "could not to clean the data properly"
  end
  # iterate over a snapshot of the values: finish_manager removes entries
  # from @execution_plan_managers
  @execution_plan_managers.values.each { |manager| finish_manager(manager) }
end
work_finished(work) click to toggle source
# File lib/dynflow/director.rb, line 105
# Reports that +work+ has been finished and asks what should run next.
#
# The manager is looked up by the work's execution plan id. When no manager
# is tracked for that id, there is nothing to schedule and an empty array is
# returned instead of calling a method on nil.
#
# @param work finished work item; must respond to +execution_plan_id+
# @return [Array] empty when the plan is not tracked; otherwise the result
#   of +unless_done+ for the manager's +what_is_next+ output
def work_finished(work)
  manager = @execution_plan_managers[work.execution_plan_id]
  # Guard here: the argument below would otherwise dereference nil before
  # unless_done gets a chance to apply its own nil check.
  return [] unless manager

  unless_done(manager, manager.what_is_next(work))
end

Private Instance Methods

finish_manager(manager) click to toggle source
# File lib/dynflow/director.rb, line 138
# Stops tracking +manager+ and either starts a rescue for its plan or
# resolves its future.
#
# @param manager manager to finish; must respond to +execution_plan+
# @return the result of +rescue!+ when +rescue?+ holds for the manager,
#   otherwise the result of +set_future+
def finish_manager(manager)
  plan_id = manager.execution_plan.id
  @execution_plan_managers.delete(plan_id)
  return rescue!(manager) if rescue?(manager)

  set_future(manager)
end
rescue!(manager) click to toggle source
# File lib/dynflow/director.rb, line 153
# Starts the rescue of the manager's execution plan.
#
# The plan id is recorded in +@plan_ids_in_rescue+ first. When the plan
# offers a rescue plan id, that plan is handed to the world's executor
# together with the manager's future; otherwise the future is resolved
# through +set_future+.
#
# @param manager manager whose plan should be rescued; must respond to
#   +execution_plan+ and +future+
# @return the result of the executor's +execute+ call or of +set_future+
def rescue!(manager)
  # TODO: after moving to concurrent-ruby actors, there should be better place
  # to put this logic of making sure we don't run rescues in endless loop
  plan = manager.execution_plan
  @plan_ids_in_rescue << plan.id

  rescue_plan_id = plan.rescue_plan_id
  return set_future(manager) unless rescue_plan_id

  @world.executor.execute(rescue_plan_id, manager.future, false)
end
rescue?(manager) click to toggle source
# File lib/dynflow/director.rb, line 147
# Tells whether a rescue should be started for the manager's plan.
#
# A rescue is wanted only when the world is not terminating, the world's
# +auto_rescue+ setting is truthy, the plan is in the +:paused+ state and
# its id is not already recorded in +@plan_ids_in_rescue+.
#
# @param manager manager to check; must respond to +execution_plan+
# @return true when a rescue should start; otherwise a falsy value (false,
#   or the falsy +auto_rescue+ setting itself)
def rescue?(manager)
  return false if @world.terminating?

  auto_rescue = @world.auto_rescue
  # hand back the falsy setting itself (nil or false) unchanged
  return auto_rescue unless auto_rescue

  plan = manager.execution_plan
  return false unless plan.state == :paused

  !@plan_ids_in_rescue.include?(plan.id)
end
set_future(manager) click to toggle source
# File lib/dynflow/director.rb, line 185
# Resolves the manager's future with its execution plan.
#
# The plan id is removed from +@plan_ids_in_rescue+ beforehand.
#
# @param manager manager to resolve; must respond to +execution_plan+ and
#   +future+
# @return the result of the future's +success+ call
def set_future(manager)
  plan = manager.execution_plan
  @plan_ids_in_rescue.delete(plan.id)
  manager.future.success(plan)
end
track_execution_plan(execution_plan_id, finished) click to toggle source
# File lib/dynflow/director.rb, line 165
# Loads the execution plan and registers a new manager for it.
#
# The plan is refused, with a Dynflow::Error, when a manager is already
# tracked for the id or when the loaded plan is in the +:stopped+ state.
# A Dynflow::Error raised anywhere in this method is not propagated: it is
# passed to +finished.fail+ and nil is returned instead.
#
# @param execution_plan_id id of the plan to load and track
# @param finished object handed to the new manager; must respond to +fail+
# @return the newly registered ExecutionPlanManager, or nil on failure
def track_execution_plan(execution_plan_id, finished)
  plan = @world.persistence.load_execution_plan(execution_plan_id)

  # the already-running check comes first, so it wins when both apply
  refusal =
    if @execution_plan_managers[execution_plan_id]
      "cannot execute execution_plan_id:#{execution_plan_id} it's already running"
    elsif plan.state == :stopped
      "cannot execute execution_plan_id:#{execution_plan_id} it's stopped"
    end
  raise Dynflow::Error, refusal if refusal

  manager = ExecutionPlanManager.new(@world, plan, finished)
  @execution_plan_managers[execution_plan_id] = manager
rescue Dynflow::Error => error
  finished.fail error
  nil
end
unless_done(manager, work_items) click to toggle source
# File lib/dynflow/director.rb, line 128
# Returns +work_items+ unless the manager has nothing more to do.
#
# A missing manager yields no work. A manager that reports +done?+ is
# passed to +finish_manager+ and yields no work either.
#
# @param manager manager the work items belong to, or nil
# @param work_items work items to hand back when the manager is not done
# @return [Array] +work_items+ as given, or an empty array
def unless_done(manager, work_items)
  return [] unless manager
  return work_items unless manager.done?

  finish_manager(manager)
  []
end