class Proxy::Dynflow::Runner::Dispatcher
Attributes
ticker[R]
Public Class Methods
instance()
click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 8 def self.instance return @instance if @instance @instance = new(Proxy::Dynflow::Core.world.clock, Proxy::Dynflow::Core.world.logger) end
new(clock, logger)
click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 117 def initialize(clock, logger) @mutex = Mutex.new @clock = clock @logger = logger @ticker = ::Proxy::Dynflow::Ticker.spawn('dispatcher-ticker', @clock, @logger, refresh_interval) @runner_actors = {} @runner_suspended_actions = {} end
Public Instance Methods
external_event(runner_id, external_event)
click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 163 def external_event(runner_id, external_event) synchronize do runner_actor = @runner_actors[runner_id] runner_actor&.tell([:external_event, external_event]) end end
finish(runner_id)
click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 155 def finish(runner_id) synchronize do _finish(runner_id) rescue => e _handle_command_exception(runner_id, e, false) end end
handle_command_exception(*args)
click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 176 def handle_command_exception(*args) synchronize { _handle_command_exception(*args) } end
kill(runner_id)
click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 146 def kill(runner_id) synchronize do runner_actor = @runner_actors[runner_id] runner_actor&.tell(:kill) rescue => e _handle_command_exception(runner_id, e, false) end end
refresh_interval()
click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 180 def refresh_interval 1 end
refresh_output(runner_id)
click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 170 def refresh_output(runner_id) synchronize do @runner_actors[runner_id]&.tell([:refresh_output]) end end
start(suspended_action, runner)
click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 130 def start(suspended_action, runner) synchronize do raise "Actor with runner id #{runner.id} already exists" if @runner_actors[runner.id] runner.logger = @logger runner_actor = RunnerActor.spawn("runner-actor-#{runner.id}", self, suspended_action, runner, @clock, @logger) @runner_actors[runner.id] = runner_actor @runner_suspended_actions[runner.id] = suspended_action runner_actor.tell(:start_runner) return runner.id rescue => e _handle_command_exception(runner.id, e) return nil end end
synchronize(&block)
click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 126 def synchronize(&block) @mutex.synchronize(&block) end
Private Instance Methods
_finish(runner_id)
click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 186 def _finish(runner_id) runner_actor = @runner_actors.delete(runner_id) return unless runner_actor @logger.debug("closing session for command [#{runner_id}]," \ "#{@runner_actors.size} actors left ") runner_actor.tell([:start_termination, Concurrent::Promises.resolvable_future]) ensure @runner_suspended_actions.delete(runner_id) end
_handle_command_exception(runner_id, exception, fatal = true)
click to toggle source
# File lib/smart_proxy_dynflow/runner/dispatcher.rb, line 197 def _handle_command_exception(runner_id, exception, fatal = true) @logger.error("error while dispatching request to runner #{runner_id}:"\ "#{exception.class} #{exception.message}:\n #{exception.backtrace.join("\n")}") suspended_action = @runner_suspended_actions[runner_id] if suspended_action suspended_action << Runner::Update.encode_exception('Runner error', exception, fatal) end _finish(runner_id) if fatal end