介绍
发布/订阅模式(Publish/Subscribe Pattern)是 RabbitMQ 中非常重要的消息传递模式之一,它允许你将消息广播给多个消费者。与 Worker Queue 模式不同,发布/订阅模式的核心思想是 一条消息会被多个消费者同时接收。
- 发布/订阅模式的核心概念、
在发布/订阅模式中,消息的发送者(Producer)将消息发送到 交换机(Exchange),而不是直接发送到队列。交换机将消息广播给所有绑定到它的队列,然后每个队列将消息传递给订阅它的消费者。
关键组件:
Producer:消息的发送者。
Exchange:接收消息并将其路由到队列。
Queue:存储消息,供消费者消费。
Consumer:消息的接收者。
工作流程:
Producer 将消息发送到 Exchange。
Exchange 将消息广播给所有绑定到它的队列。
每个队列将消息传递给订阅它的 Consumer。
- 交换机的类型
RabbitMQ 支持多种类型的交换机,发布/订阅模式通常使用 Fanout Exchange。Fanout Exchange 会将消息广播给所有绑定到它的队列,忽略路由键(Routing Key)。
Fanout Exchange 的特点:
消息会被广播给所有绑定到它的队列。
不需要指定路由键。
适用于发布/订阅模式。
-
实现发布/订阅模式的步骤
以下是实现发布/订阅模式的基本步骤: -
创建交换机
在 Producer 和 Consumer 中,需要创建一个 Fanout Exchange。 -
绑定队列到交换机
在 Consumer 中,将队列绑定到交换机。 -
发送消息
Producer 将消息发送到交换机。 -
接收消息
Consumer 从队列中接收消息。
代码
package main
import (
"log"
"strconv"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接到rabbitmq
conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明 Fanout Exchange
err = ch.ExchangeDeclare(
"logs", //交换机名称
"fanout", //交换机类型
true, //是否持久化
false, //是否自动删除
false, // 内部交换机
false, // 等待服务器确认
nil, // 额外参数
)
failOnError(err, "Failed to declare an exchange")
for i := 0; i < 10; i++ {
body := "Hello World!" + strconv.Itoa(i)
err = ch.Publish(
"logs", //交换机
"", //路由键
false, // 强制
false, // 立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
},
)
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
}
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
// 连接到 RabbitMQ
conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明 Fanout Exchange
err = ch.ExchangeDeclare(
"logs", // 交换机名称
"fanout", // 交换机类型
true, // 持久化
false, // 自动删除
false, // 内部交换机
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to declare an exchange")
// 声明队列(让 RabbitMQ 自动生成队列名称)
q, err := ch.QueueDeclare(
"", // 队列名称(空字符串表示让 RabbitMQ 自动生成)
false, // 持久化
false, // 自动删除
true, // 排他性
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to declare a queue")
// 绑定队列到交换机
err = ch.QueueBind(
q.Name, // 队列名称
"", // 路由键(Fanout Exchange 忽略路由键)
"logs", // 交换机名称
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to bind a queue")
// 接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标签
true, // 自动确认
false, // 排他性
false, // 无本地
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to register a consumer")
// 处理消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}