class Dynflow::Action

rubocop:disable Metrics/ClassLength

Constants

ERROR
OutputReference
Phase
SUSPEND
Skip

Attributes

caller_action_id[R]
caller_execution_plan_id[R]
execution_plan_id[R]
finalize_step_id[R]
id[R]
input[R]
phase[R]
plan_step_id[R]
run_step_id[R]
world[R]

Public Class Methods

all_children() click to toggle source
# File lib/dynflow/action.rb, line 27
def self.all_children
  children.values.inject(children.values) do |children, child|
    children + child.all_children
  end
end
children() click to toggle source
# File lib/dynflow/action.rb, line 38
def self.children
  @children ||= {}
end
constantize(action_name) click to toggle source
Calls superclass method
# File lib/dynflow/action.rb, line 80
def self.constantize(action_name)
  super action_name
rescue NameError
  Action::Missing.generate(action_name)
end
inherited(child) click to toggle source
Calls superclass method
# File lib/dynflow/action.rb, line 33
def self.inherited(child)
  children[child.name] = child
  super child
end
middleware() click to toggle source
# File lib/dynflow/action.rb, line 42
def self.middleware
  @middleware ||= Middleware::Register.new
end
new(attributes, world) click to toggle source
# File lib/dynflow/action.rb, line 92
def initialize(attributes, world)
  Type! attributes, Hash

  @phase             = Type! attributes.fetch(:phase), Phase
  @world             = Type! world, World
  @step              = Type! attributes.fetch(:step, nil), ExecutionPlan::Steps::Abstract, NilClass
  raise ArgumentError, 'Step reference missing' if phase?(Executable) && @step.nil?
  @execution_plan_id = Type! attributes.fetch(:execution_plan_id), String
  @id                = Type! attributes.fetch(:id), Integer
  @plan_step_id      = Type! attributes.fetch(:plan_step_id), Integer
  @run_step_id       = Type! attributes.fetch(:run_step_id), Integer, NilClass
  @finalize_step_id  = Type! attributes.fetch(:finalize_step_id), Integer, NilClass

  @execution_plan    = Type!(attributes.fetch(:execution_plan), ExecutionPlan) if phase? Present

  @caller_execution_plan_id  = Type!(attributes.fetch(:caller_execution_plan_id, nil), String, NilClass)
  @caller_action_id          = Type!(attributes.fetch(:caller_action_id, nil), Integer, NilClass)

  getter =-> key, required do
    required ? attributes.fetch(key) : attributes.fetch(key, {})
  end

  @input  = OutputReference.deserialize getter.(:input, phase?(Run, Finalize, Present))
  @output = OutputReference.deserialize getter.(:output, false) if phase? Run, Finalize, Present
end
subscribe() click to toggle source

FIND define subscriptions in world independent on action's classes,

limited only by in/output formats

@return [nil, Class] a child of Action

# File lib/dynflow/action.rb, line 49
def self.subscribe
  nil
end

Protected Class Methods

new_from_hash(hash, world) click to toggle source
# File lib/dynflow/action.rb, line 360
def self.new_from_hash(hash, world)
  new(hash, world)
end

Public Instance Methods

action_logger() click to toggle source
# File lib/dynflow/action.rb, line 187
def action_logger
  phase! Executable
  world.action_logger
end
all_planned_actions(filter_class = Action) click to toggle source

@param [Class] filter_class return only actions which are kind of `filter_class` @return [Array<Action>] of all (including indirectly) planned actions by this action, returned actions are in Present phase

# File lib/dynflow/action.rb, line 211
def all_planned_actions(filter_class = Action)
  phase! Present
  mine = planned_actions
  (mine + mine.reduce([]) { |arr, action| arr + action.all_planned_actions }).
      select { |a| a.is_a?(filter_class) }
end
caller_action() click to toggle source
# File lib/dynflow/action.rb, line 152
def caller_action
  plase! Present
  return nil if @caller_action_id
  return @caller_action if @caller_action

  caller_execution_plan = if @caller_execution_plan_id == execution_plan.id
                            execution_plan
                          else
                            world.persistence.load_execution_plan(@caller_execution_plan_id)
                          end
  @caller_action = world.persistence.load_action_for_presentation(caller_execution_plan, @caller_action_id)
end
error() click to toggle source
# File lib/dynflow/action.rb, line 259
def error
  raise "error data not available" if @step.nil?
  @step.error
end
execute(*args) click to toggle source
# File lib/dynflow/action.rb, line 264
def execute(*args)
  phase! Executable
  self.send phase.execute_method_name, *args
end
execute_delay(delay_options, *args) click to toggle source
# File lib/dynflow/action.rb, line 289
def execute_delay(delay_options, *args)
  with_error_handling(true) do
    world.middleware.execute(:delay, self, delay_options, *args) do |*new_args|
      @serializer = delay(*new_args).tap do |serializer|
        serializer.perform_serialization!
      end
    end
  end
end
execution_plan() click to toggle source
# File lib/dynflow/action.rb, line 182
def execution_plan
  phase! Plan, Present
  @execution_plan
end
finalize_step() click to toggle source
# File lib/dynflow/action.rb, line 223
def finalize_step
  phase! Present
  execution_plan.steps.fetch(finalize_step_id) if finalize_step_id
end
from_subscription?() click to toggle source
# File lib/dynflow/action.rb, line 177
def from_subscription?
  phase! Plan
  @from_subscription
end
humanized_state() click to toggle source

@override to define more descriptive state information for the action: used in Dynflow console

# File lib/dynflow/action.rb, line 255
def humanized_state
  state.to_s
end
input=(hash) click to toggle source
# File lib/dynflow/action.rb, line 131
def input=(hash)
  Type! hash, Hash
  phase! Plan
  @input = Utils.indifferent_hash(hash)
end
label() click to toggle source
# File lib/dynflow/action.rb, line 127
def label
  self.class.name
end
output() click to toggle source
# File lib/dynflow/action.rb, line 143
def output
  if phase? Plan
    @output_reference or
      raise 'plan_self has to be invoked before being able to reference the output'
  else
    @output
  end
end
output=(hash) click to toggle source
# File lib/dynflow/action.rb, line 137
def output=(hash)
  Type! hash, Hash
  phase! Run
  @output = Utils.indifferent_hash(hash)
end
phase!(*phases) click to toggle source
# File lib/dynflow/action.rb, line 122
def phase!(*phases)
  phase?(*phases) or
    raise TypeError, "Wrong phase #{phase}, required #{phases}"
end
phase?(*phases) click to toggle source
# File lib/dynflow/action.rb, line 118
def phase?(*phases)
  Match? phase, *phases
end
plan_step() click to toggle source
# File lib/dynflow/action.rb, line 192
def plan_step
  phase! Present
  execution_plan.steps.fetch(plan_step_id)
end
planned_actions(filter = Action) click to toggle source

@param [Class] filter_class return only actions which are kind of `filter_class` @return [Array<Action>] of directly planned actions by this action, returned actions are in Present phase

# File lib/dynflow/action.rb, line 200
def planned_actions(filter = Action)
  phase! Present
  plan_step.
      planned_steps(execution_plan).
      map { |s| s.action(execution_plan) }.
      select { |a| a.is_a?(filter) }
end
required_step_ids(input = self.input) click to toggle source

@api private @return [Array<Fixnum>] - ids of steps referenced from action

# File lib/dynflow/action.rb, line 271
def required_step_ids(input = self.input)
  results   = []
  recursion =-> value do
    case value
    when Hash
      value.values.each { |v| recursion.(v) }
    when Array
      value.each { |v| recursion.(v) }
    when ExecutionPlan::OutputReference
      results << value.step_id
    else
      # no reference hidden in this arg
    end
    results
  end
  recursion.(input)
end
run_step() click to toggle source
# File lib/dynflow/action.rb, line 218
def run_step
  phase! Present
  execution_plan.steps.fetch(run_step_id) if run_step_id
end
serializer() click to toggle source
# File lib/dynflow/action.rb, line 299
def serializer
  raise "The action must be delayed in order to access the serializer" if @serializer.nil?
  @serializer
end
set_plan_context(execution_plan, triggering_action, from_subscription) click to toggle source
# File lib/dynflow/action.rb, line 165
def set_plan_context(execution_plan, triggering_action, from_subscription)
  phase! Plan
  @execution_plan    = Type! execution_plan, ExecutionPlan
  @triggering_action = Type! triggering_action, Action, NilClass
  @from_subscription = Type! from_subscription, TrueClass, FalseClass
end
state() click to toggle source
# File lib/dynflow/action.rb, line 248
def state
  raise "state data not available" if @step.nil?
  @step.state
end
steps() click to toggle source
# File lib/dynflow/action.rb, line 228
def steps
  [plan_step, run_step, finalize_step]
end
to_hash() click to toggle source
# File lib/dynflow/action.rb, line 232
def to_hash
  recursive_to_hash(
      { class:                     self.class.name,
        execution_plan_id:         execution_plan_id,
        id:                        id,
        plan_step_id:              plan_step_id,
        run_step_id:               run_step_id,
        finalize_step_id:          finalize_step_id,
        caller_execution_plan_id:  caller_execution_plan_id,
        caller_action_id:          caller_action_id,
        input:                     input },
      if phase? Run, Finalize, Present
        { output: output }
      end)
end
triggering_action() click to toggle source
# File lib/dynflow/action.rb, line 172
def triggering_action
  phase! Plan
  @triggering_action
end

Protected Instance Methods

delay(delay_options, *args) click to toggle source
# File lib/dynflow/action.rb, line 320
def delay(delay_options, *args)
  Serializers::Noop.new(args)
end
finalize() click to toggle source

Add this method to implement the action's *Finalize phase* behaviour. It can use DB in this phase.

# File lib/dynflow/action.rb, line 351
def finalize
  # just a rdoc placeholder
end
plan(*args) click to toggle source

@override to implement the action's *Plan phase* behaviour. By default it plans itself and expects input-hash. Use plan_self and plan_action methods to plan actions. It can use DB in this phase.

# File lib/dynflow/action.rb, line 328
def plan(*args)
  if from_subscription?
    # if the action is triggered by subscription, by default use the
    # input of parent action.
    # should be replaced by referencing the input from input format
    plan_self(input.merge(triggering_action.input))
  else
    # in this case, the action was triggered by plan_action. Use
    # the argument specified there.
    plan_self(*args)
  end
  self
end
run(event = nil) click to toggle source

Add this method to implement the action's *Run phase* behaviour. It should not use DB in this phase.

# File lib/dynflow/action.rb, line 344
def run(event = nil)
  # just a rdoc placeholder
end
run_accepts_events?() click to toggle source
# File lib/dynflow/action.rb, line 356
def run_accepts_events?
  method(:run).arity != 0
end
save_state() click to toggle source
# File lib/dynflow/action.rb, line 315
def save_state
  phase! Executable
  @step.save
end
state=(state) click to toggle source
# File lib/dynflow/action.rb, line 306
def state=(state)
  phase! Executable
  @world.logger.debug format('%13s %s:%2d %9s >> %9s in phase %8s %s',
                             'Step', execution_plan_id, @step.id,
                             self.state, state,
                             phase.to_s_humanized, self.class)
  @step.state = state
end

Private Instance Methods

check_serializable(what) click to toggle source
# File lib/dynflow/action.rb, line 539
def check_serializable(what)
  Match! what, :input, :output
  value = send what
  recursive_to_hash value # it raises when not serializable
rescue => e
  value.replace not_serializable: true
  raise e
end
concurrence(&block) click to toggle source

DSL for plan phase

# File lib/dynflow/action.rb, line 368
def concurrence(&block)
  phase! Plan
  @execution_plan.switch_flow(Flows::Concurrence.new([]), &block)
end
error!(error) click to toggle source

DSL to terminate action execution and set it to error

# File lib/dynflow/action.rb, line 415
def error!(error)
  phase! Executable
  set_error(error)
  throw ERROR
end
execute_finalize() click to toggle source
# File lib/dynflow/action.rb, line 527
def execute_finalize
  phase! Finalize
  @input     = OutputReference.dereference @input, world.persistence
  self.state = :running
  save_state
  with_error_handling do
    world.middleware.execute(:finalize, self) do
      finalize
    end
  end
end
execute_plan(*args) click to toggle source
# File lib/dynflow/action.rb, line 457
def execute_plan(*args)
  phase! Plan
  self.state = :running
  save_state

  # when the error occurred inside the planning, catch that
  # before getting out of the planning phase
  with_error_handling(!root_action?) do
    concurrence do
      world.middleware.execute(:plan, self, *args) do |*new_args|
        plan(*new_args)
      end
    end

    subscribed_actions = world.subscribed_actions(self.class)
    if subscribed_actions.any?
      # we encapsulate the flow for this action into a concurrence and
      # add the subscribed flows to it as well.
      trigger_flow = @execution_plan.current_run_flow.sub_flows.pop
      @execution_plan.switch_flow(Flows::Concurrence.new([trigger_flow].compact)) do
        subscribed_actions.each do |action_class|
          new_plan_step = @execution_plan.add_plan_step(action_class, self)
          new_plan_step.execute(@execution_plan, self, true, *args)
        end
      end
    end

    check_serializable :input
  end
end
execute_run(event) click to toggle source
# File lib/dynflow/action.rb, line 488
def execute_run(event)
  phase! Run
  @world.logger.debug format('%13s %s:%2d got event %s',
                             'Step', execution_plan_id, @step.id, event) if event
  @input = OutputReference.dereference @input, world.persistence

  case
  when state == :running
    raise NotImplementedError, 'recovery after restart is not implemented'

  when [:pending, :error, :skipping, :suspended].include?(state)
    if event && state != :suspended
      raise 'event can be processed only when in suspended state'
    end

    self.state = :running unless self.state == :skipping
    save_state
    with_error_handling do
      event = Skip if state == :skipping

      # we run the Skip event only when the run accepts events
      if event != Skip || run_accepts_events?
        result = catch(SUSPEND) do
          world.middleware.execute(:run, self, *[event].compact) do |*args|
            run(*args)
          end
        end

        self.state = :suspended if result == SUSPEND
      end

      check_serializable :output
    end

  else
    raise "wrong state #{state} when event:#{event}"
  end
end
plan_action(action_class, *args) click to toggle source
# File lib/dynflow/action.rb, line 396
def plan_action(action_class, *args)
  phase! Plan
  @execution_plan.add_plan_step(action_class, self).execute(@execution_plan, self, false, *args)
end
plan_self(input = {}) click to toggle source
# File lib/dynflow/action.rb, line 378
def plan_self(input = {})
  phase! Plan
  self.input.update input

  if self.respond_to?(:run)
    run_step          = @execution_plan.add_run_step(self)
    @run_step_id      = run_step.id
    @output_reference = OutputReference.new(@execution_plan.id, run_step.id, id)
  end

  if self.respond_to?(:finalize)
    finalize_step     = @execution_plan.add_finalize_step(self)
    @finalize_step_id = finalize_step.id
  end

  return self # to stay consistent with plan_action
end
root_action?() click to toggle source
# File lib/dynflow/action.rb, line 548
def root_action?
  @triggering_action.nil?
end
sequence(&block) click to toggle source
# File lib/dynflow/action.rb, line 373
def sequence(&block)
  phase! Plan
  @execution_plan.switch_flow(Flows::Sequence.new([]), &block)
end
set_error(error) click to toggle source
# File lib/dynflow/action.rb, line 449
def set_error(error)
  phase! Executable
  Type! error, Exception, String
  action_logger.error error
  self.state  = :error
  @step.error = ExecutionPlan::Steps::Error.new(error)
end
suspend(&block) click to toggle source
# File lib/dynflow/action.rb, line 408
def suspend(&block)
  phase! Run
  block.call suspended_action if block
  throw SUSPEND, SUSPEND
end
suspended_action() click to toggle source

DSL for run phase

# File lib/dynflow/action.rb, line 403
def suspended_action
  phase! Run
  @suspended_action ||= Action::Suspended.new(self)
end
with_error_handling(propagate_error = nil, &block) click to toggle source
# File lib/dynflow/action.rb, line 421
def with_error_handling(propagate_error = nil, &block)
  raise "wrong state #{self.state}" unless [:scheduling, :skipping, :running].include?(self.state)

  begin
    catch(ERROR) { block.call }
  rescue Exception => error
    set_error(error)
    # reraise low-level exceptions
    raise error unless Type? error, StandardError, ScriptError
  end

  case self.state
  when :scheduling
    self.state = :pending
  when :running
    self.state = :success
  when :skipping
    self.state = :skipped
  when :suspended, :error
  else
    raise "wrong state #{self.state}"
  end

  if propagate_error && self.state == :error
    raise(@step.error.exception)
  end
end