go-asynq 示例

阅读:14
作者:majingjing
发布:2026-03-14 19:22:42

Go Asynq 从零到实战指南

Asynq 是一个基于 Redis 的 Go 语言异步任务队列库,支持延迟任务、重试、优先级、定时调度等特性。本教程将带你从零开始搭建一个完整的 Asynq 应用,包含客户端(生产者)和服务端(消费者)示例。


目录

  1. 简介
  2. 环境准备
  3. 安装依赖
  4. 定义任务类型
  5. 客户端:发送任务
  6. 服务端:处理任务
  7. 启动 Redis
  8. 运行示例
  9. 进阶功能
  10. 常见问题

1. 简介

Asynq 使用 Redis 作为后端存储,任务以 JSON 形式序列化并推入队列。消费者从队列中拉取任务并执行。其核心组件包括:

  • asynq.Client:用于发送任务(生产者)
  • asynq.Server:用于处理任务(消费者)
  • asynq.RedisClientOpt:Redis 连接配置

2. 环境准备

  • Go 1.20+
  • Redis 6.0+
  • 项目目录结构如下:
your-project/
├── go.mod
├── client/
│   └── main.go
├── server/
│   └── main.go
└── task/
    └── task.go

3. 安装依赖

在项目根目录执行:

go mod init your-project
go get github.com/hibiken/asynq

4. 定义任务类型

创建 task/task.go,定义任务类型和 payload 结构:

// task/task.go
package task

import (
	"encoding/json"
	"fmt"
)

// 任务类型常量
const (
	TypeEmailDelivery = "email:deliver"
	TypeImageResize   = "image:resize"
)

// EmailDeliveryPayload 邮件任务负载
type EmailDeliveryPayload struct {
	UserID int    `json:"user_id"`
	Email  string `json:"email"`
	Body   string `json:"body"`
}

// ImageResizePayload 图片缩放任务负载
type ImageResizePayload struct {
	ImageURL string `json:"image_url"`
	Width    int    `json:"width"`
	Height   int    `json:"height"`
}

// NewEmailDeliveryTask 创建邮件任务
func NewEmailDeliveryTask(userID int, email, body string) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailDeliveryPayload{
		UserID: userID,
		Email:  email,
		Body:   body,
	})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeEmailDelivery, payload), nil
}

// NewImageResizeTask 创建图片缩放任务
func NewImageResizeTask(imageURL string, width, height int) (*asynq.Task, error) {
	payload, err := json.Marshal(ImageResizePayload{
		ImageURL: imageURL,
		Width:    width,
		Height:   height,
	})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeImageResize, payload), nil
}

注意:上面代码中使用了 asynq.Task,需在文件顶部导入:

import "github.com/hibiken/asynq"

5. 客户端:发送任务

创建 client/main.go

// client/main.go
package main

import (
	"log"
	"your-project/task"

	"github.com/hibiken/asynq"
)

func main() {
	// Redis 配置
	redisOpt := asynq.RedisClientOpt{
		Addr: "localhost:6379",
		// Password: "yourpassword", // 如有密码
		DB: 0,
	}

	// 创建客户端
	client := asynq.NewClient(redisOpt)
	defer client.Close()

	// 发送邮件任务
	emailTask, err := task.NewEmailDeliveryTask(123, "user@example.com", "Hello from Asynq!")
	if err != nil {
		log.Fatalf("Failed to create email task: %v", err)
	}

	// Enqueue 将任务推入默认队列
	_, err = client.Enqueue(emailTask)
	if err != nil {
		log.Fatalf("Failed to enqueue email task: %v", err)
	}
	log.Println("Email task enqueued")

	// 发送图片缩放任务(带延迟 10 秒)
	imageTask, err := task.NewImageResizeTask("https://example.com/photo.jpg", 800, 600)
	if err != nil {
		log.Fatalf("Failed to create image task: %v", err)
	}

	// 延迟 10 秒执行
	_, err = client.Enqueue(imageTask, asynq.ProcessIn(10))
	if err != nil {
		log.Fatalf("Failed to enqueue delayed image task: %v", err)
	}
	log.Println("Delayed image resize task enqueued (will run in 10s)")
}

6. 服务端:处理任务

创建 server/main.go

// server/main.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"go-asynq/task"
	"log"

	"github.com/hibiken/asynq"
)

// 实现任务处理器
type EmailProcessor struct{}
type ImageProcessor struct{}

func (p *EmailProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
	var payload task.EmailDeliveryPayload
	if err := json.Unmarshal(t.Payload(), &payload); err != nil {
		return fmt.Errorf("json.Unmarshal failed: %w", err)
	}

	// 模拟发送邮件
	log.Printf("Sending email to %s (user_id=%d): %s", payload.Email, payload.UserID, payload.Body)

	// 实际中这里会调用 SMTP 或第三方 API
	return nil
}

func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
	var payload task.ImageResizePayload
	if err := json.Unmarshal(t.Payload(), &payload); err != nil {
		return fmt.Errorf("json.Unmarshal failed: %w", err)
	}

	// 模拟图片处理
	log.Printf("Resizing image %s to %dx%d", payload.ImageURL, payload.Width, payload.Height)

	// 实际中这里会调用图像处理库
	return nil
}

func main() {
	redisOpt := asynq.RedisClientOpt{
		Addr: "localhost:6379",
		DB:   0,
	}

	// 创建服务器
	srv := asynq.NewServer(redisOpt, asynq.Config{
		Concurrency: 10, // 并发 worker 数
		Queues: map[string]int{
			"default": 10,
		},
	})

	// 注册任务处理器
	mux := asynq.NewServeMux()
	mux.HandleFunc(task.TypeEmailDelivery, (&EmailProcessor{}).ProcessTask)
	mux.HandleFunc(task.TypeImageResize, (&ImageProcessor{}).ProcessTask)

	// 启动服务
	if err := srv.Run(mux); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}


7. 启动 Redis

确保本地运行 Redis:

# 使用 Docker 启动 Redis(推荐)
docker run -d --name redis-asynq -p 6379:6379 redis:alpine

# 或直接安装 Redis 并启动
redis-server

8. 运行示例

步骤 1:启动服务端

cd server
go run main.go

输出:

2026/02/05 16:50:00 INFO: Starting processing

步骤 2:运行客户端(新终端)

cd client
go run main.go

输出:

2026/02/05 16:50:05 Email task enqueued
2026/02/05 16:50:05 Delayed image resize task enqueued (will run in 10s)

观察服务端日志

服务端将打印:

2026/02/05 16:50:05 Sending email to user@example.com (user_id=123): Hello from Asynq!
2026/02/05 16:51:15 Resizing image https://example.com/photo.jpg to 800x600

注意:图片任务因设置了 10 秒延迟,会在 10 秒后执行。


9. 进阶功能

9.1 重试机制

Asynq 默认最多重试 25 次。可通过 asynq.MaxRetry(n) 设置:

client.Enqueue(task, asynq.MaxRetry(3))

在处理器中返回 asynq.NewRetryError(err) 可触发重试。

9.2 任务超时

client.Enqueue(task, asynq.Timeout(30*time.Second))

9.3 多队列与优先级

// 客户端指定队列
client.Enqueue(task, asynq.Queue("critical"))

// 服务端配置优先级
srv := asynq.NewServer(redisOpt, asynq.Config{
	Queues: map[string]int{
		"critical": 10,
		"default":   5,
	},
})

9.4 定时任务(Cron)

使用 asynqmon 或结合 robfig/cron

c := cron.New()
c.AddFunc("@daily", func() {
	task, _ := task.NewEmailDeliveryTask(999, "admin@example.com", "Daily report")
	client.Enqueue(task, asynq.Queue("cron"))
})
c.Start()

10. 常见问题

Q1: 任务丢失怎么办?

  • 确保 Redis 持久化开启(RDB/AOF)
  • 避免消费者 panic,应 recover 并记录错误

Q2: 如何监控任务状态?

  • 使用官方 Web UI 工具 asynqmon
  • 或通过 asynq.Inspector 查询队列状态

Q3: 能否跨语言使用?

  • Asynq 协议基于 Redis + JSON,理论上其他语言可实现兼容客户端,但官方仅支持 Go。

参考资料

  • GitHub: https://github.com/hibiken/asynq
  • 文档: https://pkg.go.dev/github.com/hibiken/asynq
  • 示例项目: https://github.com/hibiken/asynq/tree/master/example

✅ 到此,你已掌握 Asynq 的基本使用!可在此基础上构建邮件系统、数据处理流水线、定时报表等异步任务场景。