介绍


发布/订阅模式(Publish/Subscribe Pattern)是 RabbitMQ 中非常重要的消息传递模式之一,它允许你将消息广播给多个消费者。与 Worker Queue 模式不同,发布/订阅模式的核心思想是 一条消息会被多个消费者同时接收。

  1. 发布/订阅模式的核心概念、

在发布/订阅模式中,消息的发送者(Producer)将消息发送到 交换机(Exchange),而不是直接发送到队列。交换机将消息广播给所有绑定到它的队列,然后每个队列将消息传递给订阅它的消费者。

关键组件:
Producer:消息的发送者。
Exchange:接收消息并将其路由到队列。
Queue:存储消息,供消费者消费。
Consumer:消息的接收者。

工作流程:
Producer 将消息发送到 Exchange。
Exchange 将消息广播给所有绑定到它的队列。
每个队列将消息传递给订阅它的 Consumer。

  1. 交换机的类型


RabbitMQ 支持多种类型的交换机,发布/订阅模式通常使用 Fanout Exchange。Fanout Exchange 会将消息广播给所有绑定到它的队列,忽略路由键(Routing Key)。

Fanout Exchange 的特点:
消息会被广播给所有绑定到它的队列。
不需要指定路由键。
适用于发布/订阅模式。

  1. 实现发布/订阅模式的步骤
    以下是实现发布/订阅模式的基本步骤:

  2. 创建交换机
    在 Producer 和 Consumer 中,需要创建一个 Fanout Exchange。

  3. 绑定队列到交换机
    在 Consumer 中,将队列绑定到交换机。

  4. 发送消息
    Producer 将消息发送到交换机。

  5. 接收消息
    Consumer 从队列中接收消息。

代码

package main

import (
	"log"
	"strconv"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}
func main() {
	// 连接到rabbitmq
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	// 声明 Fanout Exchange
	err = ch.ExchangeDeclare(
		"logs",   //交换机名称
		"fanout", //交换机类型
		true,     //是否持久化
		false,    //是否自动删除
		false,    // 内部交换机
		false,    // 等待服务器确认
		nil,      // 额外参数
	)
	failOnError(err, "Failed to declare an exchange")
	for i := 0; i < 10; i++ {
		body := "Hello World!" + strconv.Itoa(i)
		err = ch.Publish(
			"logs", //交换机
			"",     //路由键
			false,  // 强制
			false,  // 立即
			amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(body),
			},
		)
		failOnError(err, "Failed to publish a message")
		log.Printf(" [x] Sent %s", body)
	}
}
package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明 Fanout Exchange
	err = ch.ExchangeDeclare(
		"logs",   // 交换机名称
		"fanout", // 交换机类型
		true,     // 持久化
		false,    // 自动删除
		false,    // 内部交换机
		false,    // 无等待
		nil,      // 参数
	)
	failOnError(err, "Failed to declare an exchange")

	// 声明队列(让 RabbitMQ 自动生成队列名称)
	q, err := ch.QueueDeclare(
		"",    // 队列名称(空字符串表示让 RabbitMQ 自动生成)
		false, // 持久化
		false, // 自动删除
		true,  // 排他性
		false, // 无等待
		nil,   // 参数
	)
	failOnError(err, "Failed to declare a queue")

	// 绑定队列到交换机
	err = ch.QueueBind(
		q.Name, // 队列名称
		"",     // 路由键(Fanout Exchange 忽略路由键)
		"logs", // 交换机名称
		false,  // 无等待
		nil,    // 参数
	)
	failOnError(err, "Failed to bind a queue")

	// 接收消息
	msgs, err := ch.Consume(
		q.Name, // 队列名称
		"",     // 消费者标签
		true,   // 自动确认
		false,  // 排他性
		false,  // 无本地
		false,  // 无等待
		nil,    // 参数
	)
	failOnError(err, "Failed to register a consumer")

	// 处理消息
	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf(" [x] %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}