class Sidekiq::Scheduled::Enq
Constants
- LUA_ZPOPBYSCORE
Public Class Methods
new()
click to toggle source
# File lib/sidekiq/scheduled.rb, line 21 def initialize @lua_zpopbyscore_sha = nil end
Public Instance Methods
enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS)
click to toggle source
# File lib/sidekiq/scheduled.rb, line 25 def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS) # A job's "score" in Redis is the time at which it should be processed. # Just check Redis for the set of jobs with a timestamp before now. Sidekiq.redis do |conn| sorted_sets.each do |sorted_set| # Get next item in the queue with score (time to execute) <= now. # We need to go through the list one at a time to reduce the risk of something # going wrong between the time jobs are popped from the scheduled queue and when # they are pushed onto a work queue and losing the jobs. while (job = zpopbyscore(conn, keys: [sorted_set], argv: [now])) Sidekiq::Client.push(Sidekiq.load_json(job)) Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" } end end end end
Private Instance Methods
zpopbyscore(conn, keys: nil, argv: nil)
click to toggle source
# File lib/sidekiq/scheduled.rb, line 44 def zpopbyscore(conn, keys: nil, argv: nil) @lua_zpopbyscore_sha = conn.script(:load, LUA_ZPOPBYSCORE) if @lua_zpopbyscore_sha.nil? conn.evalsha(@lua_zpopbyscore_sha, keys: keys, argv: argv) rescue Redis::CommandError => e raise unless e.message.start_with?("NOSCRIPT") @lua_zpopbyscore_sha = nil retry end