class Dynflow::Director
Director is responsible for telling what to do next when:
* new execution starts * an event accurs * some work is finished
It's public methods (except terminate) return work items that the executor should understand
Constants
- Event
- UnprocessableEvent
Attributes
logger[R]
Public Class Methods
new(world)
click to toggle source
# File lib/dynflow/director.rb, line 79 def initialize(world) @world = world @logger = world.logger @execution_plan_managers = {} @rescued_steps = {} end
Public Instance Methods
handle_event(event)
click to toggle source
# File lib/dynflow/director.rb, line 92 def handle_event(event) Type! event, Event execution_plan_manager = @execution_plan_managers[event.execution_plan_id] if execution_plan_manager execution_plan_manager.event(event) else raise Dynflow::Error, "no manager for #{event.inspect}" end rescue Dynflow::Error => e event.result.fail e.message raise e end
start_execution(execution_plan_id, finished)
click to toggle source
# File lib/dynflow/director.rb, line 86 def start_execution(execution_plan_id, finished) manager = track_execution_plan(execution_plan_id, finished) return [] unless manager unless_done(manager, manager.start) end
terminate()
click to toggle source
# File lib/dynflow/director.rb, line 110 def terminate unless @execution_plan_managers.empty? logger.error "... cleaning #{@execution_plan_managers.size} execution plans ..." begin @execution_plan_managers.values.each do |manager| manager.terminate end rescue Errors::PersistenceError logger.error "could not to clean the data properly" end @execution_plan_managers.values.each do |manager| finish_manager(manager) end end end
work_finished(work)
click to toggle source
# File lib/dynflow/director.rb, line 105 def work_finished(work) manager = @execution_plan_managers[work.execution_plan_id] unless_done(manager, manager.what_is_next(work)) end
Private Instance Methods
finish_manager(manager)
click to toggle source
# File lib/dynflow/director.rb, line 138 def finish_manager(manager) @execution_plan_managers.delete(manager.execution_plan.id) if rescue?(manager) rescue!(manager) else set_future(manager) end end
rescue!(manager)
click to toggle source
# File lib/dynflow/director.rb, line 161 def rescue!(manager) # TODO: after moving to concurrent-ruby actors, there should be better place # to put this logic of making sure we don't run rescues in endless loop @rescued_steps[manager.execution_plan.id] ||= Set.new @rescued_steps[manager.execution_plan.id].merge(manager.execution_plan.failed_steps.map(&:id)) rescue_plan_id = manager.execution_plan.rescue_plan_id if rescue_plan_id @world.executor.execute(rescue_plan_id, manager.future, false) else set_future(manager) end end
rescue?(manager)
click to toggle source
# File lib/dynflow/director.rb, line 147 def rescue?(manager) if @world.terminating? || !(@world.auto_rescue && manager.execution_plan.state == :paused) false elsif !@rescued_steps.key?(manager.execution_plan.id) # we have not rescued this plan yet true else # we have rescued this plan already, but a different step has failed now # we do this check to prevent endless loop, if we always failed on the same steps failed_step_ids = manager.execution_plan.failed_steps.map(&:id).to_set (failed_step_ids - @rescued_steps[manager.execution_plan.id]).any? end end
set_future(manager)
click to toggle source
# File lib/dynflow/director.rb, line 194 def set_future(manager) @rescued_steps.delete(manager.execution_plan.id) manager.future.success manager.execution_plan end
track_execution_plan(execution_plan_id, finished)
click to toggle source
# File lib/dynflow/director.rb, line 174 def track_execution_plan(execution_plan_id, finished) execution_plan = @world.persistence.load_execution_plan(execution_plan_id) if @execution_plan_managers[execution_plan_id] raise Dynflow::Error, "cannot execute execution_plan_id:#{execution_plan_id} it's already running" end if execution_plan.state == :stopped raise Dynflow::Error, "cannot execute execution_plan_id:#{execution_plan_id} it's stopped" end @execution_plan_managers[execution_plan_id] = ExecutionPlanManager.new(@world, execution_plan, finished) rescue Dynflow::Error => e finished.fail e nil end
unless_done(manager, work_items)
click to toggle source
# File lib/dynflow/director.rb, line 128 def unless_done(manager, work_items) return [] unless manager if manager.done? finish_manager(manager) return [] else return work_items end end