介绍


发布/订阅模式(Publish/Subscribe Pattern)是 RabbitMQ 中非常重要的消息传递模式之一,它允许你将消息广播给多个消费者。与 Worker Queue 模式不同,发布/订阅模式的核心思想是 一条消息会被多个消费者同时接收。
- 发布/订阅模式的核心概念、
在发布/订阅模式中,消息的发送者(Producer)将消息发送到 交换机(Exchange),而不是直接发送到队列。交换机将消息广播给所有绑定到它的队列,然后每个队列将消息传递给订阅它的消费者。
关键组件:
Producer:消息的发送者。
Exchange:接收消息并将其路由到队列。
Queue:存储消息,供消费者消费。
Consumer:消息的接收者。
工作流程:
Producer 将消息发送到 Exchange。
Exchange 将消息广播给所有绑定到它的队列。
每个队列将消息传递给订阅它的 Consumer。
- 交换机的类型

RabbitMQ 支持多种类型的交换机,发布/订阅模式通常使用 Fanout Exchange。Fanout Exchange 会将消息广播给所有绑定到它的队列,忽略路由键(Routing Key)。
Fanout Exchange 的特点:
消息会被广播给所有绑定到它的队列。
不需要指定路由键。
适用于发布/订阅模式。
- 
实现发布/订阅模式的步骤 
 以下是实现发布/订阅模式的基本步骤:
- 
创建交换机 
 在 Producer 和 Consumer 中,需要创建一个 Fanout Exchange。
- 
绑定队列到交换机 
 在 Consumer 中,将队列绑定到交换机。
- 
发送消息 
 Producer 将消息发送到交换机。
- 
接收消息 
 Consumer 从队列中接收消息。
代码


package main
import (
	"log"
	"strconv"
	amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}
func main() {
	// 连接到rabbitmq
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	// 声明 Fanout Exchange
	err = ch.ExchangeDeclare(
		"logs",   //交换机名称
		"fanout", //交换机类型
		true,     //是否持久化
		false,    //是否自动删除
		false,    // 内部交换机
		false,    // 等待服务器确认
		nil,      // 额外参数
	)
	failOnError(err, "Failed to declare an exchange")
	for i := 0; i < 10; i++ {
		body := "Hello World!" + strconv.Itoa(i)
		err = ch.Publish(
			"logs", //交换机
			"",     //路由键
			false,  // 强制
			false,  // 立即
			amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(body),
			},
		)
		failOnError(err, "Failed to publish a message")
		log.Printf(" [x] Sent %s", body)
	}
}
package main
import (
	"log"
	amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}
func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	// 声明 Fanout Exchange
	err = ch.ExchangeDeclare(
		"logs",   // 交换机名称
		"fanout", // 交换机类型
		true,     // 持久化
		false,    // 自动删除
		false,    // 内部交换机
		false,    // 无等待
		nil,      // 参数
	)
	failOnError(err, "Failed to declare an exchange")
	// 声明队列(让 RabbitMQ 自动生成队列名称)
	q, err := ch.QueueDeclare(
		"",    // 队列名称(空字符串表示让 RabbitMQ 自动生成)
		false, // 持久化
		false, // 自动删除
		true,  // 排他性
		false, // 无等待
		nil,   // 参数
	)
	failOnError(err, "Failed to declare a queue")
	// 绑定队列到交换机
	err = ch.QueueBind(
		q.Name, // 队列名称
		"",     // 路由键(Fanout Exchange 忽略路由键)
		"logs", // 交换机名称
		false,  // 无等待
		nil,    // 参数
	)
	failOnError(err, "Failed to bind a queue")
	// 接收消息
	msgs, err := ch.Consume(
		q.Name, // 队列名称
		"",     // 消费者标签
		true,   // 自动确认
		false,  // 排他性
		false,  // 无本地
		false,  // 无等待
		nil,    // 参数
	)
	failOnError(err, "Failed to register a consumer")
	// 处理消息
	forever := make(chan bool)
	go func() {
		for d := range msgs {
			log.Printf(" [x] %s", d.Body)
		}
	}()
	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}
