class Proxy::Dynflow::Runner::Dispatcher

Attributes

ticker[R]

Public Class Methods

instance() click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 8
def self.instance
  return @instance if @instance

  @instance = new(Proxy::Dynflow::Core.world.clock,
                  Proxy::Dynflow::Core.world.logger)
end
new(clock, logger) click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 117
def initialize(clock, logger)
  @mutex  = Mutex.new
  @clock  = clock
  @logger = logger
  @ticker = ::Proxy::Dynflow::Ticker.spawn('dispatcher-ticker', @clock, @logger, refresh_interval)
  @runner_actors = {}
  @runner_suspended_actions = {}
end

Public Instance Methods

external_event(runner_id, external_event) click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 163
def external_event(runner_id, external_event)
  synchronize do
    runner_actor = @runner_actors[runner_id]
    runner_actor&.tell([:external_event, external_event])
  end
end
finish(runner_id) click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 155
def finish(runner_id)
  synchronize do
    _finish(runner_id)
  rescue => e
    _handle_command_exception(runner_id, e, false)
  end
end
handle_command_exception(*args) click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 176
def handle_command_exception(*args)
  synchronize { _handle_command_exception(*args) }
end
kill(runner_id) click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 146
def kill(runner_id)
  synchronize do
    runner_actor = @runner_actors[runner_id]
    runner_actor&.tell(:kill)
  rescue => e
    _handle_command_exception(runner_id, e, false)
  end
end
refresh_interval() click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 180
def refresh_interval
  1
end
refresh_output(runner_id) click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 170
def refresh_output(runner_id)
  synchronize do
    @runner_actors[runner_id]&.tell([:refresh_output])
  end
end
start(suspended_action, runner) click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 130
def start(suspended_action, runner)
  synchronize do
    raise "Actor with runner id #{runner.id} already exists" if @runner_actors[runner.id]

    runner.logger = @logger
    runner_actor = RunnerActor.spawn("runner-actor-#{runner.id}", self, suspended_action, runner, @clock, @logger)
    @runner_actors[runner.id] = runner_actor
    @runner_suspended_actions[runner.id] = suspended_action
    runner_actor.tell(:start_runner)
    return runner.id
  rescue => e
    _handle_command_exception(runner.id, e)
    return nil
  end
end
synchronize(&block) click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 126
def synchronize(&block)
  @mutex.synchronize(&block)
end

Private Instance Methods

_finish(runner_id) click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 186
def _finish(runner_id)
  runner_actor = @runner_actors.delete(runner_id)
  return unless runner_actor

  @logger.debug("closing session for command [#{runner_id}]," \
                "#{@runner_actors.size} actors left ")
  runner_actor.tell([:start_termination, Concurrent::Promises.resolvable_future])
ensure
  @runner_suspended_actions.delete(runner_id)
end
_handle_command_exception(runner_id, exception, fatal = true) click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 197
def _handle_command_exception(runner_id, exception, fatal = true)
  @logger.error("error while dispatching request to runner #{runner_id}:"\
                "#{exception.class} #{exception.message}:\n #{exception.backtrace.join("\n")}")
  suspended_action = @runner_suspended_actions[runner_id]
  if suspended_action
    suspended_action << Runner::Update.encode_exception('Runner error', exception, fatal)
  end
  _finish(runner_id) if fatal
end