异步队列 sidekiq 指南

Sidekiq 是为 Ruby 打造的一个全功能后台处理框架,可以很方便地集成到大部分 Rails 程序中。网上关于 Sidekiq 的中文资料还是比较少,这里我结合官方文档给大家做简要介绍。版本:4.1.4


更新记录

  • 2016.07.06: 初稿

快速入门

开发者已经尽量考虑到集成的问题,所以只需要简单几步即可完成 sidekiq 的使用。

首先,在 Gemfile 中添加对应包 gem 'sidekiq'

接着,在 app/workers 文件夹中添加一个 worker,来处理异步请求,比如

class HardWorker
include Sidekiq::Worker
def perform(name, count)
# do something
end
end

然后在其他地方就可以提交异步任务了,如

# 常规提交
HardWorker.perform_async('wdxtub', 1)
# 延迟提交
HardWorker.perform_in(5.minutes, 'wdxtub', 1)
# 也可以通过 delay 来新建任务
User.delay.do_some_stuff(current_user.id, 20)

最后在 Rails 应用的根目录启动 sidekiq 就会开始执行,命令为 bundle exec sidekiq

基础知识

Sidekiq 客户端通常运行在应用进程中(比方说一个 puma 或 passenger 进程),允许我们创建 job 之后处理,一般来说有下面两种方式

# 第一种方法:常规调用
MyWorker.perform_async(1, 2, 3)
# 第二种方法:底层通用调用
Sidekiq::Client.push('class' => MyWorker, 'args' => [1, 2, 3])

这两种方法是等价的,执行的时候会生成一个表示这个 job 的 hash,之后会转成一个 JSON 字符串并放到 Redis 队列中。这也意味着 worker 中的参数必须是简单的 JSON 类型(数字、字符串、布尔值、数组、哈希),如果是复杂的 Ruby 对象(比如 Date, Time 或者 ActiveRecord)则会出现无法正确序列化的问题。

每个 Sidekiq 服务器进程会从 Redis 中拉取 job 并进行处理。和 web 进程一样,Sidekiq 会启动 Rails 所以所有的 job 和 worker 都可以使用全部的 Rails API(包括 Active Record)。服务器会实例化 worker 并调用其中的 perform 函数,剩下的就可以自由发挥了。

最佳实践

让每个 job 的参数小而简单

从前面我们知道,Sidekiq 会把 perform_async 函数的参数保存在 Redis 中,而常见的错误做法是这样的:

quote = Quote.find(quote_id)
SomeWorker.perform_async(quote)

这有什么问题呢?问题很大!把整个 quote 对象都保存在 Redis 中了,而且由于操作是通过异步队列来完成的,很可能在执行到这句话时,quote 本身都改变了,所以我们尽可能让每个 job 的参数小而简单,比方说可以改成像这样

SomeWorker.perform_async(quote_id)

另外除了数据本身可能改变这个问题外,还有序列化和反序列化的问题。Sidekiq 使用 JSON.dumpJSON.load 来完成存取操作,所以如果不是简单的 JSON 数据类型,在这一步很可能出错。

让每个 job 满足幂等性和事务性

幂等性保证了任务可以执行很多次,不会出现结果不一致的情况;事务性则是保证了结果不因为在 job 执行半路出现问题。我们唯一能做的假设是 Sidekiq 至少会执行每个 job 一次。

拥抱并发

Sidekiq 是为并发设计的,所以在设计任务时一定要以并发的想法去思考。如果 Sidekiq 占用流量太高的话,可以考虑用连接池来限制连接数量。

Redis 的使用

Sidekiq 使用 Redis 来保存所有的 job 和操作数据。默认会去连接位于 localhost:6379 的 Redis 服务器。但是在生产环境中可能需要自定义地址,具体需要修改 config/initializers/sidekiq.rb 文件

# 这里的地址和端口号(1643)都需要配置成正确的
Sidekiq.configure_server do |config|
config.redis = { url: 'redis://redis.wdxtub.com:1643/12' }
end
Sidekiq.configure_client do |config|
config.redis = { url: 'redis://redis.wdxtub.com:1643/12' }
end

如果要使用 UNIX socket,URL 应该类似于 unix://#{Rails.root}/tmp/sockets/redis.sock

另外部署在云端的化,根据不同的配置,也有不同的讲究,具体参考这里

错误处理

最佳实践

  1. 使用错误相关的服务,比如 Honeybadger, Airbrake, Rollbar, BugSnag, Sentry, Exceptiontrap, Raygun 等等,它们的特性和价格都差不多,但是一定要挑一个来用
  2. 让 Sidekiq 捕获 job 引发的错误,内置的重试机制会按照规则来重试,并通过错误通知服务管理员
  3. 如果在 25 次重试还没有修复 bug,那么会停止继续尝试并把 job 放到 Dead Job 队列中,可以在 6 个月内通过 Web UI 手动处理
  4. 6 个月之后,这个 job 将被丢弃

更多详细信息参考这里

配置文件

配置文件是 config/sidekiq.yml,只有在需要设置高级选项的时候才需要创建这个文件,一个例子为:

---
:concurrency: 5
:pidfile: tmp/pids/sidekiq.pid
staging:
:concurrency: 10
production:
:concurrency: 20
:queues:
- default
- [myqueue, 2]

如果不这样命名,或者不放在这个位置的话,在命令行中用 -C 来指定 sidekiq -C config/myapp_sidekiq.yml

队列

默认情况中 sidekiq 会使用 Redis 中名为 default 的队列,如果想用多个队列的话,要么在命令行中指定,要么在配置文件中指定。每个队列都可以配置对应的权重,如果一个队列的权重是 2,另一个是 1 的化,那么检查前一个队列的频率是第二个的两倍。

用命令行参数 sidekiq -q critical,2 -q default

用配置文件

# ...
:queues:
- [critical, 2]
- default

如果想让队列按照特定顺序执行,在列出来的时候就要按顺序,先处理的就在前面。

用命令行参数 sidekiq -q critical -q default -q low

用配置文件

# ...
:queues:
- critical
- default
- low

如果想让队列随机执行的话,就把每个队列的权重都设为 1。比如:

# ...
:queues:
- ["wd", 1]
- ["xt", 1]
- ["ub", 1]

也可以给 worker 指定队列,如

class ImportantWorker
include Sidekiq::Worker
sidekiq_options queue: 'critical'
def perform(*important_args)
puts "Doing critical work"
end
end

但是开发组并不建议用太多队列,设计之初就没有这么打算。更多具体的信息请参考这里

计划任务

这一部分我们可以控制双方的行为,既可以控制任务本身,也可以控制获取任务的方式,比如

# 控制任务本身
MyWorker.perform_in(3.hours, 'wdxtub', 1)
MyWorker.perform_at(3.hours.from_now, 'wdxtub', 1)
# 控制获取任务的方式
# 这里是平均 15 秒才去抓去一次任务
Sidekiq.configure_server do |config|
config.average_scheduled_poll_interval = 15
end

Delay 扩展

Delay 扩展提供了一种非常简单的异步调用方式,默认来说,所有的类方法和 ActionMailer 都可以异步执行,例如

# ActionMailer
UserMailer.delay.welcome_email(@user.id)
UserMailer.delay_for(5.days).find_more_friends_email(@user.id)
UserMailer.delay_until(5.days.from_now).find_more_friends_email(@user.id)
# ActiveRecord
User.delay.delete_old_users('some', 'params')
User.delay_for(2.weeks).whatever
User.delay_until(2.weeks.from_now).whatever
# 类方法
MyClass.delay.some_method(1, 'bob', true)
# 高级选项
MyClass.delay(:retry => false).some_method(1, 2, 3)
MyClass.delay(:queue => 'low').some_method(1, 2, 3)
MyClass.delay_for(10.minutes, :retry => false).some_method(1, 2, 3)

如果想要禁用的话,可以这么做:

# config/initializers/sidekiq.rb
# Sidekiq.hook_rails! # uncomment if you want the delay extensions, but prefixed with `sidekiq_`
Sidekiq.remove_delay!

总结

本文覆盖了 Sidekiq 最基本的使用,应该足够理解其工作机制并看懂已有的代码。更多的内容都可以在参考链接中找到,如果有需要请自行点击下面的链接。

参考链接

捧个钱场?