module Dynflow::Action::WithSubPlans

Constants

SubPlanFinished

Public Instance Methods

calculate_time_distribution() click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 92
def calculate_time_distribution
  time, count = input[:concurrency_control][:time]
  unless time.nil? || time.is_a?(Hash)
    # Assume concurrency level 1 unless stated otherwise
    level = input[:concurrency_control].fetch(:level, {}).fetch(:free, 1)
    semaphore = ::Dynflow::Semaphores::Stateful.new(nil, level,
                                                    :interval => time.to_f / (count * level),
                                                    :time_span => time)
    input[:concurrency_control][:time] = semaphore.to_hash
  end
end
cancel!() click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 65
def cancel!
  @world.throttle_limiter.cancel!(execution_plan_id)
  sub_plans('state' => 'running').each(&:cancel)
  suspend
end
check_for_errors!() click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 211
def check_for_errors!
  fail "A sub task failed" if output[:failed_count] > 0
end
counts_set?() click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 207
def counts_set?
  output[:total_count] && output[:success_count] && output[:failed_count] && output[:pending_count]
end
create_sub_plans() click to toggle source

@abstract when the logic for the initiation of the subtasks

is different from the default one.

@returns a triggered task or array of triggered tasks @example

def create_sub_plans
  trigger(MyAction, "Hello")
end

@example

def create_sub_plans
  [trigger(MyAction, "Hello 1"), trigger(MyAction, "Hello 2")]
end
# File lib/dynflow/action/with_sub_plans.rb, line 57
def create_sub_plans
  raise NotImplementedError
end
distribute_over_time(time_span, count) click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 104
def distribute_over_time(time_span, count)
  input[:concurrency_control] ||= {}
  input[:concurrency_control][:time] = [time_span, count]
end
done?() click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 172
def done?
  if counts_set?
    output[:total_count] - output[:success_count] - output[:failed_count] <= 0
  else
    false
  end
end
increase_counts(planned, failed, track_total = true) click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 119
def increase_counts(planned, failed, track_total = true)
  output[:total_count]   = output.fetch(:total_count, 0) + planned + failed if track_total
  output[:failed_count]  = output.fetch(:failed_count, 0) + failed
  output[:pending_count] = output.fetch(:pending_count, 0) + planned
  output[:success_count] ||= 0
end
initiate() click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 28
def initiate
  if uses_concurrency_control
    calculate_time_distribution
    world.throttle_limiter.initialize_plan(execution_plan_id, input[:concurrency_control])
  end
  spawn_plans
end
limit_concurrency_level(level) click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 87
def limit_concurrency_level(level)
  input[:concurrency_control] ||= {}
  input[:concurrency_control][:level] = ::Dynflow::Semaphores::Stateful.new(level).to_hash
end
mark_as_done(plan_id, success) click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 163
def mark_as_done(plan_id, success)
  if success
    output[:success_count] += 1
  else
    output[:failed_count] += 1
  end
  output[:pending_count] -= 1
end
notify_on_finish(plans) click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 153
def notify_on_finish(plans)
  suspend do |suspended_action|
    plans.each do |plan|
      plan.finished.on_completion! do |success, value|
        suspended_action << SubPlanFinished[plan.id, success && (value.result == :success)]
      end
    end
  end
end
on_finish() click to toggle source

@api method to be called after all the sub tasks finished

# File lib/dynflow/action/with_sub_plans.rb, line 62
def on_finish
end
recalculate_counts() click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 188
def recalculate_counts
  output.update(total_count: 0,
                failed_count: 0,
                success_count: 0,
                pending_count: 0)
  sub_plans.each do |sub_plan|
    output[:total_count] += 1
    if sub_plan.state == :stopped
      if sub_plan.error?
        output[:failed_count] += 1
      else
        output[:success_count] += 1
      end
    else
      output[:pending_count] += 1
    end
  end
end
resume() click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 137
def resume
  if sub_plans.all? { |sub_plan| sub_plan.error_in_plan? }
    # We're starting over and need to reset the counts
    %w(total failed pending success).each { |key| output.delete("#{key}_count".to_sym) }
    initiate
  else
    recalculate_counts
    try_to_finish or fail "Some sub plans are still not finished"
  end
end
run(event = nil) click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 10
def run(event = nil)
  match event,
        (on nil do
           if output[:total_count]
             resume
           else
             initiate
           end
         end),
        (on SubPlanFinished do
           mark_as_done(event.execution_plan_id, event.success)
           try_to_finish or suspend
         end),
        (on Action::Cancellable::Cancel do
           cancel!
         end)
end
run_progress() click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 180
def run_progress
  if counts_set? && output[:total_count] > 0
    (output[:success_count] + output[:failed_count]).to_f / output[:total_count]
  else
    0.1
  end
end
spawn_plans() click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 36
def spawn_plans
  sub_plans = create_sub_plans
  sub_plans = Array[sub_plans] unless sub_plans.is_a? Array
  wait_for_sub_plans sub_plans
end
sub_plans(filter = {}) click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 148
def sub_plans(filter = {})
  @sub_plans ||= world.persistence.find_execution_plans(filters: { 'caller_execution_plan_id' => execution_plan_id,
                                                                   'caller_action_id' => self.id }.merge(filter) )
end
trigger(*args) click to toggle source

Helper for creating sub plans

# File lib/dynflow/action/with_sub_plans.rb, line 72
def trigger(*args)
  if uses_concurrency_control
    trigger_with_concurrency_control(*args)
  else
    world.trigger { world.plan_with_caller(self, *args) }
  end
end
trigger_with_concurrency_control(*args) click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 80
def trigger_with_concurrency_control(*args)
  record = world.plan_with_caller(self, *args)
  records = [[record.id], []]
  records.reverse! unless record.state == :planned
  @world.throttle_limiter.handle_plans!(execution_plan_id, *records).first
end
try_to_finish() click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 126
def try_to_finish
  if done?
    world.throttle_limiter.finish(execution_plan_id)
    check_for_errors!
    on_finish
    return true
  else
    return false
  end
end
uses_concurrency_control() click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 215
def uses_concurrency_control
  @uses_concurrency_control = input.key? :concurrency_control
end
wait_for_sub_plans(sub_plans) click to toggle source
# File lib/dynflow/action/with_sub_plans.rb, line 109
def wait_for_sub_plans(sub_plans)
  planned, failed = sub_plans.partition(&:planned?)
  increase_counts(planned.count, failed.count)
  if planned.any?
    notify_on_finish(planned)
  else
    check_for_errors!
  end
end