📅 2025/1/12
📦 go版本: 1.23.3
💻 MacOs 14.6
🏆 Asynq
🍪 介绍
🏢 https://github.com/hibiken/asynq
Asynq
是一个 Go
语言异步任务框架,它以 Redis
作为消息队列,具备可伸缩性和简易性,大致应该是使用了redis的list数据结构
异步性是指操作系统能够同时处理多个任务,并按照不同的速度进行处理和响应。它允许非阻塞调用、事件驱动、多任务并行执行,并通过回调机制处理异步操作的结果。通过异步性,操作系统提高了效率、资源利用率,并提供了更好的用户体验。
Asynq异步任务解决方案:
- 客户端将任务放入队列
- 服务器从队列中提取任务并为每个任务启动一个工作线程(协程)
- 多个工作协程并行处理任务
任务队列是一种将工作分配到多台机器的机制。系统可以由多个工作服务器和代理组成,实现高可用性和水平扩展。 可以看一下我的Rocketmq介绍
特性
- 保证至少执行一次任务
- 任务调度
- 重试失败的任务
- 在工作线程崩溃时自动恢复任务
- 加权优先级队列
- 严格优先级队列
- 由于Redis中的写操作快速,添加任务的延迟低
- 使用唯一选项对任务进行去重
- 允许为每个任务设置超时和截止时间
- 允许聚合一组任务以批量执行多个连续操作
- 灵活的处理程序接口,支持中间件
- 允许暂停队列以停止从队列中处理任务
- 周期性任务
- 支持Redis Cluster以自动分片和实现高可用性
- 支持Redis Sentinel实现高可用性
- 与Prometheus集成,以收集和可视化队列指标
- Web界面,用于检查和远程控制队列和任务
- 命令行界面,用于检查和远程控制队列和任务
🍵 开启入门
教程地址: https://www.tizi365.com/topic/14001.html
首先需要安装redis(版本最新即可,自行安装),以及asynq
go install github.com/hibiken/asynq@latest
对应的需要创建客户端和消费端两个服务目录如下:
├── client
│ └── client.go
├── go.mod
├── go.sum
├── task
│ └── task.go
└── workers
└── workers.go
client 和 workers 分别表示客户端和消费端,task用于保存创建和执行任务相关操作,开始之前先熟悉一下基本的asynq操作
1️⃣ Asynq连接redis
使用RedisClientOpt结构体来指定与本地运行的Redis服务器的连接。
type RedisClientOpt struct {
// 要使用的网络类型,可以是 tcp 或 unix。
// 默认是 tcp。
Network string
// Redis 服务器地址,格式为 "host:port"。
Addr string
// 在使用 Redis ACLs 时,用于验证当前连接的用户名。
// 参见:https://redis.io/commands/auth
Username string
// 用于验证当前连接的密码。
// 参见:https://redis.io/commands/auth
Password string
// 连接到服务器后要选择的 Redis 数据库。
// 参见:https://redis.io/commands/select
DB int
// 建立新连接时的拨号超时。
// 默认是 5 秒。
DialTimeout time.Duration
// 套接字读取的超时时间。
// 如果达到超时,读取命令将以超时错误失败,而不是阻塞。
//
// 使用值 -1 表示无超时,0 表示默认值。
// 默认是 3 秒。
ReadTimeout time.Duration
// 套接字写入的超时时间。
// 如果达到超时,写入命令将以超时错误失败,而不是阻塞。
//
// 使用值 -1 表示无超时,0 表示默认值。
// 默认是 ReadTimeout。
WriteTimeout time.Duration
// 套接字连接的最大数量。
// 默认是每个 CPU 核心 10 个连接,由 runtime.NumCPU 报告。
PoolSize int
// 用于连接到服务器的 TLS 配置。
// 只有设置此字段时才会协商 TLS。
TLSConfig *tls.Config
}
示例配置
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// 如果不需要密码,可省略
Password: "mypassword",
// 为asynq使用一个专用的数据库编号。
// 默认情况下,Redis提供16个数据库(0..15)
DB: 0,
}
2️⃣ 任务
在asynq中,工作单元封装在称为Task的类型中,它概念上具有两个字段:Type和Payload。
// Type是一个字符串值,用于指示任务的类型。
func (t *Task) Type() string
// Payload是任务执行所需的数据。
func (t *Task) Payload() []byte
使用 NewTask
即可创建任务,可以看到接受两个东西一个所类名称,一个是 payload
// NewTask 返回一个给定类型名称和负载数据的新任务。可以传递选项来配置任务处理行为。
func NewTask(typename string, payload []byte, opts ...Option) *Task
这些就是需要了解的基本类型,接下来开始操作
3️⃣ 客户端代码
客户端任务是创建任务并且加入任务队列,所以需要连接``redis`,使用NewClient来创建客户端
// NewClient 返回一个给定 redis 连接选项的新 Client 实例
func NewClient(r RedisConnOpt) *Client
创建任务使用``NewTask 即可创建任务,这里我将
NewTask 任务封装至
task`包中,相应的tasktype 和 payload也都封装到里面了
Enqueue
将给定任务放入队列中,如果任务入队成功,Enqueue 返回 TaskInfo 和 nil 错误,否则返回非 nil 错误,参数 opts
指定任务处理的行为。如果存在冲突的 Option 值,则最后一个值会覆盖其他值。提供给 NewTask
的任何选项都可以被传递给 Enqueue
的选项覆盖。默认情况下,最大重试次数设置为 25,超时设置为 30 分钟,如果未提供 ProcessAt
或 ProcessIn
选项,任务将立即挂起
// 入队
func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error)
// ProcessIn 返回一个选项,用于指定相对于当前时间处理给定任务的时间。
func ProcessIn(d time.Duration) Option
// ProcessAt 返回一个选项来指定何时处理给定任务。
func ProcessAt(t time.Time) Option
代码如下:
这里创建了两个任务,分别是welcome和reminder
func main() {
// 连接redis
redisClient := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
DB: 0,
})
defer redisClient.Close()
t1, err := task.NewWelcomeEmailTask("222@qq.com")
if err != nil {
panic(err)
}
t2, err := task.NewReminderEmailTask("222@qq.com")
if err != nil {
panic(err)
}
// 立即处理
t1Info, err := redisClient.Enqueue(t1)
if err != nil {
panic(err)
}
fmt.Println("执行成功: ", t1Info)
// 延时处理
t2Info, err := redisClient.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
panic(err)
}
fmt.Println("执行成功: ", t2Info)
}
对应的创建task代码
// 任务类型的列表。
const (
TypeWelcomeEmail = "email:welcome"
TypeReminderEmail = "email:reminder"
)
// 与任何与电子邮件相关的任务相关的任务负载。
type emailTaskPayload struct {
// 电子邮件收件人的ID。
UserID string
}
// 新键任务
func NewWelcomeEmailTask(id string) (*asynq.Task, error) {
// payload 切换为[]byte
payload, err := json.Marshal(emailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
// 新键任务
func NewReminderEmailTask(id string) (*asynq.Task, error) {
payload, err := json.Marshal(emailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
4️⃣ 消费端代码
同样的也需要连redis不过使用 NewServer
来创建,多了一个传参 Config
作用是配置指定服务器的后台任务处理行为
// NewServer 返回一个给定 redis 连接选项和服务器配置的新服务器。
func NewServer(r RedisConnOpt, cfg Config) *Server
Config类型
type Config struct {
// 同时处理任务的最大数量。
//
// 如果设置为零或负值,NewServer 将覆盖该值为当前进程可使用的 CPU 数量。
Concurrency int
// BaseContext 可选地指定一个函数,该函数返回此服务器上调用 Handler 的基础上下文。
//
// 如果 BaseContext 为 nil,则默认值为 context.Background()。
// 如果定义了此函数,则它必须返回一个非空的上下文
BaseContext func() context.Context
// TaskCheckInterval 指定在所有队列为空时检查新任务以进行处理的时间间隔。
//
// 如果未设置、为零或负值,则时间间隔设置为 1 秒。
//
// 注意:将此值设置得过低可能会给 redis 增加显著负载。
//
// 默认情况下,TaskCheckInterval 被设置为 1 秒。
TaskCheckInterval time.Duration
// 用于计算失败任务重试延迟的函数。
//
// 默认情况下,它使用指数退避算法来计算延迟。
RetryDelayFunc RetryDelayFunc
// 用于确定 Handler 返回的错误是否为失败情况的谓词函数。
// 如果该函数返回 false,则 Server 不会增加任务的重试计数器,
// 并且 Server 不会记录队列统计信息(已处理和失败统计信息)以避免扭曲队列的错误率。
//
// 默认情况下,如果给定的错误是非 nil 的,则该函数返回 true。
IsFailure func(error) bool
// 要按给定优先级值处理的队列列表。键是队列的名称,值是关联的优先级值。
//
// 如果设置为 nil 或未指定,服务器将仅处理 "default" 队列。
//
// 为了避免使低优先级队列饥饿,按如下方式处理优先级:
//
// 示例:
//
// Queues: map[string]int{
// "critical": 6,
// "default": 3,
// "low": 1,
// }
//
// 根据上述配置,并且所有队列都不为空,"critical"、"default"、"low" 队列中的任务应分别按 60%、30%、10% 的时间进行处理。
//
// 如果队列具有零或负的优先级值,则该队列将被忽略。
Queues map[string]int
// StrictPriority 表示是否应严格处理队列优先级。
//
// 如果设置为 true,则首先处理具有最高优先级的队列中的任务。
// 只有当这些具有较高优先级的队列为空时,才处理较低优先级队列中的任务。
StrictPriority bool
ErrorHandler ErrorHandler
// Logger 指定服务器实例使用的日志记录器。
//
// 如果未设置,则使用默认日志记录器。
Logger Logger
// LogLevel 指定要启用的最低日志级别。
//
// 如果未设置,默认使用 InfoLevel。
LogLevel LogLevel
// ShutdownTimeout 指定在停止服务器时等待工作者完成其任务
// 的持续时间,之后强制中止它们。
//
// 如果未设置或为零,则使用默认的 8 秒超时。
ShutdownTimeout time.Duration
// HealthCheckFunc 定期调用,并传入在连接到 redis 服务器的 ping 过程中遇到的任何错误。
HealthCheckFunc func(error)
// HealthCheckInterval 指定健康检查之间的间隔。
//
// 如果未设置或为零,则时间间隔设置为 15 秒。
HealthCheckInterval time.Duration
// DelayedTaskCheckInterval 指定在 'scheduled' 和 'retry'
// 任务上运行的检查间隔,并在它们准备好被处理时将它们转发到 'pending' 状态。
//
// 如果未设置或为零,则间隔设置为 5 秒。
DelayedTaskCheckInterval time.Duration
// GroupGracePeriod 指定服务器在聚合组内任务之前等待传入任务的时间量。
// 如果在这段时间内接收到传入任务,服务器将再等待相同长度的另一个周期,直到达到 GroupMaxDelay(如果指定)。
//
// 如果未设置或为零,则宽限期设置为 1 分钟。
// GroupGracePeriod 的最小持续时间为 1 秒。如果指定的值小于 1 秒,则调用 NewServer 将引发 panic。
GroupGracePeriod time.Duration
// GroupMaxDelay 指定服务器在聚合组内任务之前等待传入任务的最大时间量。
//
// 如果未设置或为零,则不使用延迟限制。
GroupMaxDelay time.Duration
// GroupMaxSize 指定可以在组内聚合到单个任务中的最大任务数量。
// 如果达到 GroupMaxSize,则服务器将立即将任务聚合成一个。
//
// 如果未设置或为零,则不使用大小限制。
GroupMaxSize int
// GroupAggregator 指定用于将组内的多个任务聚合成一个任务的聚合函数。
//
// 如果未设置或为 nil,则服务器上的组聚合功能将被禁用。
GroupAggregator GroupAggregator
// JanitorInterval 指定过期已完成任务的 janitor 检查的平均间隔。
//
// 如果未设置或为零,则使用默认的 8 秒间隔。
JanitorInterval time.Duration
// JanitorBatchSize 指定在一次运行中要删除的过期已完成任务的数量。
//
// 如果未设置或为零,则使用默认的批量大小 100。
// 确保不要将大的数字作为批量大小,以防止运行时间过长的脚本。
JanitorBatchSize int
}
这里的asynq.NewServeMux() 是 作用是用来注册handler
server的Run接口作用就是运行 handler
// Run 启动任务处理并阻塞,直到 收到退出程序的操作系统信号。一旦它收到信号,它就会优雅地关闭所有活跃的工作线程和其他 goroutine 来处理任务
func (srv *Server) Run(handler Handler) error
handler的概念和gin中的差不多,是处理任务逻辑的,这里Run接受一个Handler接口,它有一个 ProcessTask
方法,如果任务处理成功,ProcessTask 应返回 nil,如果 ProcessTask 返回非零错误或恐慌,如果剩余重试次数,任务将在延迟后重试,否则任务将被存档,此规则的一个例外是 ProcessTask
返回 SkipRetry
错误。如果返回的错误是 SkipRetry
或错误包裹 SkipRetry
,则跳过重试,并立即归档任务,此规则的另一个例外是当 ProcessTask
返回 RevokeTask
错误时。如果返回的错误是 RevokeTask
或错误包装了 RevokeTask
,则不会重试或存档该任务
type Handler interface {
// 如果任务成功处理,ProcessTask应返回nil。
// 如果ProcessTask返回一个非nil错误或导致panic,任务将稍后重试。
ProcessTask(context.Context, *Task) error
}
实现一个 handler
最简单的方法是定义一个具有相同签名的函数,并在将其传递给 Run
时使用 asynq.HandlerFunc
适配器类型。
这里没有指定任何type,也就表示是消费所有类型的type
// 自定义的handler
func handler(ctx context.Context, t *asynq.Task) error {
}
// 如果要被server run则可以使用asynq.HandlerFunc
server.Run(asynq.HandlerFunc(handler))
来看看 HandlerFunc
的构造
// HandlerFunc 类型是一个适配器,允许使用普通函数作为 Handler。如果 f 是具有适当签名的函数,则 HandlerFunc(f) 是调用 f 的 Handler
type HandlerFunc func(context.Context, *Task) error
// 实现了Handler接口,它会调用调用HandlerFunc(context.Context, *Task)
func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error
但是我的代码使用 ServeMux
来创建handler。就像来自 "net/http"
包的 ServeMux
一样,你可以通过调用Handle或HandleFunc来注册一个handler。ServeMux满足Handler接口,所以可以将其传递给Run,你可以注册多个处理器。它会将每个任务的类型与注册的模式列表进行匹配,并调用与任务类型名称最接近的模式对应的处理器。
// ServeMux 是异步任务的多路复用器。它将每个任务的类型与注册类型列表进行匹配,并调用与任务类型名称最匹配的类型的处理程序。
// 较长的类型优先于较短的类型,因此,如果同时为“images”和“images:thumbnails”注册了处理程序,则类型名以“images:thumbnails”开头的任务将调用后者的处理程序,而前者将被调用。接收类型名称以“images”开头的任务。
type ServeMux struct {
}
// 创建一个ServerMux
func NewServeMux() *ServeMux
// Handle 注册给定类型的处理程序。如果类型的处理程序已存在,则 Handle 会出现panic
func (mux *ServeMux) Handle(pattern string, handler Handler)
// HandleFunc 注册给定类型的处理函数。
func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *Task) error)
// 处理程序返回用于给定任务的处理程序。它总是返回一个非零处理程序。处理程序还返回与任务匹配的注册类型。 如果没有适用于该任务的已注册处理程序,则处理程序将返回“未找到”处理程序,该处理程序会返回错误。
func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string)
// ProcessTask 将任务分派给其类型与任务类型最匹配的处理程序。
func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error
// 使用将 MiddlewareFunc 附加到链中。中间件按照它们应用于 ServeMux 的顺序执行。
func (mux *ServeMux) Use(mws ...MiddlewareFunc)
这里的 HandlerFunc
方法我也封装到了 task
内
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{
Addr: "localhost:6379",
DB: 0,
},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", task.HandleWelcomeEmailTask)
mux.HandleFunc("email:reminder", task.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
panic(err)
}
}
handler函数
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
// 从t.Payload 解析出payload信息
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
// 打印
log.Printf(" [*] 向用户 %s 发送欢迎电子邮件", p.UserID)
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] 向用户 %s 发送提醒电子邮件", p.UserID)
return nil
}
5️⃣ 运行程序
运行这两个程序。让我们先运行client程序来创建和调度任务。
> go run client/client.go
执行成功: &{c9e044e5-b078-414e-b5a7-e5da47fddae0 default email:welcome [123 34 85 115 101 114 73 68 34 58 34 50 50 50 64 113 113 46 99 111 109 34 125] pending 25 0 0001-01-01 00:00:00 +0000 UTC 30m0s 0001-01-01 00:00:00 +0000 UTC 2025-01-12 18:12:06.960881 +0800 CST m=+0.001739835 false 0s 0001-01-01 00:00:00 +0000 UTC []}
执行成功: &{0e72bca0-c275-4b2e-997b-914f3bb4e582 default email:reminder [123 34 85 115 101 114 73 68 34 58 34 50 50 50 64 113 113 46 99 111 109 34 125] scheduled 25 0 0001-01-01 00:00:00 +0000 UTC 30m0s 0001-01-01 00:00:00 +0000 UTC 2025-01-13 18:12:06.970658 +0800 CST m=+86400.011517001 false 0s 0001-01-01 00:00:00 +0000 UTC []}
这将创建两个任务:一个立即处理的任务和一个在24小时后处理的任务。
让我们使用asynq的命令行界面来检查任务。在此界面中会显示队列列表,以上代码都是默认在default中运行,点进去就可以查看task的运行状态了
asynq dash
最后,让我们启动workers程序来处理任务。
go run workers/workers.go
注意:该程序不会退出,直到发送一个信号来终止程序
运行后应该能够在终端中看到一些文本输出,表示任务已成功处理。
6️⃣ 使用中间件
中间件就是使用了装饰器模式,也就是说,它接受一个handler运行你在运行hanlder之前或者之后在做一些其他事情,下面定义了有个中间件,它接受一个handler并且执行handler,执行完毕打印耗时
func LoggerMiddler(handler asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
// 获取当前时间
start_time := time.Now()
// 执行任务
err := handler.ProcessTask(ctx, t)
if err != nil {
return err
}
// 获取结束时间
// time.Since可以快捷计算耗时
fmt.Println("任务执行完成,耗时:", time.Since(start_time))
return nil
})
}
定义完成之后该如何使用呢?你可以直接使用中间件包裹handler比如:
middler.LoggerMiddler(asynq.HandlerFunc(task.HandleReminderEmailTask))
如果使用servermux则可以直接Use
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", task.HandleWelcomeEmailTask)
mux.HandleFunc("email:reminder", task.HandleReminderEmailTask)
mux.Use(middler.LoggerMiddler)
启动测试一下
## 执行client创建任务
only/asynq/quickstart via 🐹 v1.23.3 on 🐳 v27.3.1 (orbstack) took 8m 10.0s
➜ go run client/client.go
执行成功: &{513a7743-f0e7-4eef-a26b-5b1291c46fd8 default email:welcome [123 34 85 115 101 114 73 68 34 58 34 50 50 50 64 113 113 46 99 111 109 34 125] pending 25 0 0001-01-01 00:00:00 +0000 UTC 30m0s 0001-01-01 00:00:00 +0000 UTC 2025-01-13 21:58:18.68871 +0800 CST m=+0.002020751 false 0s 0001-01-01 00:00:00 +0000 UTC []}
执行成功: &{f6ac043a-d0e4-42c8-9316-962c31eb7d5f default email:reminder [123 34 85 115 101 114 73 68 34 58 34 50 50 50 64 113 113 46 99 111 109 34 125] scheduled 25 0 0001-01-01 00:00:00 +0000 UTC 30m0s 0001-01-01 00:00:00 +0000 UTC 2025-01-14 21:58:18.699056 +0800 CST m=+86400.012366501 false 0s 0001-01-01 00:00:00 +0000 UTC []}
// 查看任务
only/asynq/quickstart via 🐹 v1.23.3 on 🐳 v27.3.1 (orbstack)
➜ asynq task ls --queue=default --state=pending
ID Type Payload
-- ---- -------
513a7743-f0e7-4eef-a26b-5b1291c46fd8 email:welcome {"UserID":"222@qq.com"}
only/asynq/quickstart via 🐹 v1.23.3 on 🐳 v27.3.1 (orbstack)
➜ asynq task ls --queue=default --state=scheduled
ID Type Payload Process In
-- ---- ------- ----------
5baa8b51-d1fe-4bc3-922c-8083b2537308 email:reminder {"UserID":"222@qq.com"} in 23h49m39s
## 运行server
only/asynq/quickstart via 🐹 v1.23.3 on 🐳 v27.3.1 (orbstack)
➜ go run workers/workers.go
asynq: pid=21643 2025/01/13 13:59:39.491731 INFO: Starting processing
asynq: pid=21643 2025/01/13 13:59:39.491829 INFO: Send signal TSTP to stop processing new tasks
asynq: pid=21643 2025/01/13 13:59:39.491869 INFO: Send signal TERM or INT to terminate the process
2025/01/13 21:59:39 [*] 向用户 222@qq.com 发送欢迎电子邮件
任务执行完成,耗时: 383.917µs ## 打印耗时了
如果不同应用需要使用到不同的中间件可以使用如下方法
productHandlers := asynq.NewServeMux()
productHandlers.Use(productMiddleware) // 对所有产品任务应用共享逻辑
productHandlers.HandleFunc("product:update", productUpdateTaskHandler)
// ... 注册其他“产品”任务处理函数
orderHandlers := asynq.NewServeMux()
orderHandler.Use(orderMiddleware) // 对所有订单任务应用共享逻辑
orderHandlers.HandleFunc("order:refund", orderRefundTaskHandler)
// ... 注册其他“订单”任务处理函数
// 顶级处理函数
mux := asynq.NewServeMux()
mux.Use(someGlobalMiddleware) // 对所有任务应用共享逻辑
mux.Handle("product:", productHandlers)
mux.Handle("order:", orderHandlers)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
🍵 Task生命周期
task有如下状态:
- Scheduled:任务正在等待未来处理(仅适用于具有ProcessAt或ProcessIn选项的任务)。
- Pending:任务已准备好进行处理,并将由一个空闲的工作器接收。
- Active:任务正在被工作器处理(即处理程序正在处理该任务)。
- Retry:工作器无法处理任务,任务正在等待将来重试。
- Archived:任务达到最大重试次数,并存储在归档中以供手动检查。
- Completed:任务已成功处理,并保留到保留时间到期为止(仅适用于具有Retention选项的任务)。
+-------------+ +--------------+ +--------------+ +-------------+
| | | | | | Success | |
| Scheduled |----------->| Pending |--------->| Active |---------> | Completed |
| (Optional) | | | | | | (Optional) |
+-------------+ +--------------+ +--------------+ +-------------+
^ | |
| | | Deletion
| | Failed |
| | V
| |
| |
+------+-------+ | +--------------+
| | | | |
| Retry |<--------------+------->| Archived |
| | | |
+--------------+ +--------------+
🍵 信号处理
当您使用Server.Run(Handler)启动服务器处理时,它将阻塞并等待传入的信号。
有两种类型的信号可发送给正在运行的程序,以优雅地关闭进程。
TSTP:此信号告诉Server停止处理新任务。
TERM或INT:此信号告诉Server终止(即关闭)。
建议您首先发送TSTP信号以停止处理新任务,并等待所有正在进行中的任务完成后,再发送TERM信号**以终止程序。
使用 kill 命令发送信号。
kill -TSTP # 停止处理新任务
kill -TERM # 关闭服务器
注意:如果在发送TERM或INT信号时没有发送TSTP信号,Server将启动一个定时器,等待8秒钟,以便所有工作人员完成(要自定义此超时持续时间,请使用ShutdownTime配置)。如果在该时间范围内有工作人员未完成,则任务将被转换回待处理状态,并在程序重新启动后再处理。
注意:目前,Windows不支持TSTP信号。
🍵 队列优先级
asynq
设置队列优先级的方式很简洁,使用如下代码即可:
srv := asynq.NewServer(redis, asynq.Config{
Concurrency: 10,
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
})
这段代码就表示
- critical队列中的任务将60%的时间进行处理
- default队列中的任务将30%的时间进行处理
- low队列中的任务将10%的时间进行处理
还可以设置在入队时指定队列入队的队列(默认就是进入 default
队列)
client := asynq.NewClient(redis)
task := asynq.NewTask("send_notification", map[string]interface{}{"user_id": 42})
// 使用`asynq.Queue`选项指定一个任务使用"critical"队列。
err := client.Enqueue(task, asynq.Queue("critical"))
严格优先级模式,使用 StrictPriority
开启
srv := asynq.NewServer(redis, asynq.Config{
Concurrency: 10,
Queues: map[string]int{
"critical": 3,
"default": 2,
"low": 1,
},
StrictPriority: true, // 严格模式!
})
这将创建一个具有严格优先级的三个队列的 Background
实例:critical,default和low。在严格优先级模式下,具有更高优先级的队列始终先处理,只有在所有其他优先级更高的队列为空时,才会处理优先级较低的队列。
因此,在此示例中,始终首先处理critical队列中的任务。如果critical队列为空,则处理default队列。如果critical和default队列都为空,则处理low队列
🍵 设置重试次数
在默认情况下,最大重试次数是25次,达到最大重试次数之后加入 Archived
中,还可以通过web或者cli来手动重试
1️⃣ 设置最大重试次数
使用 synq.MaxRetry
在创建任务或者任务入队时可以设置最大重试次数
// 任务入队时
client.Enqueue(task, asynq.MaxRetry(5))
// 创建任务时
task := asynq.NewTask("feed:import", nil, asynq.MaxRetry(5))
client.Enqueue(task) // MaxRetry 设置为 5
2️⃣ 自定义重试延迟
默认重试时指数重试由 DefaultRetryDelayFunc
定义,可以使用 RetryDelayFunc
函数进行重新自定义什么间隔重试
// 函数签名
// n 是任务已重试的次数
// e 是任务处理程序返回的错误
// t 是相关的任务
RetryDelayFunc func(n int, e error, t *asynq.Task) time.Duration
比如,任务失败时每隔2秒重试
srv := asynq.NewServer(redis, asynq.Config{
Concurrency: 20, // 最大任务并发数
RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
return 2 * time.Second // 每隔2秒重试
},
})
可以为单独的任务设置单独的重试机制
srv := asynq.NewServer(redis, asynq.Config{
// 对于 "foo" 任务,始终使用 2 秒的延迟,其他任务使用默认行为。
RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
if t.Type() == "foo" {
return 2 * time.Second
}
return asynq.DefaultRetryDelayFunc(n, e, t)
},
})
3️⃣ 非失败性错误处理
有时报错并不是因为这个任务本身发生报错了,而是因为一些外界因素导致这个任务报错,但是这些报错会占用重试的次数,如果像前面那样只设置了5次重试,那么一次的重试机会还是蛮宝贵的,你并不希望这种报错占用重试次数,那么就可以使用 Config
提供 IsFailure(error) bool
函数,可以在此函数内自定义不需要或者需要占用重试的报错类型
srv := asynq.NewServer(redisConnOpt, asynq.Config{
// ... 其他配置选项
IsFailure: func(err error) bool {
return err != ErrResourceNotAvailable // 如果没有可用资源,则为非失败错误
},
})
4️⃣ 跳过重试
如果 Handler.ProcessTask
返回一个 SkipRetry
错误,不管剩余的重试次数是多少,任务都将被归档。返回的错误可以是 SkipRetry
或包装了 SkipRetry
错误的错误
func ExampleHandler(ctx context.Context, task *asynq.Task) error {
// 任务处理逻辑在此处...
// 如果处理程序知道任务不需要重试,则返回 SkipRetry
return fmt.Errorf(": %w", asynq.SkipRetry)
}
💥 任务超时与取消配置
在任务开始处理的时候,有些任务由于网络原因导致很久没有完成,后面又有很多任务来了,这可能会影响队列的性能,可能造成任务积压,这个时候就需要使用到任务超时配置了
可以配置Timeout
或 Deadline
来配置任务超时的选项,粗略的可以理解为前者为短时间、后者为长时间,它们都是使用 context.Context
的超时时间或截止时间,将其作为第一个参数传递给你的处理程序可以查看Go上下文
如果你有一个需要在30秒内完成的任务,你使用 timeout
将超时时间设置为 30*time.Second
err := c.Enqueue(task, asynq.Timeout(30 * time.Second))
如果你有一个需要在特定时间之前完成的任务,你可以设置该任务的截止时间。
例如,如果你有一个需要在 2020-12-25
之前完成的任务,你可以将其作为 Deadline
选项传递
xmas := time.Date(2020, time.December, 12, 25, 0, 0, 0, time.UTC)
err := c.Enqueue(task, asynq.Deadline(xmas))
使用了 Timeout
和 Deadline
选项创建了任务但是还需要通过读取上下文中的 Done
通道来实现超时取消
func myHandler(ctx context.Context, task *asynq.Task) error {
c := make(chan error, 1) // 创建一个带缓冲的 channel,用于接收任务处理结果
go func() {
c <- doWork(task) // 启动一个 goroutine 执行任务,并将结果发送到 channel
}()
select {
case <-ctx.Done(): // 如果 ctx 被取消(超时或手动取消),则返回 ctx 的错误
return ctx.Err()
case res := <-c: // 如果任务完成,返回任务的结果
return res
}
}
还可以通过命令行取消任务
asynq task ls --queue=myqueue --state=active
asynq task cancel [task_id]
💥 定时任务
asynq
有定时执行任务的功能叫做 scheduler
,Scheduler
会定期将任务加入队列,然后由集群中可用的工作服务器来执行这些任务
使用定时任务需要注意下面几点:
-
确保每个调度只有一个Scheduler在运行,否则会出现重复任务
-
使用集中化的方法意味着不需要同步调度,并且服务可以在不使用锁的情况下运行
-
如果需要动态地添加和删除周期性任务,请使用
PeriodicTaskManager
而不是直接使用Scheduler
-
时区的配置
//默认情况下,周期任务使用的是UTC时间,但是你可以使用SchedulerOpts来更改所使用的时区 // 例如,使用America/Los_Angeles时区而不是默认的UTC时区。 loc, err := time.LoadLocation("America/Los_Angeles") if err != nil { panic(err) } scheduler := asynq.NewScheduler( redisConnOpt, &asynq.SchedulerOpts{ Location: loc, }, )
1️⃣ 启动定时任务
在运行定时任务之前,需要创建一个调度器,并且将任务都注册至调度器,最后使用 Run
启动调度器,启动之后它就会挂起,在收到终止命令后就会停止
// 创建任务调度器
scheduler := asynq.NewScheduler(redisConnOpt, nil)
// 创建任务
task := asynq.NewTask("example_task", nil)
// 你可以使用cron规范字符串来指定调度,表示每秒执行一次
entryID, err := scheduler.Register("* * * * *", task) // 注册任务
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)
// 你还可以使用"@every "来指定间隔。
entryID, err = scheduler.Register("@every 30s", task)
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)
// 你也可以传递选项,比如入队的队列
entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)
// 使用Run来启动调度
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
2️⃣ 自定义无法入队错误处理
可以自定义队无法入队情况下的错误
// task 表示入队失败的任务对象
// opts 内包含任务的选项列表
// handleEnqueueError 是 EnqueueErrorHandler 的实现
func handleEnqueueError(task *asynq.Task, opts []asynq.Option, err error) {
// 记录错误日志
log.Printf("Failed to enqueue task: type=%s, payload=%s, error=%v", task.Type(), task.Payload(), err)
// 可以根据任务类型和错误类型进行不同的处理
switch task.Type() {
case TypeEmailDelivery:
// 如果是邮件任务,可以尝试重试或记录到数据库
log.Printf("Retrying email task for user: %s", task.Payload())
// 这里可以添加重试逻辑,例如重新入队或记录到数据库
default:
// 其他任务类型的处理逻辑
log.Printf("Unhandled task type: %s", task.Type())
}
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
EnqueueErrorHandler: handleEnqueueError,
},
)
还可以通过CLI进行检查CLI有一个名为 cron
的子命令用于检查调度器记录。要查看当前正在运行的调度器的所有记录,可以运行以下命令:
asynq cron ls
这个命令将输出一个包含每个记录的ID、计划规范、下次加入队列时间和上次加入队列时间的列表。你还可以运行以下命令来查看每个记录的历史记录:
asynq cron history
💥 执行任务速率限制
在 asynq
中可以使用 rate
包来对任务速率进行限制,这里的限制是限制每一个 task
处理 handler
,并不表示整个消费端
我觉得这个要配上定时任务才方便使用,如果是只执行一次Enqueue就没有必要使用任务限制器
package main
import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"time"
"golang.org/x/time/rate"
"github.com/hibiken/asynq"
)
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{
Concurrency: 10,
// 如果错误是由于速率限制导致的,请不将错误计为故障。
IsFailure: func(err error) bool { return !IsRateLimitError(err) },
RetryDelayFunc: retryDelay,
},
)
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
type RateLimitError struct {
RetryIn time.Duration
}
func (e *RateLimitError) Error() string {
return fmt.Sprintf("速率限制(%v 后重试)", e.RetryIn)
}
func IsRateLimitError(err error) bool {
_, ok := err.(*RateLimitError)
return ok
}
func retryDelay(n int, err error, task *asynq.Task) time.Duration {
var ratelimitErr *RateLimitError
if errors.As(err, &ratelimitErr) {
return ratelimitErr.RetryIn
}
return asynq.DefaultRetryDelayFunc(n, err, task)
}
// 每秒1次事件的速率,最多允许2个事件的突发。
var limiter = rate.NewLimiter(1, 2)
func handler(ctx context.Context, task *asynq.Task) error {
if !limiter.Allow() {
log.Println("限速任务")
return &RateLimitError{
RetryIn: time.Duration(rand.Intn(10)) * time.Second,
}
}
log.Printf("[*] 处理任务 %s", task.Payload())
return nil
}
这段代码定义了一个自定义错误名为 RateLimitError
内有一个字段表示如果发生了这个错误则 RetryIn
时间后重试
如果想了解自定义错误可以https://blog.tanc.fun:9999/archives/dc612560-1a14-48d7-97f7-d72dee3f9786#2---%E8%87%AA%E5%AE%9A%E4%B9%89%E9%94%99%E8%AF%AF 内查看
核心代码是如下这段,使用 rate
的 NewLimiter
方法定义了一个限速器,在执行 handler
的时候会先调用限速器的 Allow
方法,可以将限速器看成一个令牌桶,在创建令牌桶时里面存放了两个令牌,调用 Allow
方法来获取令牌,如果获取不到那就返回自定义得到 RateLimitError
错误,来达到限制的作用
// 每秒1次事件的速率,最多允许2个事件的突发。
var limiter = rate.NewLimiter(1, 2)
func handler(ctx context.Context, task *asynq.Task) error {
if !limiter.Allow() {
log.Println("限速任务")
return &RateLimitError{
RetryIn: time.Duration(rand.Intn(10)) * time.Second,
}
}
log.Printf("[*] 处理任务 %s", task.Payload())
return nil
}
返回这个错误就会被之前说的自定义延迟和非失败性错误给处理了
启动一个生产端来测试一下
func main() {
// 创建 Asynq 客户端
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: ":6379", // Redis 地址
})
defer client.Close()
// 定时任务入队
ticker := time.NewTicker(300 * time.Millisecond) // 每 0.3 秒触发一次
defer ticker.Stop()
for i := 1; ; i++ {
<-ticker.C // 等待定时器触发
// 创建任务
task := asynq.NewTask("example_task", []byte(fmt.Sprintf("任务 %d", i)))
// 将任务推送到队列中
info, err := client.Enqueue(task)
if err != nil {
log.Printf("Failed to enqueue task: %v", err)
} else {
log.Printf("Enqueued task: id=%s queue=%s", info.ID, info.Queue)
}
}
}
启动worker和product端,就会有很多限速的
asynq: pid=18420 2025/01/17 14:40:01.636133 INFO: Starting processing
asynq: pid=18420 2025/01/17 14:40:01.636799 INFO: Send signal TERM or INT to terminate the process
2025/01/17 22:40:10 [*] 处理任务 任务 1
2025/01/17 22:40:10 [*] 处理任务 任务 2
2025/01/17 22:40:11 限速
2025/01/17 22:40:11 限速
2025/01/17 22:40:11 限速
2025/01/17 22:40:11 限速
2025/01/17 22:40:11 限速
2025/01/17 22:40:11 限速
2025/01/17 22:40:11 限速
2025/01/17 22:40:11 [*] 处理任务 任务 10
2025/01/17 22:40:12 限速
2025/01/17 22:40:12 [*] 处理任务 任务 12
2025/01/17 22:40:13 限速
2025/01/17 22:40:13 限速
💥 任务去重
有两种方法可以对任务进行去重,一种是使用任务Id,另外一种是使用 Unique
选项:让 Asynq
为任务创建唯一性锁
1️⃣ 任务id
在入队时直接添加任务id即可
// 第一个任务应该没问题
_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))
在定时任务,如果任务次数很频繁,当一个任务在外界因素(没有设置非失败性错误处理)失败或者一些时间段错误,会加入重试队列,第二个任务来的时候还是这些报错就又会加入重试队列,这样一来就造成任务积压,这个时候就可以使用去重了
_, err := scheduler.Register("@every 5m", task, asynq.TaskID("XXX"))
if err != nil {
panic(err)
}
2️⃣ 使用 unique
基于唯一性锁。在使用Unique选项入队任务时,Client会检查是否可以为给定任务获取锁。只有在可以获取锁时,任务才会被入队。如果已经有另一个任务持有锁,那么Client将返回一个错误(查看下面的示例代码以了解如何检查错误)。
唯一性锁与TTL(生存时间)相关联,以避免永久持有锁。在TTL后或者在任务在TTL之前成功处理后,锁将被释放。 需要注意的一件重要事情是,Asynq的唯一任务特性是尽力而为的唯一性。换句话说,在任务处理之前如果锁已经过期,可能会入队一个重复的任务。
任务的唯一性基于以下属性:
- 类型
- Payload(载荷)
- 队列
因此,如果存在相同类型和载荷的任务,并且入队到相同的队列,那么在锁被释放之前,不会入队具有相同属性的另一个任务
// 函数签名 传入TLL时间
func Unique(ttl time.Duration) Option
c := asynq.NewClient(redis)
t1 := asynq.NewTask("example", []byte("hello"))
// t1将在接下来的一个小时内持有唯一性锁。
err := c.Enqueue(t1, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
// 处理重复任务
case err != nil:
// 处理其他错误
}
t2 := asynq.NewTask("example", []byte("hello"))
// t2不能入队,因为它是t1的副本。
err = c.Enqueue(t2, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
// 处理重复任务
case err != nil:
// 处理其他错误
}
☕️ 保存任务以及结果
1️⃣ 保留任务
在默认情况下,任务执行完成(一旦任务成功通过Handler处,即Handler.ProcessTask返回ni)就会从队列中移除,如果你希望任务执行完成后会保存至队列中可以使用 Retention
,来设置任务保留至队列中的时间期限,函数签名如下
func Retention(d time.Duration) Option
可以在任务初始化时加,也可以在入队时加
// 在任务初始化时设置选项。
task := asynq.NewTask("my_task", payload, asynq.Retention(24 * time.Hour))
// 或者,在排队时设置选项。
info, err := client.Enqueue(task, asynq.Retention(24 * time.Hour))
2️⃣ 保留任务结果
如果你想在任务处理时存储与任务相关的一些数据,并且如果这些数据只在任务的生命周期内需要(即直到任务从队列中删除),那么你可以将数据与任务一起存储。
使用ResultWriter将数据写入redis,从而将写入的数据与任务关联起来。
⚠️ 注意:请谨慎考虑写入redis的数据量,如果需要存储的数据很大,最好使用基于磁盘的存储系统,比如SQL数据库。
// 在处理程序代码中。
func MyHandler(ctx context.Context, task *asynq.Task) error {
res, err := DoStuff(ctx, task)
if err != nil {
return fmt.Errorf("failed to process task: %v", err)
}
if _, err = task.ResultWriter().Write(res); err != nil {
return fmt.Errorf("failed to write task result: %v", err)
}
return nil
}
💥 任务聚合
使用任务聚合可以使得将多个连续操作批量处理成一个,以节省成本、优化缓存或批量通知。
为了使用任务聚合功能,您需要将任务以相同的组名(任务组)排队到同一个队列中。使用相同的(queue, group
)对排队的任务将由您提供的**GroupAggregator
**聚合为一个任务,并将聚合后的任务传递给处理程序
创建聚合任务时,Asynq
服务器将等待更多的任务,直到可配置的宽限期到期。每次使用相同的(queue, group
)排队新任务时,宽限期都会更新。
宽限期有可配置的上限:可以设置最大聚合延迟时间,在此之后,Asynq
服务器将无视剩余的宽限期并聚合任务。
还可以设置可以一起聚合的最大任务数。如果达到该数目,Asynq
服务器将立即聚合任务。
注意:任务的调度和聚合是冲突的功能,调度优先于聚合
调度和聚合的目标是矛盾的:
- 调度要求任务在特定时间点执行,强调准时性。
- 聚合要求任务在达到一定数量后执行,强调批量处理。
// 此函数用于将多个任务聚合为一个任务。
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... 您的逻辑以聚合给定的任务并返回聚合后的任务。
// ... 如果需要,使用NewTask(typename, payload, opts...)创建一个新任务并设置选项。
// ... (注意)将会忽略Queue选项,并且聚合后的任务将始终排队到组所属的同一个队列中。
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
// GroupAggregator 是一个函数,用于定义如何将多个任务聚合为一个任务。
// 这里传入了一个自定义的聚合函数 `aggregate`,该函数需要实现任务聚合的逻辑。
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
// GroupMaxDelay 是任务聚合的最大延迟时间。
// 如果任务组在达到最大大小(GroupMaxSize)之前未完成,系统会等待最多 10 分钟,然后强制处理当前已聚合的任务。
GroupMaxDelay: 10 * time.Minute,
// GroupGracePeriod 是任务组的宽限期。
// 当任务组达到最大大小(GroupMaxSize)或达到最大延迟时间(GroupMaxDelay)时,
// 系统会等待额外的 2 分钟,以确保所有可能的任务都能被聚合。
GroupGracePeriod: 2 * time.Minute,
// GroupMaxSize 是任务组的最大大小。
// 当任务组中的任务数量达到 20 时,系统会立即处理这些任务,而不需要等待 GroupMaxDelay。
GroupMaxSize: 20,
// Queues 是一个映射,定义了任务队列的名称及其优先级。
// 这里的 "notifications" 队列的优先级为 1,表示系统会优先处理该队列中的任务。
// 如果有多个队列,可以设置不同的优先级(数值越大,优先级越低)。
Queues: map[string]int{"notifications": 1},
},
)
GroupAggregatorFunc()
也是使用了一个装饰器模式,下面是它的函数签名
type GroupAggregatorFunc func(group string, tasks []*Task) *Task
// 执行聚合的具体方法
func (fn GroupAggregatorFunc) Aggregate(group string, tasks []*Task) *Task {
return fn(group, tasks)
}
它实现了 GroupAggregator
接口,其中有一个 Aggregate
,它就是执行聚合操作的实现方法
Aggregate 方法会在以下两种情况下被调用:
- 任务组达到最大大小(GroupMaxSize):当任务组中的任务数量达到 GroupMaxSize 时,Asynq 会立即调用 Aggregate 方法,将当前任务组中的所有任务聚合成一个任务。
- 任务组达到最大延迟时间(GroupMaxDelay): 如果任务组中的任务数量未达到 GroupMaxSize,但等待时间超过了 GroupMaxDelay,Asynq 也会调用 Aggregate 方法,将当前任务组中的任务聚合成一个任务。
// GroupAggregator 用于在任务被传递给 Handler 之前,将一组任务聚合成一个任务。
type GroupAggregator interface {
// Aggregate 将给定组名(group)下的任务列表(tasks)聚合成一个任务,
// 并返回一个新的任务,该任务是这些任务的聚合结果。
//
// 可以使用 NewTask(typename, payload, opts...) 来为聚合后的任务设置选项。
// 如果提供了 Queue 选项,它将被忽略,聚合后的任务总是会被入列到该组所属的队列中。
Aggregate(group string, tasks []*Task) *Task
}
下面是一个小实验
生产端代码: 这里创建了4个任务,且同属同一个任务组
package main
import (
"flag"
"log"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis服务器地址")
flagMessage = flag.String("message", "hello", "任务处理时要打印的消息")
)
func main() {
flag.Parse()
c := asynq.NewClient(asynq.RedisClientOpt{Addr: "redis.tanc.fun:6377", Password: "bxzxhygr.25"})
defer c.Close()
// 循环创建task
var tasks []*asynq.Task
for i := 0; i < 4; i++ {
tasks = append(tasks, asynq.NewTask("aggregation-tutorial", []byte(*flagMessage)))
}
// 循环入队
for _, task := range tasks {
_, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
if err != nil {
log.Fatalf("无法加入任务队列:%v", err)
}
}
log.Printf("成功加入任务队列\n")
}
消费端,将
package main
import (
"context"
"flag"
"github.com/hibiken/asynq"
"log"
"strings"
"time"
)
var (
flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "组的优雅延迟时间")
flagGroupMaxDelay = flag.Duration("max-delay", 30*time.Second, "组的最大延迟时间")
flagGroupMaxSize = flag.Int("max-size", 20, "组的最大尺寸")
)
func main() {
srv := asynq.NewServer(asynq.RedisClientOpt{
Addr: "redis.tanc.fun:6377",
Password: "bxzxhygr.25",
DB: 0,
}, asynq.Config{
Queues: map[string]int{"tutorial": 1},
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupGracePeriod: *flagGroupGracePeriod,
GroupMaxDelay: *flagGroupMaxDelay,
GroupMaxSize: *flagGroupMaxSize,
})
mux := asynq.NewServeMux()
mux.HandleFunc("aggregated-task", handleAggregatedTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("无法启动服务器:%v", err)
}
}
// 简单的聚合函数。
// 将所有任务的消息组合在一起,每个消息占一行。
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
log.Printf("从组 %q 聚合了 %d 个任务", group, len(tasks))
var b strings.Builder
for _, t := range tasks {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("aggregated-task", []byte(b.String()))
}
func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
log.Print("处理程序收到聚合的任务")
log.Printf("聚合的消息:%s", task.Payload())
return nil
}
输出
2025/01/19 18:04:54 从组 "example-group" 聚合了 4 个任务
2025/01/19 18:04:54 处理程序收到聚合的任务
2025/01/19 18:04:54 聚合的消息:hello
hello
hello
hello
☕️ 连接redis集群和哨兵
1️⃣ 连接redis集群
Asynq根据队列对数据进行分片。 在上图中,我们有一个6个实例的Redis Cluster(3个主节点,3个从节点)和4个队列(q1、q2、q3、q4)。
- Master1(以及它的副本Slave1)托管q1和q2。
- Master2(以及它的副本Slave2)托管q3。
- Master3(以及它的副本Slave3)托管q4。
当您使用asynq.Client将任务加入队列时,您可以使用Queue选项指定队列。 被加入队列的任务将由从这些队列拉取任务的asynq.Server(s)消费
可以通过 RedisClusterClientOpt
来连接集群
如果要了解集群原理的话请查看 redis集群
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
一个节点托管一个队列,可以使用如下命令查看
asynq queue ls --cluster
这个命令将打印一个队列列表,以及:
- 队列所属的集群节点
- 队列映射到的集群哈希槽
3️⃣ 配置哨兵
哨兵可以在 redis哨兵
配置 asynq
的 Client
和 Server
以使用Redis Sentinel非常简单。使用 RedisFailoverClientOpt
指定Redis主节点的名称和Redis Sentinel的地址。
var redis = &asynq.RedisFailoverClientOpt{
MasterName: "mymaster",
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
}
然后将此客户端选项传递给 NewClient
和 NewBackground
,以创建一个使用Redis Sentinels的实例。
client := asynq.NewClient(redis)
// ...
srv := asynq.NewServer(redis, asynq.Config{ Concurrency: 10 })
通过这种设置,当Redis主节点故障时,Sentinels将启动故障转移过程,并通知 asynq
新的主节点,后台任务处理将继续正常工作