class Dynflow::World

Constants

TriggerResult

Attributes

action_classes[R]
auto_rescue[R]
auto_validity_check[R]
client_dispatcher[R]
clock[R]
connector[R]
coordinator[R]
dead_letter_handler[R]
delayed_executor[R]
execution_plan_cleaner[R]
executor[R]
executor_dispatcher[R]
id[R]
logger_adapter[R]
meta[R]
middleware[R]
persistence[R]
subscription_index[R]
terminated[R]
termination_timeout[R]
throttle_limiter[R]
transaction_adapter[R]
validity_check_timeout[R]

Public Class Methods

new(config) click to toggle source
# File lib/dynflow/world.rb, line 12
def initialize(config)
  @id                     = SecureRandom.uuid
  @clock                  = spawn_and_wait(Clock, 'clock')
  config_for_world        = Config::ForWorld.new(config, self)
  @logger_adapter         = config_for_world.logger_adapter
  config_for_world.validate
  @transaction_adapter    = config_for_world.transaction_adapter
  @persistence            = Persistence.new(self, config_for_world.persistence_adapter,
                                            :backup_deleted_plans => config_for_world.backup_deleted_plans,
                                            :backup_dir => config_for_world.backup_dir)
  @coordinator            = Coordinator.new(config_for_world.coordinator_adapter)
  @executor               = config_for_world.executor
  @action_classes         = config_for_world.action_classes
  @auto_rescue            = config_for_world.auto_rescue
  @exit_on_terminate      = Concurrent::AtomicBoolean.new(config_for_world.exit_on_terminate)
  @connector              = config_for_world.connector
  @middleware             = Middleware::World.new
  @middleware.use Middleware::Common::Transaction if @transaction_adapter
  @client_dispatcher      = spawn_and_wait(Dispatcher::ClientDispatcher, "client-dispatcher", self)
  @dead_letter_handler    = spawn_and_wait(DeadLetterSilencer, 'default_dead_letter_handler', config_for_world.silent_dead_letter_matchers)
  @meta                   = config_for_world.meta
  @auto_validity_check    = config_for_world.auto_validity_check
  @validity_check_timeout = config_for_world.validity_check_timeout
  @throttle_limiter       = config_for_world.throttle_limiter
  @terminated             = Concurrent.event
  @termination_timeout    = config_for_world.termination_timeout
  calculate_subscription_index

  if executor
    @executor_dispatcher = spawn_and_wait(Dispatcher::ExecutorDispatcher, "executor-dispatcher", self, config_for_world.executor_semaphore)
    executor.initialized.wait
  end
  if auto_validity_check
    self.worlds_validity_check
    self.locks_validity_check
  end
  @delayed_executor         = try_spawn(config_for_world, :delayed_executor, Coordinator::DelayedExecutorLock)
  @execution_plan_cleaner   = try_spawn(config_for_world, :execution_plan_cleaner, Coordinator::ExecutionPlanCleanerLock)
  @meta                     = config_for_world.meta
  @meta['delayed_executor'] = true if @delayed_executor
  @meta['execution_plan_cleaner'] = true if @execution_plan_cleaner
  coordinator.register_world(registered_world)
  @termination_barrier = Mutex.new
  @before_termination_hooks = Queue.new

  if config_for_world.auto_terminate
    at_exit do
      @exit_on_terminate.make_false # make sure we don't terminate twice
      self.terminate.wait
    end
  end
  self.auto_execute if config_for_world.auto_execute
  @delayed_executor.start if @delayed_executor
end

Public Instance Methods

action_logger() click to toggle source
# File lib/dynflow/world.rb, line 83
def action_logger
  logger_adapter.action_logger
end
auto_execute() click to toggle source

executes plans that are planned/paused and haven't reported any error yet (usually when no executor was available by the time of planning or terminating)

# File lib/dynflow/world.rb, line 380
def auto_execute
  coordinator.acquire(Coordinator::AutoExecuteLock.new(self)) do
    planned_execution_plans =
        self.persistence.find_execution_plans filters: { 'state' => %w(planned paused), 'result' => (ExecutionPlan.results - [:error]).map(&:to_s) }
    planned_execution_plans.map do |ep|
      if coordinator.find_locks(Dynflow::Coordinator::ExecutionLock.unique_filter(ep.id)).empty?
        execute(ep.id)
      end
    end.compact
  end
rescue Coordinator::LockError => e
  logger.info "auto-executor lock already aquired: #{e.message}"
  []
end
before_termination(&block) click to toggle source
# File lib/dynflow/world.rb, line 67
def before_termination(&block)
  @before_termination_hooks << block
end
delay(*args) click to toggle source
# File lib/dynflow/world.rb, line 161
def delay(*args)
  delay_with_caller(nil, *args)
end
delay_with_caller(caller_action, action_class, delay_options, *args) click to toggle source
# File lib/dynflow/world.rb, line 165
def delay_with_caller(caller_action, action_class, delay_options, *args)
  raise 'No action_class given' if action_class.nil?
  execution_plan = ExecutionPlan.new(self)
  execution_plan.delay(caller_action, action_class, delay_options, *args)
  Scheduled[execution_plan.id]
end
event(execution_plan_id, step_id, event, done = Concurrent.future) click to toggle source
# File lib/dynflow/world.rb, line 192
def event(execution_plan_id, step_id, event, done = Concurrent.future)
  publish_request(Dispatcher::Event[execution_plan_id, step_id, event], done, false)
end
execute(execution_plan_id, done = Concurrent.future) click to toggle source

@return [Concurrent::Edge::Future] containing execution_plan when finished raises when ExecutionPlan is not accepted for execution

# File lib/dynflow/world.rb, line 188
def execute(execution_plan_id, done = Concurrent.future)
  publish_request(Dispatcher::Execution[execution_plan_id], done, true)
end
invalidate(world) click to toggle source

Invalidate another world, that left some data in the runtime, but it's not really running

# File lib/dynflow/world.rb, line 272
def invalidate(world)
  Type! world, Coordinator::ClientWorld, Coordinator::ExecutorWorld
  coordinator.acquire(Coordinator::WorldInvalidationLock.new(self, world)) do
    if world.is_a? Coordinator::ExecutorWorld
      old_execution_locks = coordinator.find_locks(class: Coordinator::ExecutionLock.name,
                                                   owner_id: "world:#{world.id}")

      coordinator.deactivate_world(world)

      old_execution_locks.each do |execution_lock|
        invalidate_execution_lock(execution_lock)
      end
    end

    coordinator.delete_world(world)
  end
end
invalidate_execution_lock(execution_lock) click to toggle source
# File lib/dynflow/world.rb, line 290
def invalidate_execution_lock(execution_lock)
  begin
    plan = persistence.load_execution_plan(execution_lock.execution_plan_id)
  rescue => e
    if e.is_a?(KeyError)
      logger.error "invalidated execution plan #{execution_lock.execution_plan_id} missing, skipping"
    else
      logger.error e
      logger.error "unexpected error when invalidating execution plan #{execution_lock.execution_plan_id}, skipping"
    end
    coordinator.release(execution_lock)
    return
  end
  unless plan.valid?
    logger.error "invalid plan #{plan.id}, skipping"
    coordinator.release(execution_lock)
    return
  end
  plan.execution_history.add('terminate execution', execution_lock.world_id)

  plan.steps.values.each do |step|
    if step.state == :running
      step.error = ExecutionPlan::Steps::Error.new("Abnormal termination (previous state: #{step.state})")
      step.state = :error
      step.save
    end
  end

  plan.update_state(:paused) if plan.state == :running
  plan.save
  coordinator.release(execution_lock)

  available_executors = coordinator.find_worlds(true)
  if available_executors.any? && !plan.error?
    client_dispatcher.tell([:dispatch_request,
                            Dispatcher::Execution[execution_lock.execution_plan_id],
                            execution_lock.client_world_id,
                            execution_lock.request_id])
  end
rescue Errors::PersistenceError
  logger.error "failed to write data while invalidating execution lock #{execution_lock}"
end
locks_validity_check() click to toggle source
# File lib/dynflow/world.rb, line 368
def locks_validity_check
  orphaned_locks = coordinator.clean_orphaned_locks

  unless orphaned_locks.empty?
    logger.error "invalid coordinator locks found and invalidated: #{orphaned_locks.inspect}"
  end

  return orphaned_locks
end
logger() click to toggle source
# File lib/dynflow/world.rb, line 79
def logger
  logger_adapter.dynflow_logger
end
ping(world_id, timeout, done = Concurrent.future) click to toggle source
# File lib/dynflow/world.rb, line 196
def ping(world_id, timeout, done = Concurrent.future)
  publish_request(Dispatcher::Ping[world_id], done, false, timeout)
end
plan(action_class, *args) click to toggle source
# File lib/dynflow/world.rb, line 172
def plan(action_class, *args)
  ExecutionPlan.new(self).tap do |execution_plan|
    execution_plan.prepare(action_class)
    execution_plan.plan(*args)
  end
end
plan_with_caller(caller_action, action_class, *args) click to toggle source
# File lib/dynflow/world.rb, line 179
def plan_with_caller(caller_action, action_class, *args)
  ExecutionPlan.new(self).tap do |execution_plan|
    execution_plan.prepare(action_class, caller_action: caller_action)
    execution_plan.plan(*args)
  end
end
publish_request(request, done, wait_for_accepted, timeout = nil) click to toggle source
# File lib/dynflow/world.rb, line 200
def publish_request(request, done, wait_for_accepted, timeout = nil)
  accepted = Concurrent.future
  accepted.rescue do |reason|
    done.fail reason if reason
  end
  client_dispatcher.ask([:publish_request, done, request, timeout], accepted)
  accepted.wait if wait_for_accepted
  done
rescue => e
  accepted.fail e
end
registered_world() click to toggle source
# File lib/dynflow/world.rb, line 71
def registered_world
  if executor
    Coordinator::ExecutorWorld.new(self)
  else
    Coordinator::ClientWorld.new(self)
  end
end
reload!() click to toggle source

reload actions classes, intended only for devel

# File lib/dynflow/world.rb, line 92
def reload!
  # TODO what happens with newly loaded classes
  @action_classes = @action_classes.map do |klass|
    begin
      Utils.constantize(klass.to_s)
    rescue NameError
      nil # ignore missing classes
    end
  end.compact
  middleware.clear_cache!
  calculate_subscription_index
end
subscribed_actions(action_class) click to toggle source
# File lib/dynflow/world.rb, line 87
def subscribed_actions(action_class)
  @subscription_index.has_key?(action_class) ? @subscription_index[action_class] : []
end
terminate(future = Concurrent.future) click to toggle source
# File lib/dynflow/world.rb, line 212
def terminate(future = Concurrent.future)
  @termination_barrier.synchronize do
    @terminating ||= Concurrent.future do
      begin
        run_before_termination_hooks

        if delayed_executor
          logger.info "start terminating delayed_executor..."
          delayed_executor.terminate.wait(termination_timeout)
        end

        logger.info "start terminating throttle_limiter..."
        throttle_limiter.terminate.wait(termination_timeout)

        if executor
          connector.stop_receiving_new_work(self)

          logger.info "start terminating executor..."
          executor.terminate.wait(termination_timeout)

          logger.info "start terminating executor dispatcher..."
          executor_dispatcher_terminated = Concurrent.future
          executor_dispatcher.ask([:start_termination, executor_dispatcher_terminated])
          executor_dispatcher_terminated.wait(termination_timeout)
        end

        logger.info "start terminating client dispatcher..."
        client_dispatcher_terminated = Concurrent.future
        client_dispatcher.ask([:start_termination, client_dispatcher_terminated])
        client_dispatcher_terminated.wait(termination_timeout)

        logger.info "stop listening for new events..."
        connector.stop_listening(self)

        if @clock
          logger.info "start terminating clock..."
          clock.ask(:terminate!).wait(termination_timeout)
        end

        coordinator.delete_world(registered_world)
        @terminated.complete
        true
      rescue => e
        logger.fatal(e)
      end
    end.on_completion do
      Thread.new { Kernel.exit } if @exit_on_terminate.true?
    end
  end

  @terminating.tangle(future)
  future
end
terminating?() click to toggle source
# File lib/dynflow/world.rb, line 266
def terminating?
  defined?(@terminating)
end
trigger(action_class = nil, *args, &block) click to toggle source

@return [TriggerResult] blocks until action_class is planned if no arguments given, the plan is expected to be returned by a block

# File lib/dynflow/world.rb, line 144
def trigger(action_class = nil, *args, &block)
  if action_class.nil?
    raise 'Neither action_class nor a block given' if block.nil?
    execution_plan = block.call(self)
  else
    execution_plan = plan(action_class, *args)
  end
  planned = execution_plan.state == :planned

  if planned
    done = execute(execution_plan.id, Concurrent.future)
    Triggered[execution_plan.id, done]
  else
    PlaningFailed[execution_plan.id, execution_plan.errors.first.exception]
  end
end
try_spawn(config_for_world, what, lock_class = nil) click to toggle source
# File lib/dynflow/world.rb, line 395
def try_spawn(config_for_world, what, lock_class = nil)
  object = nil
  return nil if !executor || (object = config_for_world.public_send(what)).nil?

  coordinator.acquire(lock_class.new(self)) if lock_class
  object.spawn.wait
  object
rescue Coordinator::LockError => e
  nil
end
worlds_validity_check(auto_invalidate = true, worlds_filter = {}) click to toggle source
# File lib/dynflow/world.rb, line 333
def worlds_validity_check(auto_invalidate = true, worlds_filter = {})
  worlds = coordinator.find_worlds(false, worlds_filter)

  world_checks = worlds.reduce({}) do |hash, world|
    hash.update(world => ping(world.id, self.validity_check_timeout))
  end
  world_checks.values.each(&:wait)

  results = {}
  world_checks.each do |world, check|
    if check.success?
      result = :valid
    else
      if auto_invalidate
        begin
          invalidate(world)
          result = :invalidated
        rescue => e
          logger.error e
          result = e.message
        end
      else
        result = :invalid
      end
    end
    results[world.id] = result
  end

  unless results.values.all? { |result| result == :valid }
    logger.error "invalid worlds found #{results.inspect}"
  end

  return results
end

Private Instance Methods

calculate_subscription_index() click to toggle source
# File lib/dynflow/world.rb, line 407
def calculate_subscription_index
  @subscription_index =
      action_classes.each_with_object(Hash.new { |h, k| h[k] = [] }) do |klass, index|
        next unless klass.subscribe
        Array(klass.subscribe).each do |subscribed_class|
          index[Utils.constantize(subscribed_class.to_s)] << klass
        end
      end.tap { |o| o.freeze }
end
run_before_termination_hooks() click to toggle source
# File lib/dynflow/world.rb, line 417
def run_before_termination_hooks
  until @before_termination_hooks.empty?
    begin
      @before_termination_hooks.pop.call
    rescue => e
      logger.error e
    end
  end
end
spawn_and_wait(klass, name, *args) click to toggle source
# File lib/dynflow/world.rb, line 427
def spawn_and_wait(klass, name, *args)
  initialized = Concurrent.future
  actor = klass.spawn(name: name, args: args, initialized: initialized)
  initialized.wait
  return actor
end