class Sidekiq::Scheduled::Enq

Constants

LUA_ZPOPBYSCORE

Public Class Methods

new() click to toggle source
# File lib/sidekiq/scheduled.rb, line 21
def initialize
  @lua_zpopbyscore_sha = nil
end

Public Instance Methods

enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS) click to toggle source
# File lib/sidekiq/scheduled.rb, line 25
def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS)
  # A job's "score" in Redis is the time at which it should be processed.
  # Just check Redis for the set of jobs with a timestamp before now.
  Sidekiq.redis do |conn|
    sorted_sets.each do |sorted_set|
      # Get next item in the queue with score (time to execute) <= now.
      # We need to go through the list one at a time to reduce the risk of something
      # going wrong between the time jobs are popped from the scheduled queue and when
      # they are pushed onto a work queue and losing the jobs.
      while (job = zpopbyscore(conn, keys: [sorted_set], argv: [now]))
        Sidekiq::Client.push(Sidekiq.load_json(job))
        Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
      end
    end
  end
end

Private Instance Methods

zpopbyscore(conn, keys: nil, argv: nil) click to toggle source
# File lib/sidekiq/scheduled.rb, line 44
def zpopbyscore(conn, keys: nil, argv: nil)
  @lua_zpopbyscore_sha = conn.script(:load, LUA_ZPOPBYSCORE) if @lua_zpopbyscore_sha.nil?

  conn.evalsha(@lua_zpopbyscore_sha, keys: keys, argv: argv)
rescue Redis::CommandError => e
  raise unless e.message.start_with?("NOSCRIPT")

  @lua_zpopbyscore_sha = nil
  retry
end