class Proxy::OpenBolt::Executor
Constants
- MAX_CACHED_JOBS
- SHUTDOWN_TIMEOUT
Public Class Methods
new()
click to toggle source
# File lib/smart_proxy_openbolt/executor.rb, line 15 def initialize @pool = Concurrent::FixedThreadPool.new(Plugin.settings.workers.to_i) @jobs = LruCache.new(MAX_CACHED_JOBS) end
Public Instance Methods
add_job(job)
click to toggle source
# File lib/smart_proxy_openbolt/executor.rb, line 20 def add_job(job) raise ArgumentError, "Only Job instances can be added" unless job.is_a?(Job) id = SecureRandom.uuid job.id = id @jobs.put(id, job) @pool.post { job.process } id end
jobs_completed()
click to toggle source
Total number of jobs completed since proxy start
# File lib/smart_proxy_openbolt/executor.rb, line 56 def jobs_completed @pool.completed_task_count end
num_running()
click to toggle source
How many workers are currently busy
# File lib/smart_proxy_openbolt/executor.rb, line 46 def num_running @pool.length end
queue_length()
click to toggle source
How many jobs are waiting in the queue
# File lib/smart_proxy_openbolt/executor.rb, line 51 def queue_length @pool.queue_length end
remove_job(id)
click to toggle source
# File lib/smart_proxy_openbolt/executor.rb, line 41 def remove_job(id) @jobs.delete(id) end
result(id)
click to toggle source
# File lib/smart_proxy_openbolt/executor.rb, line 35 def result(id) job = get_job(id) return :invalid unless job job.result end
running?()
click to toggle source
Still accepting and running jobs, or shutting down?
# File lib/smart_proxy_openbolt/executor.rb, line 61 def running? @pool.running? end
shutdown(timeout = SHUTDOWN_TIMEOUT)
click to toggle source
Stop accepting tasks and wait for in-flight jobs to finish. If timeout is nil, wait forever.
# File lib/smart_proxy_openbolt/executor.rb, line 67 def shutdown(timeout = SHUTDOWN_TIMEOUT) @pool.shutdown @pool.wait_for_termination(timeout) end
status(id)
click to toggle source
# File lib/smart_proxy_openbolt/executor.rb, line 29 def status(id) job = get_job(id) return :invalid unless job job.status end
Private Instance Methods
get_job(id)
click to toggle source
# File lib/smart_proxy_openbolt/executor.rb, line 74 def get_job(id) cached = @jobs.get(id) return cached if cached # Look on disk for a past run that may have happened file = Proxy::OpenBolt.result_file_path(id) begin data = JSON.parse(File.read(file)) return nil if data['schema'].nil? || data['schema'] != 1 return nil if data['status'].nil? # This is only for reading back status and result. Don't try # to fill in the other arguments correctly, and don't assume # they are there after execution. job = Job.new(nil, nil, nil) job.id = id job.update_status(data['status'].to_sym) @jobs.put(id, job) job rescue Errno::ENOENT, JSON::ParserError nil end end