class Dynflow::ThrottleLimiter::Core

Public Class Methods

new(world) click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 50
def initialize(world)
  @world = world
  @semaphores = {}
end

Public Instance Methods

cancel(parent_id, reason = nil) click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 97
def cancel(parent_id, reason = nil)
  if @semaphores.key?(parent_id)
    reason ||= 'The task was cancelled.'
    @semaphores[parent_id].waiting.each do |triggered|
      cancel_plan_id(triggered.execution_plan_id, reason)
      triggered.future.reject(reason)
    end
    finish(parent_id)
  end
end
finish(parent_id) click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 108
def finish(parent_id)
  @semaphores.delete(parent_id)
end
handle_plans(parent_id, planned_ids, failed_ids) click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 60
def handle_plans(parent_id, planned_ids, failed_ids)
  failed = failed_ids.map do |plan_id|
    ::Dynflow::World::Triggered[plan_id, Concurrent::Promises.resolvable_future].tap do |triggered|
      execute_triggered(triggered)
    end
  end

  planned_ids.map do |child_id|
    ::Dynflow::World::Triggered[child_id, Concurrent::Promises.resolvable_future].tap do |triggered|
      triggered.future.on_resolution! { self << [:release, parent_id] }
      execute_triggered(triggered) if @semaphores[parent_id].wait(triggered)
    end
  end + failed
end
initialize_plan(plan_id, semaphores_hash) click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 55
def initialize_plan(plan_id, semaphores_hash)
  @semaphores[plan_id] = create_semaphores(semaphores_hash)
  set_up_clock_for(plan_id, true)
end
observe(parent_id = nil) click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 75
def observe(parent_id = nil)
  if parent_id.nil?
    @semaphores.reduce([]) do |acc, cur|
      acc << { cur.first => cur.last.waiting }
    end
  elsif @semaphores.key? parent_id
    @semaphores[parent_id].waiting
  else
    []
  end
end
release(plan_id, key = :level) click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 87
def release(plan_id, key = :level)
  return unless @semaphores.key? plan_id
  set_up_clock_for(plan_id) if key == :time
  semaphore = @semaphores[plan_id]
  semaphore.release(1, key) if semaphore.children.key?(key)
  if semaphore.has_waiting? && semaphore.get == 1
    execute_triggered(semaphore.get_waiting)
  end
end

Private Instance Methods

cancel_plan_id(plan_id, reason) click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 114
def cancel_plan_id(plan_id, reason)
  plan = @world.persistence.load_execution_plan(plan_id)
  steps = plan.run_steps
  steps.each do |step|
    step.state = :error
    step.error = ::Dynflow::ExecutionPlan::Steps::Error.new(reason)
    step.save
  end
  plan.update_state(:stopped)
  plan.save
end
create_semaphores(hash) click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 140
def create_semaphores(hash)
  semaphores = hash.keys.reduce(Utils.indifferent_hash({})) do |acc, key|
    acc.merge(key => ::Dynflow::Semaphores::Stateful.new_from_hash(hash[key]))
  end
  ::Dynflow::Semaphores::Aggregating.new(semaphores)
end
execute_triggered(triggered) click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 126
def execute_triggered(triggered)
  @world.execute(triggered.execution_plan_id, triggered.finished)
end
set_up_clock_for(plan_id, initial = false) click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 130
def set_up_clock_for(plan_id, initial = false)
  if @semaphores[plan_id].children.key? :time
    timeout_message = 'The task could not be started within the maintenance window.'
    interval = @semaphores[plan_id].children[:time].meta[:interval]
    timeout = @semaphores[plan_id].children[:time].meta[:time_span]
    @world.clock.ping(self, interval, [:release, plan_id, :time])
    @world.clock.ping(self, timeout, [:cancel, plan_id, timeout_message]) if initial
  end
end