producer
package main
import (
"log"
"os"
"strings"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
package main
import (
"log"
"os"
"strconv"
"strings"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
// 连接到 RabbitMQ
conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"task_queue", // 队列名称
true, // 持久化
false, // 自动删除
false, // 排他性
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to declare a queue")
// 发送消息
body := bodyFrom(os.Args)
for i := 1; i <= 10; i++ {
err = ch.Publish(
"", // 交换机
q.Name, // 队列名称
false, // 强制
false, // 立即
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 消息持久化
ContentType: "text/plain",
Body: []byte(body + strconv.Itoa(i)),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body+strconv.Itoa(i))
}
}
// 从命令行参数中获取消息内容
func bodyFrom(args []string) string {
var s string
if len(args) < 2 || args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
worker
package main
import (
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
// 连接到 RabbitMQ
conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"task_queue", // 队列名称
true, // 持久化
false, // 自动删除
false, // 排他性
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to declare a queue")
// 设置公平分发
err = ch.Qos(
1, // 预取计数
0, // 预取大小
false, // 全局
)
failOnError(err, "Failed to set QoS")
// 接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标签
true, // 自动确认
false, // 排他性
false, // 无本地
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to register a consumer")
// 处理消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
// 模拟任务处理
time.Sleep(time.Second)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
消息确认
package main
import (
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
// 连接到 RabbitMQ
conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"task_queue", // 队列名称
true, // 持久化
false, // 自动删除
false, // 排他性
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to declare a queue")
// 接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标签
false, // 自动确认
false, // 排他性
false, // 无本地
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to register a consumer")
// 处理消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
// 模拟任务处理
time.Sleep(time.Second)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
我们修改上面的代码,让自动确认参数为false
可以看到消费过的消息没有被删除,这是因为worker没有确认消费,所以producer不会删除消息
我们用d.Ack手动确认
package main
import (
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
// 连接到 RabbitMQ
conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"task_queue", // 队列名称
true, // 持久化
false, // 自动删除
false, // 排他性
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to declare a queue")
// 接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标签
false, // 自动确认
false, // 排他性
false, // 无本地
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to register a consumer")
// 处理消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
// 模拟任务处理
time.Sleep(time.Second)
// 手动确认
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
这样就没问题了
公平分派
没有设置qos前:
设置QOS后:
producer.go
package main
import (
"log"
"strconv"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
// 连接到 RabbitMQ
conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"task_queue", // 队列名称
true, // 持久化
false, // 自动删除
false, // 排他性
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to declare a queue")
// 发送 10 条消息,奇数消息模拟为重任务,偶数消息模拟为轻任务
for i := 1; i <= 10; i++ {
body := "Task " + strconv.Itoa(i)
if i%2 == 1 {
body += " (Heavy)"
} else {
body += " (Light)"
}
err = ch.Publish(
"", // 交换机
q.Name, // 路由键
false, // 强制
false, // 立即
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 持久化消息
ContentType: "text/plain",
Body: []byte(body),
},
)
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
}
worker.go
package main
import (
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
// 连接到 RabbitMQ
conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.Qos(1, 0, false)
failOnError(err, "Failed to set QoS")
// 声明队列
q, err := ch.QueueDeclare(
"task_queue", // 队列名称
true, // 持久化
false, // 自动删除
false, // 排他性
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to declare a queue")
// 接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标签
false, // 自动确认
false, // 排他性
false, // 无本地
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to register a consumer")
// 处理消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
// 模拟任务处理
time.Sleep(time.Second)
// 手动确认
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
疑问
开启两个worker后,他们会轮流消费消息,为什么?
是的,这是 RabbitMQ 的默认行为!当你有多个消费者(Worker)订阅同一个队列时,RabbitMQ 会以 轮询(Round-robin) 的方式将消息分发给这些消费者。也就是说,消息会依次分发给每个消费者,从而实现负载均衡。
1. 为什么会出现轮流消费的现象?
RabbitMQ 的设计目标之一是公平分发消息。默认情况下,它会将消息依次分发给所有消费者,确保每个消费者都能平等地处理任务。例如:
- 如果有两个消费者(Worker1 和 Worker2),消息会依次分发给 Worker1、Worker2、Worker1、Worker2……
- 这种机制适用于大多数场景,尤其是在任务处理时间相近的情况下。
2. 如何验证这种现象?
你可以运行以下步骤来验证:
- 启动两个消费者(Worker):
go run worker.go go run worker.go
- 启动生产者并发送多条消息:
go run producer.go Message 1 go run producer.go Message 2 go run producer.go Message 3 go run producer.go Message 4
- 观察消费者的输出:
- Worker1 会处理 Message 1 和 Message 3。
- Worker2 会处理 Message 2 和 Message 4。
3. 如何避免消息分发不均?
默认的轮询分发方式在某些场景下可能会导致问题,例如:
- 如果某些消息处理时间较长,而其他消息处理时间较短,可能会导致某些消费者忙碌,而其他消费者空闲。
为了解决这个问题,可以使用 公平分发(Fair Dispatch) 机制,确保 RabbitMQ 不会一次性将大量消息分发给同一个消费者。具体实现如下:
设置公平分发
在消费者代码中,调用 Qos
方法设置预取计数(Prefetch Count):
err = ch.Qos(
1, // 预取计数(每次只分发一条消息)
0, // 预取大小(不限制消息大小)
false, // 全局设置(false 表示仅对当前消费者生效)
)
failOnError(err, "Failed to set QoS")
作用
Qos(1, 0, false)
表示 RabbitMQ 每次只会向消费者分发一条消息,直到消费者处理完当前消息并确认后,才会分发下一条消息。- 这样可以确保消息分发更加公平,避免某些消费者过载。
4. 完整示例代码(公平分发)
以下是启用公平分发的消费者代码:
package main
import (
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
// 连接到 RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"task_queue", // 队列名称
true, // 持久化
false, // 自动删除
false, // 排他性
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to declare a queue")
// 设置公平分发
err = ch.Qos(
1, // 预取计数
0, // 预取大小
false, // 全局设置
)
failOnError(err, "Failed to set QoS")
// 接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标签
false, // 自动确认(设置为 false,启用手动确认)
false, // 排他性
false, // 无本地
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to register a consumer")
// 处理消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
// 模拟任务处理
time.Sleep(time.Second)
log.Printf("Done")
d.Ack(false) // 手动确认消息
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
5. 公平分发的作用
- 每个消费者一次只处理一条消息,直到确认完成后才会接收下一条消息。
- 确保消息分发更加公平,避免某些消费者过载。
6. 总结
- RabbitMQ 默认以轮询的方式将消息分发给多个消费者。
- 如果需要更公平的分发机制,可以通过
Qos
方法设置预取计数,确保每个消费者一次只处理一条消息。 - 公平分发适用于任务处理时间不均匀的场景,避免某些消费者过载。