class Dynflow::Dispatcher::ExecutorDispatcher

Public Class Methods

new(world, semaphore) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 4
def initialize(world, semaphore)
  @world           = Type! world, World
  @current_futures = Set.new
end

Public Instance Methods

handle_request(envelope) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 9
def handle_request(envelope)
  match(envelope.message,
        on(Execution) { perform_execution(envelope, envelope.message) },
        on(Event)     { perform_event(envelope, envelope.message) })
end

Protected Instance Methods

perform_event(envelope, event_request) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 44
def perform_event(envelope, event_request)
  future = on_finish do |f|
    f.then do
      respond(envelope, Done)
    end.rescue do |reason|
      respond(envelope, Failed[reason.to_s])
    end
  end
  @world.executor.event(event_request.execution_plan_id, event_request.step_id, event_request.event, future)
rescue Dynflow::Error => e
  future.fail(e) if future && !future.completed?
end
perform_execution(envelope, execution) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 17
def perform_execution(envelope, execution)
  allocate_executor(execution.execution_plan_id, envelope.sender_id, envelope.request_id)
  execution_lock = Coordinator::ExecutionLock.new(@world, execution.execution_plan_id, envelope.sender_id, envelope.request_id)
  future = on_finish do |f|
    f.then do |plan|
      when_done(plan, envelope, execution, execution_lock)
    end.rescue do |reason|
      @world.coordinator.release(execution_lock)
      respond(envelope, Failed[reason.to_s])
    end
  end
  @world.executor.execute(execution.execution_plan_id, future)
  respond(envelope, Accepted)
rescue Dynflow::Error => e
  future.fail(e) if future && !future.completed?
  respond(envelope, Failed[e.message])
end
start_termination(*args) click to toggle source
Calls superclass method Dynflow::Actor#start_termination
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 57
def start_termination(*args)
  super
  if @current_futures.empty?
    reference.tell(:finish_termination)
  else
    Concurrent.zip(*@current_futures).then { reference.tell(:finish_termination) }
  end
end
when_done(plan, envelope, execution, execution_lock) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 35
def when_done(plan, envelope, execution, execution_lock)
  if plan.state == :running
    @world.invalidate_execution_lock(execution_lock)
  else
    @world.coordinator.release(execution_lock)
    respond(envelope, Done)
  end
end

Private Instance Methods

allocate_executor(execution_plan_id, client_world_id, request_id) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 68
def allocate_executor(execution_plan_id, client_world_id, request_id)
  execution_lock = Coordinator::ExecutionLock.new(@world, execution_plan_id, client_world_id, request_id)
  @world.coordinator.acquire(execution_lock)
end
finish_execution(future) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 85
def finish_execution(future)
  @current_futures.delete(future)
end
on_finish() { |future).rescue { |reason| logger.error("Unexpected fail on future| ... } click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 73
def on_finish
  raise "Dispatcher terminating: no new work can be started" if terminating?
  future = Concurrent.future
  callbacks_future = (yield future).rescue { |reason| @world.logger.error("Unexpected fail on future #{reason}") }
  # we track currently running futures to make sure to not
  # terminate until the execution is finished (including
  # cleaning of locks etc)
  @current_futures << callbacks_future
  callbacks_future.on_completion! { reference.tell([:finish_execution, callbacks_future]) }
  return future
end