Go Asynq 从零到实战指南
Asynq 是一个基于 Redis 的 Go 语言异步任务队列库,支持延迟任务、重试、优先级、定时调度等特性。本教程将带你从零开始搭建一个完整的 Asynq 应用,包含客户端(生产者)和服务端(消费者)示例。
目录
1. 简介
Asynq 使用 Redis 作为后端存储,任务以 JSON 形式序列化并推入队列。消费者从队列中拉取任务并执行。其核心组件包括:
asynq.Client:用于发送任务(生产者)asynq.Server:用于处理任务(消费者)asynq.RedisClientOpt:Redis 连接配置
2. 环境准备
- Go 1.20+
- Redis 6.0+
- 项目目录结构如下:
your-project/
├── go.mod
├── client/
│ └── main.go
├── server/
│ └── main.go
└── task/
└── task.go
3. 安装依赖
在项目根目录执行:
go mod init your-project
go get github.com/hibiken/asynq
4. 定义任务类型
创建 task/task.go,定义任务类型和 payload 结构:
// task/task.go
package task
import (
"encoding/json"
"fmt"
)
// 任务类型常量
const (
TypeEmailDelivery = "email:deliver"
TypeImageResize = "image:resize"
)
// EmailDeliveryPayload 邮件任务负载
type EmailDeliveryPayload struct {
UserID int `json:"user_id"`
Email string `json:"email"`
Body string `json:"body"`
}
// ImageResizePayload 图片缩放任务负载
type ImageResizePayload struct {
ImageURL string `json:"image_url"`
Width int `json:"width"`
Height int `json:"height"`
}
// NewEmailDeliveryTask 创建邮件任务
func NewEmailDeliveryTask(userID int, email, body string) (*asynq.Task, error) {
payload, err := json.Marshal(EmailDeliveryPayload{
UserID: userID,
Email: email,
Body: body,
})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeEmailDelivery, payload), nil
}
// NewImageResizeTask 创建图片缩放任务
func NewImageResizeTask(imageURL string, width, height int) (*asynq.Task, error) {
payload, err := json.Marshal(ImageResizePayload{
ImageURL: imageURL,
Width: width,
Height: height,
})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeImageResize, payload), nil
}
注意:上面代码中使用了
asynq.Task,需在文件顶部导入:
import "github.com/hibiken/asynq"
5. 客户端:发送任务
创建 client/main.go:
// client/main.go
package main
import (
"log"
"your-project/task"
"github.com/hibiken/asynq"
)
func main() {
// Redis 配置
redisOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// Password: "yourpassword", // 如有密码
DB: 0,
}
// 创建客户端
client := asynq.NewClient(redisOpt)
defer client.Close()
// 发送邮件任务
emailTask, err := task.NewEmailDeliveryTask(123, "user@example.com", "Hello from Asynq!")
if err != nil {
log.Fatalf("Failed to create email task: %v", err)
}
// Enqueue 将任务推入默认队列
_, err = client.Enqueue(emailTask)
if err != nil {
log.Fatalf("Failed to enqueue email task: %v", err)
}
log.Println("Email task enqueued")
// 发送图片缩放任务(带延迟 10 秒)
imageTask, err := task.NewImageResizeTask("https://example.com/photo.jpg", 800, 600)
if err != nil {
log.Fatalf("Failed to create image task: %v", err)
}
// 延迟 10 秒执行
_, err = client.Enqueue(imageTask, asynq.ProcessIn(10))
if err != nil {
log.Fatalf("Failed to enqueue delayed image task: %v", err)
}
log.Println("Delayed image resize task enqueued (will run in 10s)")
}
6. 服务端:处理任务
创建 server/main.go:
// server/main.go
package main
import (
"context"
"encoding/json"
"fmt"
"go-asynq/task"
"log"
"github.com/hibiken/asynq"
)
// 实现任务处理器
type EmailProcessor struct{}
type ImageProcessor struct{}
func (p *EmailProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
var payload task.EmailDeliveryPayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("json.Unmarshal failed: %w", err)
}
// 模拟发送邮件
log.Printf("Sending email to %s (user_id=%d): %s", payload.Email, payload.UserID, payload.Body)
// 实际中这里会调用 SMTP 或第三方 API
return nil
}
func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
var payload task.ImageResizePayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("json.Unmarshal failed: %w", err)
}
// 模拟图片处理
log.Printf("Resizing image %s to %dx%d", payload.ImageURL, payload.Width, payload.Height)
// 实际中这里会调用图像处理库
return nil
}
func main() {
redisOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
DB: 0,
}
// 创建服务器
srv := asynq.NewServer(redisOpt, asynq.Config{
Concurrency: 10, // 并发 worker 数
Queues: map[string]int{
"default": 10,
},
})
// 注册任务处理器
mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeEmailDelivery, (&EmailProcessor{}).ProcessTask)
mux.HandleFunc(task.TypeImageResize, (&ImageProcessor{}).ProcessTask)
// 启动服务
if err := srv.Run(mux); err != nil {
log.Fatalf("Failed to start server: %v", err)
}
}
7. 启动 Redis
确保本地运行 Redis:
# 使用 Docker 启动 Redis(推荐)
docker run -d --name redis-asynq -p 6379:6379 redis:alpine
# 或直接安装 Redis 并启动
redis-server
8. 运行示例
步骤 1:启动服务端
cd server
go run main.go
输出:
2026/02/05 16:50:00 INFO: Starting processing
步骤 2:运行客户端(新终端)
cd client
go run main.go
输出:
2026/02/05 16:50:05 Email task enqueued
2026/02/05 16:50:05 Delayed image resize task enqueued (will run in 10s)
观察服务端日志
服务端将打印:
2026/02/05 16:50:05 Sending email to user@example.com (user_id=123): Hello from Asynq!
2026/02/05 16:51:15 Resizing image https://example.com/photo.jpg to 800x600
注意:图片任务因设置了 10 秒延迟,会在 10 秒后执行。
9. 进阶功能
9.1 重试机制
Asynq 默认最多重试 25 次。可通过 asynq.MaxRetry(n) 设置:
client.Enqueue(task, asynq.MaxRetry(3))
在处理器中返回 asynq.NewRetryError(err) 可触发重试。
9.2 任务超时
client.Enqueue(task, asynq.Timeout(30*time.Second))
9.3 多队列与优先级
// 客户端指定队列
client.Enqueue(task, asynq.Queue("critical"))
// 服务端配置优先级
srv := asynq.NewServer(redisOpt, asynq.Config{
Queues: map[string]int{
"critical": 10,
"default": 5,
},
})
9.4 定时任务(Cron)
使用 asynqmon 或结合 robfig/cron:
c := cron.New()
c.AddFunc("@daily", func() {
task, _ := task.NewEmailDeliveryTask(999, "admin@example.com", "Daily report")
client.Enqueue(task, asynq.Queue("cron"))
})
c.Start()
10. 常见问题
Q1: 任务丢失怎么办?
- 确保 Redis 持久化开启(RDB/AOF)
- 避免消费者 panic,应 recover 并记录错误
Q2: 如何监控任务状态?
- 使用官方 Web UI 工具 asynqmon
- 或通过
asynq.Inspector查询队列状态
Q3: 能否跨语言使用?
- Asynq 协议基于 Redis + JSON,理论上其他语言可实现兼容客户端,但官方仅支持 Go。
参考资料
- GitHub: https://github.com/hibiken/asynq
- 文档: https://pkg.go.dev/github.com/hibiken/asynq
- 示例项目: https://github.com/hibiken/asynq/tree/master/example
✅ 到此,你已掌握 Asynq 的基本使用!可在此基础上构建邮件系统、数据处理流水线、定时报表等异步任务场景。
打赏下吧