class Proxy::OpenBolt::Executor

Constants

MAX_CACHED_JOBS
SHUTDOWN_TIMEOUT

Public Class Methods

new() click to toggle source
# File lib/smart_proxy_openbolt/executor.rb, line 15
def initialize
  @pool = Concurrent::FixedThreadPool.new(Plugin.settings.workers.to_i)
  @jobs = LruCache.new(MAX_CACHED_JOBS)
end

Public Instance Methods

add_job(job) click to toggle source
# File lib/smart_proxy_openbolt/executor.rb, line 20
def add_job(job)
  raise ArgumentError, "Only Job instances can be added" unless job.is_a?(Job)
  id = SecureRandom.uuid
  job.id = id
  @jobs.put(id, job)
  @pool.post { job.process }
  id
end
jobs_completed() click to toggle source

Total number of jobs completed since proxy start

# File lib/smart_proxy_openbolt/executor.rb, line 56
def jobs_completed
  @pool.completed_task_count
end
num_running() click to toggle source

How many workers are currently busy

# File lib/smart_proxy_openbolt/executor.rb, line 46
def num_running
  @pool.length
end
queue_length() click to toggle source

How many jobs are waiting in the queue

# File lib/smart_proxy_openbolt/executor.rb, line 51
def queue_length
  @pool.queue_length
end
remove_job(id) click to toggle source
# File lib/smart_proxy_openbolt/executor.rb, line 41
def remove_job(id)
  @jobs.delete(id)
end
result(id) click to toggle source
# File lib/smart_proxy_openbolt/executor.rb, line 35
def result(id)
  job = get_job(id)
  return :invalid unless job
  job.result
end
running?() click to toggle source

Still accepting and running jobs, or shutting down?

# File lib/smart_proxy_openbolt/executor.rb, line 61
def running?
  @pool.running?
end
shutdown(timeout = SHUTDOWN_TIMEOUT) click to toggle source

Stop accepting tasks and wait for in-flight jobs to finish. If timeout is nil, wait forever.

# File lib/smart_proxy_openbolt/executor.rb, line 67
def shutdown(timeout = SHUTDOWN_TIMEOUT)
  @pool.shutdown
  @pool.wait_for_termination(timeout)
end
status(id) click to toggle source
# File lib/smart_proxy_openbolt/executor.rb, line 29
def status(id)
  job = get_job(id)
  return :invalid unless job
  job.status
end

Private Instance Methods

get_job(id) click to toggle source
# File lib/smart_proxy_openbolt/executor.rb, line 74
def get_job(id)
  cached = @jobs.get(id)
  return cached if cached

  # Look on disk for a past run that may have happened
  file = Proxy::OpenBolt.result_file_path(id)
  begin
    data = JSON.parse(File.read(file))
    return nil if data['schema'].nil? || data['schema'] != 1
    return nil if data['status'].nil?
    # This is only for reading back status and result. Don't try
    # to fill in the other arguments correctly, and don't assume
    # they are there after execution.
    job = Job.new(nil, nil, nil)
    job.id = id
    job.update_status(data['status'].to_sym)
    @jobs.put(id, job)
    job
  rescue Errno::ENOENT, JSON::ParserError
    nil
  end
end