How to debounce Sidekiq jobs?
Here you will learn what debouncing is, when to use it, and how to implement it with Sidekiq.
Debouncing is one of those programming concepts I had read about several times over the years but could never fully grasp. Whenever I read about it, I felt I understood it in the moment, but as soon as I went back to work, the concept faded from my mind. Fortunately, I recently ran into a problem where debouncing was exactly the solution I needed, and that's how I came to really understand what it's all about. I decided to write this down so that I don't forget it in the future, and I hope it helps you the way it helped me.
The problem I was facing
A few times a day, we receive a burst of consecutive webhooks within a short time span of around 10 minutes. My requirement was to launch a job only once, upon receipt of the last webhook within that time span.
Solution: Implement debouncing
The idea is to implement a new method called perform_debounce that will handle the debouncing.
When I debounce a job by 10 minutes, what I'm essentially doing is scheduling the job to run in 10 minutes. If I receive another webhook within those 10 minutes, I call perform_debounce again, which restarts the 10-minute wait.
When a job is scheduled through debouncing, we leverage the around_perform hook provided by Active Job to run a check before actually performing the job. The hook first checks whether a later debounced job has been scheduled in the meantime (the module tracks this with a timestamp stored in Redis) and, if so, the current job finishes right away. This way, only the last job is truly performed.
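To make the mechanism concrete before looking at the real module, here is a minimal pure-Ruby simulation of it. It is only a sketch: the DebounceSimulation class is hypothetical, a Hash stands in for Redis, an Array stands in for Sidekiq's scheduled queue, and the arrival times and the "SyncJob:42" key are made up.

```ruby
# A pure-Ruby simulation; no Redis, Sidekiq or Active Job involved.
# Assumption: a Hash plays the role of Redis and an Array the scheduled queue.
class DebounceSimulation
  BUFFER = 1 # second, same role as BUFFER in the real module

  attr_reader :scheduled, :performed

  def initialize(delay)
    @delay = delay
    @deadlines = {} # key => earliest time at which a job may really run
    @scheduled = [] # run times of every enqueued job
    @performed = [] # run times of the jobs that actually did the work
  end

  # Called once per webhook: move the deadline and enqueue one more job.
  def debounce(key, now)
    @deadlines[key] = now + @delay
    @scheduled << now + @delay + BUFFER
  end

  # The guard a job evaluates when it starts at time `now`.
  def run_job(key, now)
    deadline = @deadlines[key]
    return if deadline.nil? || now < deadline # a later webhook moved the deadline
    @performed << now if @deadlines.delete(key)
  end
end

sim = DebounceSimulation.new(600) # debounce by 10 minutes
[0, 120, 300, 540].each { |arrival| sim.debounce("SyncJob:42", arrival) }
sim.scheduled.each { |run_at| sim.run_job("SyncJob:42", run_at) }

p sim.scheduled # => [601, 721, 901, 1141]
p sim.performed # => [1141]
```

Four webhooks enqueue four jobs, but the first three wake up before the stored deadline and return immediately. Only the job scheduled by the last webhook does the work.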
I developed the following module, which can be included in any Active Job class backed by Sidekiq. In our case, we include it in ApplicationJob, from which all our jobs derive.
module Sidekiq
  module Debounce
    BUFFER = 1 # second
    DEFAULT_DELAY = 60 # seconds

    def self.included(base)
      base.extend(ClassMethods)
      base.class_attribute(:throttle_settings)
      base.class_attribute(:debounce_settings)

      base.around_perform do |job, block|
        # perform_debounce appends { debounce: true } as the last argument; strip it here.
        options = job.arguments.extract_options!
        debounce_enabled = options.delete(:debounce)
        # Put any remaining options back, since some jobs take key-value arguments.
        job.arguments.concat(Array.wrap(options)) if options.present?

        if debounce_settings && debounce_enabled
          process_debounce(job) { block.call }
        else
          block.call
        end
      end
    end

    module ClassMethods
      def debounce(*args)
        self.debounce_settings = args.extract_options!
      end

      def perform_debounce(*params)
        # Refresh the timestamp in Redis with the debounce delay added.
        delay = debounce_settings[:duration] || DEFAULT_DELAY
        redis_client.set(key(params), now + delay)

        # Schedule the job with not only the debounce delay added, but also BUFFER.
        # BUFFER helps prevent a race condition between this line and the one above.
        params.push({ debounce: true })
        set(wait_until: now + delay + BUFFER).perform_later(*params)
      end

      def key(params)
        return lock_key if respond_to?(:lock_key)

        params_key = Array.wrap(params).map do |param|
          param.try(:to_global_id) || param
        end.join(", ")

        "#{name}:#{params_key}"
      end

      def now
        Time.now.to_i
      end
    end

    def process_debounce(job)
      yield if perform?(*job.arguments)
    end

    def perform?(*params)
      # Only the last job should come after the timestamp.
      timestamp = redis_client.get(self.class.key(params))

      # But because of BUFFER, there could be multiple last jobs enqueued within
      # the span of BUFFER. The first one will clear the timestamp, and the rest
      # will skip when they see that the timestamp is gone.
      return false if timestamp.nil?
      return false if Time.now.to_i < timestamp.to_i

      # Avoid a race condition: del returns 1 only for the first caller, 0 for the others.
      redis_client.del(self.class.key(params)) == 1
    end
  end
end
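The final del call in perform? is what keeps two jobs from both running when they wake up at the same moment: Redis removes the key atomically and reports how many keys it deleted, so only one caller gets 1 back. The sketch below illustrates that idea with threads. FakeRedis is a hypothetical stand-in (a Mutex-guarded Hash), not a real client, and the key and value are made up.

```ruby
# Assumption: a Mutex-guarded Hash imitates Redis, where DEL is atomic and
# returns the number of keys it removed.
class FakeRedis
  def initialize
    @data = {}
    @lock = Mutex.new
  end

  def set(key, value)
    @lock.synchronize { @data[key] = value.to_s }
  end

  # Returns 1 if the key existed and was removed, 0 otherwise.
  def del(key)
    @lock.synchronize { @data.delete(key).nil? ? 0 : 1 }
  end
end

redis = FakeRedis.new
redis.set("SyncJob:42", 1140)

# Five jobs wake up at the same moment and all try to claim the run.
results = 5.times.map { Thread.new { redis.del("SyncJob:42") } }.map(&:value)

puts results.sum # => 1, exactly one job claimed the run
```

Whichever thread deletes the key first is the one allowed to perform; every other thread sees 0 and skips.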
To use this module, you also need to declare a class_attribute at the ApplicationJob level that gives access to Redis, for example:
class ApplicationJob < ActiveJob::Base
  include Sidekiq::Debounce

  class_attribute :redis_client
  self.redis_client = Redis.new(db: 0)
end
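One detail of the module worth keeping in mind: the Redis key is built from the job class name and the job arguments, so debouncing is scoped per set of arguments. Calls with different arguments do not cancel each other out. Here is a simplified sketch of that key construction. The debounce_key helper and the SyncAccountJob name are hypothetical, and it assumes plain values only, leaving out the GlobalID conversion and the lock_key override that the module's key method handles.

```ruby
# Simplified stand-in for the module's key method (assumption: plain values
# only, so the GlobalID conversion and the lock_key override are left out).
def debounce_key(job_name, params)
  "#{job_name}:#{Array(params).join(', ')}"
end

puts debounce_key("SyncAccountJob", [42])         # SyncAccountJob:42
puts debounce_key("SyncAccountJob", [42, "full"]) # SyncAccountJob:42, full
puts debounce_key("SyncAccountJob", [7])          # SyncAccountJob:7
```

Webhooks for account 42 and account 7 therefore get separate timestamps and are debounced independently.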