📅 2025/1/12

📦 go版本: 1.23.3

💻 MacOs 14.6

🏆 Asynq

🍪 介绍

🏢 https://github.com/hibiken/asynq

Asynq是一个 Go语言异步任务框架,它以 Redis作为消息队列,具备可伸缩性和简易性,大致应该是使用了redis的list数据结构

异步性是指操作系统能够同时处理多个任务,并按照不同的速度进行处理和响应。它允许非阻塞调用、事件驱动、多任务并行执行,并通过回调机制处理异步操作的结果。通过异步性,操作系统提高了效率、资源利用率,并提供了更好的用户体验。

Asynq异步任务解决方案:

  • 客户端将任务放入队列
  • 服务器从队列中提取任务并为每个任务启动一个工作线程(协程)
  • 多个工作协程并行处理任务

任务队列是一种将工作分配到多台机器的机制。系统可以由多个工作服务器和代理组成,实现高可用性和水平扩展。 可以看一下我的Rocketmq介绍

特性

  • 保证至少执行一次任务
  • 任务调度
  • 重试失败的任务
  • 在工作线程崩溃时自动恢复任务
  • 加权优先级队列
  • 严格优先级队列
  • 由于Redis中的写操作快速,添加任务的延迟低
  • 使用唯一选项对任务进行去重
  • 允许为每个任务设置超时和截止时间
  • 允许聚合一组任务以批量执行多个连续操作
  • 灵活的处理程序接口,支持中间件
  • 允许暂停队列以停止从队列中处理任务
  • 周期性任务
  • 支持Redis Cluster以自动分片和实现高可用性
  • 支持Redis Sentinel实现高可用性
  • 与Prometheus集成,以收集和可视化队列指标
  • Web界面,用于检查和远程控制队列和任务
  • 命令行界面,用于检查和远程控制队列和任务

🍵 开启入门

教程地址: https://www.tizi365.com/topic/14001.html

首先需要安装redis(版本最新即可,自行安装),以及asynq

go install github.com/hibiken/asynq@latest

对应的需要创建客户端和消费端两个服务目录如下:

├── client
│   └── client.go
├── go.mod
├── go.sum
├── task
│   └── task.go
└── workers
    └── workers.go

client 和 workers 分别表示客户端和消费端,task用于保存创建和执行任务相关操作,开始之前先熟悉一下基本的asynq操作

1️⃣ Asynq连接redis

使用RedisClientOpt结构体来指定与本地运行的Redis服务器的连接。

type RedisClientOpt struct {
	// 要使用的网络类型,可以是 tcp 或 unix。
	// 默认是 tcp。
	Network string

	// Redis 服务器地址,格式为 "host:port"。
	Addr string

	// 在使用 Redis ACLs 时,用于验证当前连接的用户名。
	// 参见:https://redis.io/commands/auth
	Username string

	// 用于验证当前连接的密码。
	// 参见:https://redis.io/commands/auth
	Password string

	// 连接到服务器后要选择的 Redis 数据库。
	// 参见:https://redis.io/commands/select
	DB int

	// 建立新连接时的拨号超时。
	// 默认是 5 秒。
	DialTimeout time.Duration

	// 套接字读取的超时时间。
	// 如果达到超时,读取命令将以超时错误失败,而不是阻塞。
	//
	// 使用值 -1 表示无超时,0 表示默认值。
	// 默认是 3 秒。
	ReadTimeout time.Duration

	// 套接字写入的超时时间。
	// 如果达到超时,写入命令将以超时错误失败,而不是阻塞。
	//
	// 使用值 -1 表示无超时,0 表示默认值。
	// 默认是 ReadTimeout。
	WriteTimeout time.Duration

	// 套接字连接的最大数量。
	// 默认是每个 CPU 核心 10 个连接,由 runtime.NumCPU 报告。
	PoolSize int

	// 用于连接到服务器的 TLS 配置。
	// 只有设置此字段时才会协商 TLS。
	TLSConfig *tls.Config
}

示例配置

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // 如果不需要密码,可省略
    Password: "mypassword",
    // 为asynq使用一个专用的数据库编号。
    // 默认情况下,Redis提供16个数据库(0..15)
    DB: 0,
}

2️⃣ 任务

在asynq中,工作单元封装在称为Task的类型中,它概念上具有两个字段:Type和Payload。

// Type是一个字符串值,用于指示任务的类型。
func (t *Task) Type() string

// Payload是任务执行所需的数据。
func (t *Task) Payload() []byte

使用 NewTask即可创建任务,可以看到接受两个东西一个所类名称,一个是 payload

// NewTask 返回一个给定类型名称和负载数据的新任务。可以传递选项来配置任务处理行为。
func NewTask(typename string, payload []byte, opts ...Option) *Task

这些就是需要了解的基本类型,接下来开始操作

3️⃣ 客户端代码

客户端任务是创建任务并且加入任务队列,所以需要连接``redis`,使用NewClient来创建客户端

// NewClient 返回一个给定 redis 连接选项的新 Client 实例
func NewClient(r RedisConnOpt) *Client

创建任务使用``NewTask 即可创建任务,这里我将NewTask 任务封装至task`包中,相应的tasktype 和 payload也都封装到里面了

Enqueue 将给定任务放入队列中,如果任务入队成功,Enqueue 返回 TaskInfo 和 nil 错误,否则返回非 nil 错误,参数 opts 指定任务处理的行为。如果存在冲突的 Option 值,则最后一个值会覆盖其他值。提供给 NewTask 的任何选项都可以被传递给 Enqueue 的选项覆盖。默认情况下,最大重试次数设置为 25,超时设置为 30 分钟,如果未提供 ProcessAt ProcessIn 选项,任务将立即挂起

// 入队
func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error)

// ProcessIn 返回一个选项,用于指定相对于当前时间处理给定任务的时间。
func ProcessIn(d time.Duration) Option


// ProcessAt 返回一个选项来指定何时处理给定任务。
func ProcessAt(t time.Time) Option

代码如下:

这里创建了两个任务,分别是welcome和reminder

func main() {
    // 连接redis
	redisClient := asynq.NewClient(asynq.RedisClientOpt{
		Addr: "localhost:6379",
		DB:   0,
	})
	defer redisClient.Close()

	t1, err := task.NewWelcomeEmailTask("222@qq.com")
	if err != nil {
		panic(err)
	}
	t2, err := task.NewReminderEmailTask("222@qq.com")
	if err != nil {
		panic(err)
	}

	// 立即处理
	t1Info, err := redisClient.Enqueue(t1)
	if err != nil {
		panic(err)
	}
	fmt.Println("执行成功: ", t1Info)
  
    // 延时处理
	t2Info, err := redisClient.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
	if err != nil {
		panic(err)
	}
	fmt.Println("执行成功: ", t2Info)
}

对应的创建task代码

// 任务类型的列表。
const (
	TypeWelcomeEmail  = "email:welcome"
	TypeReminderEmail = "email:reminder"
)

// 与任何与电子邮件相关的任务相关的任务负载。
type emailTaskPayload struct {
	// 电子邮件收件人的ID。
	UserID string
}

// 新键任务
func NewWelcomeEmailTask(id string) (*asynq.Task, error) {
    // payload 切换为[]byte
	payload, err := json.Marshal(emailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

// 新键任务
func NewReminderEmailTask(id string) (*asynq.Task, error) {
	payload, err := json.Marshal(emailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeReminderEmail, payload), nil
}

4️⃣ 消费端代码

同样的也需要连redis不过使用 NewServer来创建,多了一个传参 Config作用是配置指定服务器的后台任务处理行为

// NewServer 返回一个给定 redis 连接选项和服务器配置的新服务器。
func NewServer(r RedisConnOpt, cfg Config) *Server

Config类型

type Config struct {
	// 同时处理任务的最大数量。
	//
	// 如果设置为零或负值,NewServer 将覆盖该值为当前进程可使用的 CPU 数量。
	Concurrency int

	// BaseContext 可选地指定一个函数,该函数返回此服务器上调用 Handler 的基础上下文。
	//
	// 如果 BaseContext 为 nil,则默认值为 context.Background()。
	// 如果定义了此函数,则它必须返回一个非空的上下文
	BaseContext func() context.Context

	// TaskCheckInterval 指定在所有队列为空时检查新任务以进行处理的时间间隔。
	//
	// 如果未设置、为零或负值,则时间间隔设置为 1 秒。
	//
	// 注意:将此值设置得过低可能会给 redis 增加显著负载。
	//
	// 默认情况下,TaskCheckInterval 被设置为 1 秒。
	TaskCheckInterval time.Duration

	// 用于计算失败任务重试延迟的函数。
	//
	// 默认情况下,它使用指数退避算法来计算延迟。
	RetryDelayFunc RetryDelayFunc

	// 用于确定 Handler 返回的错误是否为失败情况的谓词函数。
	// 如果该函数返回 false,则 Server 不会增加任务的重试计数器,
	// 并且 Server 不会记录队列统计信息(已处理和失败统计信息)以避免扭曲队列的错误率。
	//
	// 默认情况下,如果给定的错误是非 nil 的,则该函数返回 true。
	IsFailure func(error) bool

	// 要按给定优先级值处理的队列列表。键是队列的名称,值是关联的优先级值。
	//
	// 如果设置为 nil 或未指定,服务器将仅处理 "default" 队列。
	//
	// 为了避免使低优先级队列饥饿,按如下方式处理优先级:
	//
	// 示例:
	//
	//     Queues: map[string]int{
	//         "critical": 6,
	//         "default":  3,
	//         "low":      1,
	//     }
	//
	// 根据上述配置,并且所有队列都不为空,"critical"、"default"、"low" 队列中的任务应分别按 60%、30%、10% 的时间进行处理。
	//
	// 如果队列具有零或负的优先级值,则该队列将被忽略。
	Queues map[string]int

	// StrictPriority 表示是否应严格处理队列优先级。
	//
	// 如果设置为 true,则首先处理具有最高优先级的队列中的任务。
	// 只有当这些具有较高优先级的队列为空时,才处理较低优先级队列中的任务。
	StrictPriority bool

	ErrorHandler ErrorHandler

	// Logger 指定服务器实例使用的日志记录器。
	//
	// 如果未设置,则使用默认日志记录器。
	Logger Logger

	// LogLevel 指定要启用的最低日志级别。
	//
	// 如果未设置,默认使用 InfoLevel。
	LogLevel LogLevel

	// ShutdownTimeout 指定在停止服务器时等待工作者完成其任务
	// 的持续时间,之后强制中止它们。
	//
	// 如果未设置或为零,则使用默认的 8 秒超时。
	ShutdownTimeout time.Duration

	// HealthCheckFunc 定期调用,并传入在连接到 redis 服务器的 ping 过程中遇到的任何错误。
	HealthCheckFunc func(error)

	// HealthCheckInterval 指定健康检查之间的间隔。
	//
	// 如果未设置或为零,则时间间隔设置为 15 秒。
	HealthCheckInterval time.Duration

	// DelayedTaskCheckInterval 指定在 'scheduled' 和 'retry'
	// 任务上运行的检查间隔,并在它们准备好被处理时将它们转发到 'pending' 状态。
	//
	// 如果未设置或为零,则间隔设置为 5 秒。
	DelayedTaskCheckInterval time.Duration

	// GroupGracePeriod 指定服务器在聚合组内任务之前等待传入任务的时间量。
	// 如果在这段时间内接收到传入任务,服务器将再等待相同长度的另一个周期,直到达到 GroupMaxDelay(如果指定)。
	//
	// 如果未设置或为零,则宽限期设置为 1 分钟。
	// GroupGracePeriod 的最小持续时间为 1 秒。如果指定的值小于 1 秒,则调用 NewServer 将引发 panic。
	GroupGracePeriod time.Duration

	// GroupMaxDelay 指定服务器在聚合组内任务之前等待传入任务的最大时间量。
	//
	// 如果未设置或为零,则不使用延迟限制。
	GroupMaxDelay time.Duration

	// GroupMaxSize 指定可以在组内聚合到单个任务中的最大任务数量。
	// 如果达到 GroupMaxSize,则服务器将立即将任务聚合成一个。
	//
	// 如果未设置或为零,则不使用大小限制。
	GroupMaxSize int

	// GroupAggregator 指定用于将组内的多个任务聚合成一个任务的聚合函数。
	//
	// 如果未设置或为 nil,则服务器上的组聚合功能将被禁用。
	GroupAggregator GroupAggregator

	// JanitorInterval 指定过期已完成任务的 janitor 检查的平均间隔。
	//
	// 如果未设置或为零,则使用默认的 8 秒间隔。
	JanitorInterval time.Duration

	// JanitorBatchSize 指定在一次运行中要删除的过期已完成任务的数量。
	//
	// 如果未设置或为零,则使用默认的批量大小 100。
	// 确保不要将大的数字作为批量大小,以防止运行时间过长的脚本。
	JanitorBatchSize int
}

这里的asynq.NewServeMux() 是 作用是用来注册handler

server的Run接口作用就是运行 handler

// Run 启动任务处理并阻塞,直到	收到退出程序的操作系统信号。一旦它收到信号,它就会优雅地关闭所有活跃的工作线程和其他 goroutine 来处理任务
func (srv *Server) Run(handler Handler) error

handler的概念和gin中的差不多,是处理任务逻辑的,这里Run接受一个Handler接口,它有一个 ProcessTask方法,如果任务处理成功,ProcessTask 应返回 nil,如果 ProcessTask 返回非零错误或恐慌,如果剩余重试次数,任务将在延迟后重试,否则任务将被存档,此规则的一个例外是 ProcessTask 返回 SkipRetry 错误。如果返回的错误是 SkipRetry 或错误包裹 SkipRetry,则跳过重试,并立即归档任务,此规则的另一个例外是当 ProcessTask 返回 RevokeTask 错误时。如果返回的错误是 RevokeTask 或错误包装了 RevokeTask,则不会重试或存档该任务

type Handler interface {
    // 如果任务成功处理,ProcessTask应返回nil。
    // 如果ProcessTask返回一个非nil错误或导致panic,任务将稍后重试。
    ProcessTask(context.Context, *Task) error
}

实现一个 handler最简单的方法是定义一个具有相同签名的函数,并在将其传递给 Run时使用 asynq.HandlerFunc适配器类型。

这里没有指定任何type,也就表示是消费所有类型的type

// 自定义的handler
func handler(ctx context.Context, t *asynq.Task) error {
}

// 如果要被server run则可以使用asynq.HandlerFunc
server.Run(asynq.HandlerFunc(handler))

来看看 HandlerFunc的构造

// HandlerFunc 类型是一个适配器,允许使用普通函数作为 Handler。如果 f 是具有适当签名的函数,则 HandlerFunc(f) 是调用 f 的 Handler
type HandlerFunc func(context.Context, *Task) error


// 实现了Handler接口,它会调用调用HandlerFunc(context.Context, *Task)
func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error

但是我的代码使用 ServeMux来创建handler。就像来自 "net/http"包的 ServeMux一样,你可以通过调用Handle或HandleFunc来注册一个handler。ServeMux满足Handler接口,所以可以将其传递给Run,你可以注册多个处理器。它会将每个任务的类型与注册的模式列表进行匹配,并调用与任务类型名称最接近的模式对应的处理器。

// ServeMux 是异步任务的多路复用器。它将每个任务的类型与注册类型列表进行匹配,并调用与任务类型名称最匹配的类型的处理程序。
// 较长的类型优先于较短的类型,因此,如果同时为“images”和“images:thumbnails”注册了处理程序,则类型名以“images:thumbnails”开头的任务将调用后者的处理程序,而前者将被调用。接收类型名称以“images”开头的任务。
type ServeMux struct {
}


// 创建一个ServerMux
func NewServeMux() *ServeMux

//  Handle 注册给定类型的处理程序。如果类型的处理程序已存在,则 Handle 会出现panic
func (mux *ServeMux) Handle(pattern string, handler Handler)

// HandleFunc 注册给定类型的处理函数。
func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *Task) error)

// 处理程序返回用于给定任务的处理程序。它总是返回一个非零处理程序。处理程序还返回与任务匹配的注册类型。  如果没有适用于该任务的已注册处理程序,则处理程序将返回“未找到”处理程序,该处理程序会返回错误。

func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string)

// ProcessTask 将任务分派给其类型与任务类型最匹配的处理程序。
func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error

// 使用将 MiddlewareFunc 附加到链中。中间件按照它们应用于 ServeMux 的顺序执行。
func (mux *ServeMux) Use(mws ...MiddlewareFunc)

这里的 HandlerFunc方法我也封装到了 task

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{
			Addr: "localhost:6379",
			DB:   0,
		},
		asynq.Config{Concurrency: 10},
	)
	mux := asynq.NewServeMux()
	mux.HandleFunc("email:welcome", task.HandleWelcomeEmailTask)
	mux.HandleFunc("email:reminder", task.HandleReminderEmailTask)

	if err := srv.Run(mux); err != nil {
		panic(err)
	}
}

handler函数

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    // 从t.Payload 解析出payload信息
	var p emailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
    // 打印
	log.Printf(" [*] 向用户 %s 发送欢迎电子邮件", p.UserID)
	return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
	var p emailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] 向用户 %s 发送提醒电子邮件", p.UserID)
	return nil
}

5️⃣ 运行程序

运行这两个程序。让我们先运行client程序来创建和调度任务。

> go run client/client.go
执行成功:  &{c9e044e5-b078-414e-b5a7-e5da47fddae0 default email:welcome [123 34 85 115 101 114 73 68 34 58 34 50 50 50 64 113 113 46 99 111 109 34 125] pending 25 0  0001-01-01 00:00:00 +0000 UTC 30m0s 0001-01-01 00:00:00 +0000 UTC  2025-01-12 18:12:06.960881 +0800 CST m=+0.001739835 false 0s 0001-01-01 00:00:00 +0000 UTC []}
执行成功:  &{0e72bca0-c275-4b2e-997b-914f3bb4e582 default email:reminder [123 34 85 115 101 114 73 68 34 58 34 50 50 50 64 113 113 46 99 111 109 34 125] scheduled 25 0  0001-01-01 00:00:00 +0000 UTC 30m0s 0001-01-01 00:00:00 +0000 UTC  2025-01-13 18:12:06.970658 +0800 CST m=+86400.011517001 false 0s 0001-01-01 00:00:00 +0000 UTC []}

这将创建两个任务:一个立即处理的任务和一个在24小时后处理的任务。

让我们使用asynq的命令行界面来检查任务。在此界面中会显示队列列表,以上代码都是默认在default中运行,点进去就可以查看task的运行状态了

asynq dash

最后,让我们启动workers程序来处理任务。

go run workers/workers.go

注意:该程序不会退出,直到发送一个信号来终止程序

运行后应该能够在终端中看到一些文本输出,表示任务已成功处理。

6️⃣ 使用中间件

中间件就是使用了装饰器模式,也就是说,它接受一个handler运行你在运行hanlder之前或者之后在做一些其他事情,下面定义了有个中间件,它接受一个handler并且执行handler,执行完毕打印耗时
func LoggerMiddler(handler asynq.Handler) asynq.Handler {
	return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
		// 获取当前时间
		start_time := time.Now()

		// 执行任务
		err := handler.ProcessTask(ctx, t)
		if err != nil {
			return err
		}

		// 获取结束时间
        // time.Since可以快捷计算耗时
		fmt.Println("任务执行完成,耗时:", time.Since(start_time))

		return nil
	})
}

定义完成之后该如何使用呢?你可以直接使用中间件包裹handler比如:

middler.LoggerMiddler(asynq.HandlerFunc(task.HandleReminderEmailTask))

如果使用servermux则可以直接Use

mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", task.HandleWelcomeEmailTask)
mux.HandleFunc("email:reminder", task.HandleReminderEmailTask)
mux.Use(middler.LoggerMiddler)

启动测试一下

## 执行client创建任务
only/asynq/quickstart via 🐹 v1.23.3 on 🐳 v27.3.1 (orbstack) took 8m 10.0s 
➜ go run client/client.go        
执行成功:  &{513a7743-f0e7-4eef-a26b-5b1291c46fd8 default email:welcome [123 34 85 115 101 114 73 68 34 58 34 50 50 50 64 113 113 46 99 111 109 34 125] pending 25 0  0001-01-01 00:00:00 +0000 UTC 30m0s 0001-01-01 00:00:00 +0000 UTC  2025-01-13 21:58:18.68871 +0800 CST m=+0.002020751 false 0s 0001-01-01 00:00:00 +0000 UTC []}
执行成功:  &{f6ac043a-d0e4-42c8-9316-962c31eb7d5f default email:reminder [123 34 85 115 101 114 73 68 34 58 34 50 50 50 64 113 113 46 99 111 109 34 125] scheduled 25 0  0001-01-01 00:00:00 +0000 UTC 30m0s 0001-01-01 00:00:00 +0000 UTC  2025-01-14 21:58:18.699056 +0800 CST m=+86400.012366501 false 0s 0001-01-01 00:00:00 +0000 UTC []}

// 查看任务
only/asynq/quickstart via 🐹 v1.23.3 on 🐳 v27.3.1 (orbstack) 
➜ asynq task ls --queue=default --state=pending
ID                                    Type           Payload  
--                                    ----           -------  
513a7743-f0e7-4eef-a26b-5b1291c46fd8  email:welcome  {"UserID":"222@qq.com"}  

only/asynq/quickstart via 🐹 v1.23.3 on 🐳 v27.3.1 (orbstack) 
➜ asynq task ls --queue=default --state=scheduled
ID                                    Type            Payload                  Process In  
--                                    ----            -------                  ----------  
5baa8b51-d1fe-4bc3-922c-8083b2537308  email:reminder  {"UserID":"222@qq.com"}  in 23h49m39s 



## 运行server
only/asynq/quickstart via 🐹 v1.23.3 on 🐳 v27.3.1 (orbstack) 
➜ go run workers/workers.go      
asynq: pid=21643 2025/01/13 13:59:39.491731 INFO: Starting processing
asynq: pid=21643 2025/01/13 13:59:39.491829 INFO: Send signal TSTP to stop processing new tasks
asynq: pid=21643 2025/01/13 13:59:39.491869 INFO: Send signal TERM or INT to terminate the process
2025/01/13 21:59:39  [*] 向用户 222@qq.com 发送欢迎电子邮件
任务执行完成,耗时: 383.917µs ## 打印耗时了

如果不同应用需要使用到不同的中间件可以使用如下方法

productHandlers := asynq.NewServeMux()
productHandlers.Use(productMiddleware) // 对所有产品任务应用共享逻辑
productHandlers.HandleFunc("product:update", productUpdateTaskHandler)
// ... 注册其他“产品”任务处理函数

orderHandlers := asynq.NewServeMux()
orderHandler.Use(orderMiddleware) // 对所有订单任务应用共享逻辑
orderHandlers.HandleFunc("order:refund", orderRefundTaskHandler)
// ... 注册其他“订单”任务处理函数

// 顶级处理函数
mux := asynq.NewServeMux()
mux.Use(someGlobalMiddleware) // 对所有任务应用共享逻辑
mux.Handle("product:", productHandlers)
mux.Handle("order:", orderHandlers)

if err := srv.Run(mux); err != nil {
    log.Fatal(err)
}

🍵 Task生命周期

task有如下状态:

  1. Scheduled:任务正在等待未来处理(仅适用于具有ProcessAt或ProcessIn选项的任务)。
  2. Pending:任务已准备好进行处理,并将由一个空闲的工作器接收。
  3. Active:任务正在被工作器处理(即处理程序正在处理该任务)。
  4. Retry:工作器无法处理任务,任务正在等待将来重试。
  5. Archived:任务达到最大重试次数,并存储在归档中以供手动检查。
  6. Completed:任务已成功处理,并保留到保留时间到期为止(仅适用于具有Retention选项的任务)。
+-------------+            +--------------+          +--------------+           +-------------+
|             |            |              |          |              | Success   |             |
|  Scheduled  |----------->|   Pending    |--------->|    Active    |---------> |  Completed  |
|  (Optional) |            |              |          |              |           |  (Optional) |
+-------------+            +--------------+          +--------------+           +-------------+
                                  ^                       |                            |
                                  |                       |                            | Deletion
                                  |                       | Failed                     |
                                  |                       |                            V
                                  |                       |
                                  |                       |
                           +------+-------+               |        +--------------+
                           |              |               |        |              |
                           |    Retry     |<--------------+------->|   Archived   |
                           |              |                        |              |
                           +--------------+                        +--------------+

🍵 信号处理

当您使用Server.Run(Handler)启动服务器处理时,它将阻塞并等待传入的信号。

有两种类型的信号可发送给正在运行的程序,以优雅地关闭进程。

TSTP:此信号告诉Server停止处理新任务。
TERM或INT:此信号告诉Server终止(即关闭)。
建议您首先发送TSTP信号以停止处理新任务,并等待所有正在进行中的任务完成后,再发送TERM信号**以终止程序。

使用 kill 命令发送信号。

kill -TSTP  # 停止处理新任务

kill -TERM  # 关闭服务器

注意:如果在发送TERM或INT信号时没有发送TSTP信号,Server将启动一个定时器,等待8秒钟,以便所有工作人员完成(要自定义此超时持续时间,请使用ShutdownTime配置)。如果在该时间范围内有工作人员未完成,则任务将被转换回待处理状态,并在程序重新启动后再处理。

注意:目前,Windows不支持TSTP信号。

🍵 队列优先级

asynq 设置队列优先级的方式很简洁,使用如下代码即可:

srv := asynq.NewServer(redis, asynq.Config{
    Concurrency: 10,
    Queues: map[string]int{
        "critical": 6,
        "default":  3,
        "low":      1,
    },
})

这段代码就表示

  • critical队列中的任务将60%的时间进行处理
  • default队列中的任务将30%的时间进行处理
  • low队列中的任务将10%的时间进行处理

还可以设置在入队时指定队列入队的队列(默认就是进入 default队列)

client := asynq.NewClient(redis)
task := asynq.NewTask("send_notification", map[string]interface{}{"user_id": 42})

// 使用`asynq.Queue`选项指定一个任务使用"critical"队列。
err := client.Enqueue(task, asynq.Queue("critical"))

严格优先级模式,使用 StrictPriority开启

srv := asynq.NewServer(redis, asynq.Config{
    Concurrency: 10,
    Queues: map[string]int{
        "critical": 3,
        "default":  2,
        "low":      1,
    },
    StrictPriority: true, // 严格模式!
})

这将创建一个具有严格优先级的三个队列的 Background实例:criticaldefaultlow。在严格优先级模式下,具有更高优先级的队列始终先处理,只有在所有其他优先级更高的队列为空时,才会处理优先级较低的队列。

因此,在此示例中,始终首先处理critical队列中的任务。如果critical队列为空,则处理default队列。如果criticaldefault队列都为空,则处理low队列

🍵 设置重试次数

在默认情况下,最大重试次数是25次,达到最大重试次数之后加入 Archived中,还可以通过web或者cli来手动重试

1️⃣ 设置最大重试次数

使用 synq.MaxRetry在创建任务或者任务入队时可以设置最大重试次数

// 任务入队时
client.Enqueue(task, asynq.MaxRetry(5))

// 创建任务时
task := asynq.NewTask("feed:import", nil, asynq.MaxRetry(5))
client.Enqueue(task) // MaxRetry 设置为 5

2️⃣ 自定义重试延迟

默认重试时指数重试由 DefaultRetryDelayFunc定义,可以使用 RetryDelayFunc函数进行重新自定义什么间隔重试

// 函数签名
// n 是任务已重试的次数
// e 是任务处理程序返回的错误
// t 是相关的任务
RetryDelayFunc func(n int, e error, t *asynq.Task) time.Duration

比如,任务失败时每隔2秒重试

srv := asynq.NewServer(redis, asynq.Config{
    Concurrency: 20, // 最大任务并发数
    RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
        return 2 * time.Second // 每隔2秒重试
    },
})

可以为单独的任务设置单独的重试机制

srv := asynq.NewServer(redis, asynq.Config{
    // 对于 "foo" 任务,始终使用 2 秒的延迟,其他任务使用默认行为。
    RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
        if t.Type() == "foo" {
            return 2 * time.Second 
        }
        return asynq.DefaultRetryDelayFunc(n, e, t) 
    },
})

3️⃣ 非失败性错误处理

有时报错并不是因为这个任务本身发生报错了,而是因为一些外界因素导致这个任务报错,但是这些报错会占用重试的次数,如果像前面那样只设置了5次重试,那么一次的重试机会还是蛮宝贵的,你并不希望这种报错占用重试次数,那么就可以使用 Config 提供 IsFailure(error) bool 函数,可以在此函数内自定义不需要或者需要占用重试的报错类型

srv := asynq.NewServer(redisConnOpt, asynq.Config{
    // ... 其他配置选项
    IsFailure: func(err error) bool {
        return err != ErrResourceNotAvailable // 如果没有可用资源,则为非失败错误
    },
})

4️⃣ 跳过重试

如果 Handler.ProcessTask 返回一个 SkipRetry 错误,不管剩余的重试次数是多少,任务都将被归档。返回的错误可以是 SkipRetry 或包装了 SkipRetry 错误的错误

func ExampleHandler(ctx context.Context, task *asynq.Task) error {
    // 任务处理逻辑在此处...
    // 如果处理程序知道任务不需要重试,则返回 SkipRetry
    return fmt.Errorf(": %w", asynq.SkipRetry)
}

💥 任务超时与取消配置

在任务开始处理的时候,有些任务由于网络原因导致很久没有完成,后面又有很多任务来了,这可能会影响队列的性能,可能造成任务积压,这个时候就需要使用到任务超时配置了

可以配置TimeoutDeadline 来配置任务超时的选项,粗略的可以理解为前者为短时间、后者为长时间,它们都是使用 context.Context 的超时时间或截止时间,将其作为第一个参数传递给你的处理程序可以查看Go上下文

如果你有一个需要在30秒内完成的任务,你使用 timeout将超时时间设置为 30*time.Second

err := c.Enqueue(task, asynq.Timeout(30 * time.Second))

如果你有一个需要在特定时间之前完成的任务,你可以设置该任务的截止时间。
例如,如果你有一个需要在 2020-12-25 之前完成的任务,你可以将其作为 Deadline 选项传递

xmas := time.Date(2020, time.December, 12, 25, 0, 0, 0, time.UTC)
err := c.Enqueue(task, asynq.Deadline(xmas))

使用了 TimeoutDeadline 选项创建了任务但是还需要通过读取上下文中的 Done 通道来实现超时取消

func myHandler(ctx context.Context, task *asynq.Task) error {
    c := make(chan error, 1) // 创建一个带缓冲的 channel,用于接收任务处理结果
    go func() {
        c <- doWork(task) // 启动一个 goroutine 执行任务,并将结果发送到 channel
    }()
    select {
    case <-ctx.Done(): // 如果 ctx 被取消(超时或手动取消),则返回 ctx 的错误
        return ctx.Err()
    case res := <-c: // 如果任务完成,返回任务的结果
        return res
    }
}

还可以通过命令行取消任务

asynq task ls --queue=myqueue --state=active
asynq task cancel [task_id]

💥 定时任务

asynq有定时执行任务的功能叫做 schedulerScheduler会定期将任务加入队列,然后由集群中可用的工作服务器来执行这些任务

使用定时任务需要注意下面几点:

  • 确保每个调度只有一个Scheduler在运行,否则会出现重复任务

  • 使用集中化的方法意味着不需要同步调度,并且服务可以在不使用锁的情况下运行

  • 如果需要动态地添加和删除周期性任务,请使用 PeriodicTaskManager而不是直接使用 Scheduler

  • 时区的配置

    //默认情况下,周期任务使用的是UTC时间,但是你可以使用SchedulerOpts来更改所使用的时区
    // 例如,使用America/Los_Angeles时区而不是默认的UTC时区。
    loc, err := time.LoadLocation("America/Los_Angeles")
    if err != nil {
        panic(err)
    }
    scheduler := asynq.NewScheduler(
        redisConnOpt, 
        &asynq.SchedulerOpts{
            Location: loc,
        },
    )
    

1️⃣ 启动定时任务

在运行定时任务之前,需要创建一个调度器,并且将任务都注册至调度器,最后使用 Run启动调度器,启动之后它就会挂起,在收到终止命令后就会停止

// 创建任务调度器
scheduler := asynq.NewScheduler(redisConnOpt, nil)

// 创建任务
task := asynq.NewTask("example_task", nil)

// 你可以使用cron规范字符串来指定调度,表示每秒执行一次
entryID, err := scheduler.Register("* * * * *", task) // 注册任务
if err != nil {
    log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)

// 你还可以使用"@every "来指定间隔。
entryID, err = scheduler.Register("@every 30s", task)
if err != nil {
    log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)

// 你也可以传递选项,比如入队的队列
entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))
if err != nil {
    log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)


// 使用Run来启动调度
if err := scheduler.Run(); err != nil {
    log.Fatal(err)
}

2️⃣ 自定义无法入队错误处理

可以自定义队无法入队情况下的错误

// task 表示入队失败的任务对象
// opts 内包含任务的选项列表
// handleEnqueueError 是 EnqueueErrorHandler 的实现
func handleEnqueueError(task *asynq.Task, opts []asynq.Option, err error) {
	// 记录错误日志
	log.Printf("Failed to enqueue task: type=%s, payload=%s, error=%v", task.Type(), task.Payload(), err)

	// 可以根据任务类型和错误类型进行不同的处理
	switch task.Type() {
	case TypeEmailDelivery:
		// 如果是邮件任务,可以尝试重试或记录到数据库
		log.Printf("Retrying email task for user: %s", task.Payload())
		// 这里可以添加重试逻辑,例如重新入队或记录到数据库
	default:
		// 其他任务类型的处理逻辑
		log.Printf("Unhandled task type: %s", task.Type())
	}
}

scheduler := asynq.NewScheduler(
    redisConnOpt, 
    &asynq.SchedulerOpts{
        EnqueueErrorHandler: handleEnqueueError,
    },
)

还可以通过CLI进行检查CLI有一个名为 cron的子命令用于检查调度器记录。要查看当前正在运行的调度器的所有记录,可以运行以下命令:

asynq cron ls

这个命令将输出一个包含每个记录的ID、计划规范、下次加入队列时间和上次加入队列时间的列表。你还可以运行以下命令来查看每个记录的历史记录:

asynq cron history 

💥 执行任务速率限制

asynq中可以使用 rate包来对任务速率进行限制,这里的限制是限制每一个 task处理 handler,并不表示整个消费端

我觉得这个要配上定时任务才方便使用,如果是只执行一次Enqueue就没有必要使用任务限制器

package main

import (
    "context"
    "errors"
    "fmt"
    "log"
    "math/rand"
    "time"

    "golang.org/x/time/rate"
    "github.com/hibiken/asynq"
)

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: ":6379"},
        asynq.Config{
            Concurrency:    10,
            // 如果错误是由于速率限制导致的,请不将错误计为故障。
            IsFailure:      func(err error) bool { return !IsRateLimitError(err) },
            RetryDelayFunc: retryDelay,
        },
    )

    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

type RateLimitError struct {
    RetryIn time.Duration
}

func (e *RateLimitError) Error() string {
    return fmt.Sprintf("速率限制(%v 后重试)", e.RetryIn)
}

func IsRateLimitError(err error) bool {
    _, ok := err.(*RateLimitError)
    return ok
}

func retryDelay(n int, err error, task *asynq.Task) time.Duration {
    var ratelimitErr *RateLimitError
    if errors.As(err, &ratelimitErr) {
        return ratelimitErr.RetryIn
    }
    return asynq.DefaultRetryDelayFunc(n, err, task)
}

// 每秒1次事件的速率,最多允许2个事件的突发。
var limiter = rate.NewLimiter(1, 2)

func handler(ctx context.Context, task *asynq.Task) error {
    if !limiter.Allow() {
        log.Println("限速任务")
        return &RateLimitError{
            RetryIn: time.Duration(rand.Intn(10)) * time.Second,
        }
    }
    log.Printf("[*] 处理任务 %s", task.Payload())
    return nil
}

这段代码定义了一个自定义错误名为 RateLimitError内有一个字段表示如果发生了这个错误则 RetryIn时间后重试

如果想了解自定义错误可以https://blog.tanc.fun:9999/archives/dc612560-1a14-48d7-97f7-d72dee3f9786#2---%E8%87%AA%E5%AE%9A%E4%B9%89%E9%94%99%E8%AF%AF 内查看

核心代码是如下这段,使用 rateNewLimiter方法定义了一个限速器,在执行 handler的时候会先调用限速器的 Allow方法,可以将限速器看成一个令牌桶,在创建令牌桶时里面存放了两个令牌,调用 Allow方法来获取令牌,如果获取不到那就返回自定义得到 RateLimitError错误,来达到限制的作用

// 每秒1次事件的速率,最多允许2个事件的突发。
var limiter = rate.NewLimiter(1, 2)

func handler(ctx context.Context, task *asynq.Task) error {
    if !limiter.Allow() {
        log.Println("限速任务")
        return &RateLimitError{
            RetryIn: time.Duration(rand.Intn(10)) * time.Second,
        }
    }
    log.Printf("[*] 处理任务 %s", task.Payload())
    return nil
}

返回这个错误就会被之前说的自定义延迟和非失败性错误给处理了

启动一个生产端来测试一下

func main() {
	// 创建 Asynq 客户端
	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr:     ":6379", // Redis 地址
	})
	defer client.Close()

	// 定时任务入队
	ticker := time.NewTicker(300 * time.Millisecond) // 每 0.3 秒触发一次
	defer ticker.Stop()

	for i := 1; ; i++ {
		<-ticker.C // 等待定时器触发

		// 创建任务
		task := asynq.NewTask("example_task", []byte(fmt.Sprintf("任务 %d", i)))

		// 将任务推送到队列中
		info, err := client.Enqueue(task)
		if err != nil {
			log.Printf("Failed to enqueue task: %v", err)
		} else {
			log.Printf("Enqueued task: id=%s queue=%s", info.ID, info.Queue)
		}
	}
}

启动worker和product端,就会有很多限速的

asynq: pid=18420 2025/01/17 14:40:01.636133 INFO: Starting processing
asynq: pid=18420 2025/01/17 14:40:01.636799 INFO: Send signal TERM or INT to terminate the process
2025/01/17 22:40:10 [*] 处理任务 任务 1
2025/01/17 22:40:10 [*] 处理任务 任务 2
2025/01/17 22:40:11 限速
2025/01/17 22:40:11 限速
2025/01/17 22:40:11 限速
2025/01/17 22:40:11 限速
2025/01/17 22:40:11 限速
2025/01/17 22:40:11 限速
2025/01/17 22:40:11 限速
2025/01/17 22:40:11 [*] 处理任务 任务 10
2025/01/17 22:40:12 限速
2025/01/17 22:40:12 [*] 处理任务 任务 12
2025/01/17 22:40:13 限速
2025/01/17 22:40:13 限速

💥 任务去重

有两种方法可以对任务进行去重,一种是使用任务Id,另外一种是使用 Unique选项:让 Asynq为任务创建唯一性锁

1️⃣ 任务id

在入队时直接添加任务id即可

// 第一个任务应该没问题
_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))
在定时任务,如果任务次数很频繁,当一个任务在外界因素(没有设置非失败性错误处理)失败或者一些时间段错误,会加入重试队列,第二个任务来的时候还是这些报错就又会加入重试队列,这样一来就造成任务积压,这个时候就可以使用去重了
	_, err := scheduler.Register("@every 5m", task, asynq.TaskID("XXX"))
	if err != nil {
		panic(err)
	}

2️⃣ 使用 unique

基于唯一性锁。在使用Unique选项入队任务时,Client会检查是否可以为给定任务获取锁。只有在可以获取锁时,任务才会被入队。如果已经有另一个任务持有锁,那么Client将返回一个错误(查看下面的示例代码以了解如何检查错误)。

唯一性锁与TTL(生存时间)相关联,以避免永久持有锁。在TTL后或者在任务在TTL之前成功处理后,锁将被释放。 需要注意的一件重要事情是,Asynq的唯一任务特性是尽力而为的唯一性。换句话说,在任务处理之前如果锁已经过期,可能会入队一个重复的任务。

任务的唯一性基于以下属性:

  • 类型
  • Payload(载荷)
  • 队列

因此,如果存在相同类型和载荷的任务,并且入队到相同的队列,那么在锁被释放之前,不会入队具有相同属性的另一个任务

// 函数签名 传入TLL时间
func Unique(ttl time.Duration) Option
c := asynq.NewClient(redis)

t1 := asynq.NewTask("example", []byte("hello"))

// t1将在接下来的一个小时内持有唯一性锁。
err := c.Enqueue(t1, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // 处理重复任务
case err != nil:
    // 处理其他错误
}

t2 := asynq.NewTask("example", []byte("hello"))
 
// t2不能入队,因为它是t1的副本。
err = c.Enqueue(t2, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // 处理重复任务
case err != nil:
    // 处理其他错误
}

☕️ 保存任务以及结果

1️⃣ 保留任务

在默认情况下,任务执行完成(一旦任务成功通过Handler处,即Handler.ProcessTask返回ni)就会从队列中移除,如果你希望任务执行完成后会保存至队列中可以使用 Retention,来设置任务保留至队列中的时间期限,函数签名如下

func Retention(d time.Duration) Option

可以在任务初始化时加,也可以在入队时加

// 在任务初始化时设置选项。
task := asynq.NewTask("my_task", payload, asynq.Retention(24 * time.Hour))

// 或者,在排队时设置选项。
info, err := client.Enqueue(task, asynq.Retention(24 * time.Hour))

2️⃣ 保留任务结果

如果你想在任务处理时存储与任务相关的一些数据,并且如果这些数据只在任务的生命周期内需要(即直到任务从队列中删除),那么你可以将数据与任务一起存储。

使用ResultWriter将数据写入redis,从而将写入的数据与任务关联起来。

⚠️ 注意:请谨慎考虑写入redis的数据量,如果需要存储的数据很大,最好使用基于磁盘的存储系统,比如SQL数据库。

// 在处理程序代码中。
func MyHandler(ctx context.Context, task *asynq.Task) error {
    res, err := DoStuff(ctx, task)
    if err != nil {
        return fmt.Errorf("failed to process task: %v", err)
    }
    if _, err = task.ResultWriter().Write(res); err != nil {
        return fmt.Errorf("failed to write task result: %v", err)
    }
    return nil
}

💥 任务聚合

使用任务聚合可以使得将多个连续操作批量处理成一个,以节省成本、优化缓存或批量通知。

为了使用任务聚合功能,您需要将任务以相同的组名(任务组)排队到同一个队列中。使用相同的(queue, group)对排队的任务将由您提供的**GroupAggregator**聚合为一个任务,并将聚合后的任务传递给处理程序

创建聚合任务时,Asynq服务器将等待更多的任务,直到可配置的宽限期到期。每次使用相同的(queue, group)排队新任务时,宽限期都会更新。

宽限期有可配置的上限:可以设置最大聚合延迟时间,在此之后,Asynq服务器将无视剩余的宽限期并聚合任务。

还可以设置可以一起聚合的最大任务数。如果达到该数目,Asynq服务器将立即聚合任务。

注意:任务的调度和聚合是冲突的功能,调度优先于聚合

调度和聚合的目标是矛盾的:

  1. 调度要求任务在特定时间点执行,强调准时性。
  2. 聚合要求任务在达到一定数量后执行,强调批量处理。
// 此函数用于将多个任务聚合为一个任务。
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... 您的逻辑以聚合给定的任务并返回聚合后的任务。
    // ... 如果需要,使用NewTask(typename, payload, opts...)创建一个新任务并设置选项。
    // ... (注意)将会忽略Queue选项,并且聚合后的任务将始终排队到组所属的同一个队列中。
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
                // GroupAggregator 是一个函数,用于定义如何将多个任务聚合为一个任务。
                // 这里传入了一个自定义的聚合函数 `aggregate`,该函数需要实现任务聚合的逻辑。
                GroupAggregator: asynq.GroupAggregatorFunc(aggregate),

                // GroupMaxDelay 是任务聚合的最大延迟时间。
                // 如果任务组在达到最大大小(GroupMaxSize)之前未完成,系统会等待最多 10 分钟,然后强制处理当前已聚合的任务。
                GroupMaxDelay: 10 * time.Minute,

                // GroupGracePeriod 是任务组的宽限期。
                // 当任务组达到最大大小(GroupMaxSize)或达到最大延迟时间(GroupMaxDelay)时,
                // 系统会等待额外的 2 分钟,以确保所有可能的任务都能被聚合。
                GroupGracePeriod: 2 * time.Minute,

                // GroupMaxSize 是任务组的最大大小。
                // 当任务组中的任务数量达到 20 时,系统会立即处理这些任务,而不需要等待 GroupMaxDelay。
                GroupMaxSize: 20,

                // Queues 是一个映射,定义了任务队列的名称及其优先级。
                // 这里的 "notifications" 队列的优先级为 1,表示系统会优先处理该队列中的任务。
                // 如果有多个队列,可以设置不同的优先级(数值越大,优先级越低)。
                Queues: map[string]int{"notifications": 1},
           },
       )

GroupAggregatorFunc() 也是使用了一个装饰器模式,下面是它的函数签名

type GroupAggregatorFunc func(group string, tasks []*Task) *Task


// 执行聚合的具体方法
func (fn GroupAggregatorFunc) Aggregate(group string, tasks []*Task) *Task {
	return fn(group, tasks)
}

它实现了 GroupAggregator接口,其中有一个 Aggregate,它就是执行聚合操作的实现方法

Aggregate 方法会在以下两种情况下被调用:

  1. 任务组达到最大大小(GroupMaxSize):当任务组中的任务数量达到 GroupMaxSize 时,Asynq 会立即调用 Aggregate 方法,将当前任务组中的所有任务聚合成一个任务。
  2. 任务组达到最大延迟时间(GroupMaxDelay): 如果任务组中的任务数量未达到 GroupMaxSize,但等待时间超过了 GroupMaxDelay,Asynq 也会调用 Aggregate 方法,将当前任务组中的任务聚合成一个任务。
// GroupAggregator 用于在任务被传递给 Handler 之前,将一组任务聚合成一个任务。
type GroupAggregator interface {
	// Aggregate 将给定组名(group)下的任务列表(tasks)聚合成一个任务,
	// 并返回一个新的任务,该任务是这些任务的聚合结果。
	//
	// 可以使用 NewTask(typename, payload, opts...) 来为聚合后的任务设置选项。
	// 如果提供了 Queue 选项,它将被忽略,聚合后的任务总是会被入列到该组所属的队列中。
	Aggregate(group string, tasks []*Task) *Task
}

下面是一个小实验

生产端代码: 这里创建了4个任务,且同属同一个任务组

package main

import (
	"flag"
	"log"

	"github.com/hibiken/asynq"
)

var (
	flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis服务器地址")
	flagMessage   = flag.String("message", "hello", "任务处理时要打印的消息")
)

func main() {
	flag.Parse()

	c := asynq.NewClient(asynq.RedisClientOpt{Addr: "redis.tanc.fun:6377", Password: "bxzxhygr.25"})
	defer c.Close()

	// 循环创建task
	var tasks []*asynq.Task
	for i := 0; i < 4; i++ {
		tasks = append(tasks, asynq.NewTask("aggregation-tutorial", []byte(*flagMessage)))
	}

	// 循环入队
	for _, task := range tasks {
		_, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
		if err != nil {
			log.Fatalf("无法加入任务队列:%v", err)
		}
	}
	log.Printf("成功加入任务队列\n")
}

消费端,将

package main

import (
	"context"
	"flag"
	"github.com/hibiken/asynq"
	"log"
	"strings"
	"time"
)

var (
	flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "组的优雅延迟时间")
	flagGroupMaxDelay    = flag.Duration("max-delay", 30*time.Second, "组的最大延迟时间")
	flagGroupMaxSize     = flag.Int("max-size", 20, "组的最大尺寸")
)

func main() {
	srv := asynq.NewServer(asynq.RedisClientOpt{
		Addr:     "redis.tanc.fun:6377",
		Password: "bxzxhygr.25",
		DB:       0,
	}, asynq.Config{
		Queues:           map[string]int{"tutorial": 1},
		GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
		GroupGracePeriod: *flagGroupGracePeriod,
		GroupMaxDelay:    *flagGroupMaxDelay,
		GroupMaxSize:     *flagGroupMaxSize,
	})

	mux := asynq.NewServeMux()
	mux.HandleFunc("aggregated-task", handleAggregatedTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("无法启动服务器:%v", err)
	}
}

// 简单的聚合函数。
// 将所有任务的消息组合在一起,每个消息占一行。
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
	log.Printf("从组 %q 聚合了 %d 个任务", group, len(tasks))
	var b strings.Builder
	for _, t := range tasks {
		b.Write(t.Payload())
		b.WriteString("\n")
	}
	return asynq.NewTask("aggregated-task", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
	log.Print("处理程序收到聚合的任务")
	log.Printf("聚合的消息:%s", task.Payload())
	return nil
}

输出

2025/01/19 18:04:54 从组 "example-group" 聚合了 4 个任务
2025/01/19 18:04:54 处理程序收到聚合的任务
2025/01/19 18:04:54 聚合的消息:hello
hello
hello
hello

☕️ 连接redis集群和哨兵

1️⃣ 连接redis集群

image-20250119215943546

Asynq根据队列对数据进行分片。 在上图中,我们有一个6个实例的Redis Cluster(3个主节点,3个从节点)和4个队列(q1、q2、q3、q4)。

  • Master1(以及它的副本Slave1)托管q1和q2。
  • Master2(以及它的副本Slave2)托管q3。
  • Master3(以及它的副本Slave3)托管q4。

当您使用asynq.Client将任务加入队列时,您可以使用Queue选项指定队列。 被加入队列的任务将由从这些队列拉取任务的asynq.Server(s)消费

可以通过 RedisClusterClientOpt来连接集群

如果要了解集群原理的话请查看 redis集群

client := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

一个节点托管一个队列,可以使用如下命令查看

asynq queue ls --cluster

这个命令将打印一个队列列表,以及:

  • 队列所属的集群节点
  • 队列映射到的集群哈希槽

3️⃣ 配置哨兵

哨兵可以在 redis哨兵

配置 asynqClientServer以使用Redis Sentinel非常简单。使用 RedisFailoverClientOpt指定Redis主节点的名称和Redis Sentinel的地址。

var redis = &asynq.RedisFailoverClientOpt{
    MasterName:    "mymaster",
    SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
}

然后将此客户端选项传递给 NewClientNewBackground,以创建一个使用Redis Sentinels的实例。

client := asynq.NewClient(redis)

// ...

srv := asynq.NewServer(redis, asynq.Config{ Concurrency: 10 })

通过这种设置,当Redis主节点故障时,Sentinels将启动故障转移过程,并通知 asynq新的主节点,后台任务处理将继续正常工作