class Dynflow::Dispatcher::ClientDispatcher
Constants
- TrackedRequest
Public Class Methods
new(world)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 29 def initialize(world) @world = Type! world, World @last_id = 0 @tracked_requests = {} @terminated = nil end
Public Instance Methods
dispatch_request(request, client_world_id, request_id)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 53 def dispatch_request(request, client_world_id, request_id) executor_id = match request, (on ~Execution do |execution| AnyExecutor end), (on ~Event do |event| find_executor(event.execution_plan_id) end), (on Ping.(~any) do |receiver_id| receiver_id end) envelope = Envelope[request_id, client_world_id, executor_id, request] if Dispatcher::UnknownWorld === envelope.receiver_id raise Dynflow::Error, "Could not find an executor for #{envelope}" end connector.send(envelope).value! rescue => e log(Logger::ERROR, e) respond(envelope, Failed[e.message]) if envelope end
dispatch_response(envelope)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 74 def dispatch_response(envelope) return unless @tracked_requests.key?(envelope.request_id) match envelope.message, (on ~Accepted do @tracked_requests[envelope.request_id].accept! end), (on ~Failed do |msg| resolve_tracked_request(envelope.request_id, Dynflow::Error.new(msg.error)) end), (on Done | Pong do resolve_tracked_request(envelope.request_id) end) end
publish_request(future, request, timeout)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 36 def publish_request(future, request, timeout) track_request(future, request, timeout) do |tracked_request| dispatch_request(request, @world.id, tracked_request.id) end end
start_termination(*args)
click to toggle source
Calls superclass method
Dynflow::Actor#start_termination
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 46 def start_termination(*args) super @tracked_requests.values.each { |tracked_request| tracked_request.fail!(Dynflow::Error.new('Dispatcher terminated')) } @tracked_requests.clear finish_termination end
timeout(request_id)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 42 def timeout(request_id) resolve_tracked_request(request_id, Dynflow::Error.new("Request timeout")) end
Private Instance Methods
find_executor(execution_plan_id)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 90 def find_executor(execution_plan_id) execution_lock = @world.coordinator.find_locks(class: Coordinator::ExecutionLock.name, id: "execution-plan:#{execution_plan_id}").first if execution_lock execution_lock.world_id else Dispatcher::UnknownWorld end rescue => e log(Logger::ERROR, e) Dispatcher::UnknownWorld end
reset_tracked_request(tracked_request)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 114 def reset_tracked_request(tracked_request) if tracked_request.finished.completed? raise Dynflow::Error.new('Can not reset resolved tracked request') end unless tracked_request.accepted.completed? tracked_request.accept! # otherwise nobody would set the accept future end @tracked_requests[tracked_request.id] = TrackedRequest[tracked_request.id, tracked_request.request, Concurrent.future, tracked_request.finished] end
resolve_tracked_request(id, error = nil)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 124 def resolve_tracked_request(id, error = nil) return unless @tracked_requests.key?(id) if error @tracked_requests.delete(id).fail! error else tracked_request = @tracked_requests[id] resolve_to = match tracked_request.request, (on Execution.(execution_plan_id: ~any) do |uuid| @world.persistence.load_execution_plan(uuid) end), (on Event | Ping do true end) @tracked_requests.delete(id).success! resolve_to end end
track_request(finished, request, timeout) { |tracked_request| ... }
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 103 def track_request(finished, request, timeout) id = @last_id += 1 tracked_request = TrackedRequest[id, request, Concurrent.future, finished] @tracked_requests[id] = tracked_request @world.clock.ping(self, timeout, [:timeout, id]) if timeout yield tracked_request rescue Dynflow::Error => e resolve_tracked_request(tracked_request.id, e) log(Logger::ERROR, e) end