

Topic Exchange 是 RabbitMQ 中最灵活的一种交换机类型,它允许通过 通配符 的方式匹配 Routing Key 和 Binding Key,从而实现更复杂的消息路由规则。以下是关于 Topic Exchange 的详细说明:
1. Topic Exchange 的核心概念
- Routing Key:生产者发送消息时指定的路由键。
- Binding Key:队列绑定到交换机时指定的绑定键。
- 通配符:
- *(星号):匹配一个单词。
- #(井号):匹配零个或多个单词。
 
2. Topic Exchange 的匹配规则
- Routing Key 和 Binding Key 是由 .分隔的单词组成的字符串。
- 通配符可以用于 Binding Key 中,以实现灵活的匹配。
示例
| Binding Key | Routing Key | 是否匹配 | 
|---|---|---|
| quick.orange.rabbit | quick.orange.rabbit | 是 | 
| quick.orange.* | quick.orange.rabbit | 是 | 
| quick.orange.* | quick.orange.fox | 是 | 
| quick.orange.* | quick.brown.fox | 否 | 
| quick.# | quick.orange.rabbit | 是 | 
| quick.# | quick.brown.fox | 是 | 
| #.rabbit | quick.orange.rabbit | 是 | 
| *.orange.* | quick.orange.rabbit | 是 | 
| *.orange.* | lazy.orange.elephant | 是 | 
| *.orange.* | quick.brown.fox | 否 | 
3. Topic Exchange 的使用场景
- 日志系统:将不同级别和来源的日志路由到不同的队列。
- 通知系统:根据用户类型和通知类型分发消息。
- 订单系统:根据订单状态和地区分发消息。
4. 代码实现
Producer 代码
package main
import (
	"log"
	"os"
	"strings"
	amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}
func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	// 声明 Topic Exchange
	err = ch.ExchangeDeclare(
		"logs_topic", // 交换机名称
		"topic",      // 交换机类型
		true,         // 持久化
		false,        // 自动删除
		false,        // 内部交换机
		false,        // 无等待
		nil,          // 参数
	)
	failOnError(err, "Failed to declare an exchange")
	// 获取 Routing Key 和消息内容
	routingKey := os.Args[1]
	body := strings.Join(os.Args[2:], " ")
	// 发送消息
	err = ch.Publish(
		"logs_topic", // 交换机名称
		routingKey,   // Routing Key
		false,        // 强制
		false,        // 立即
		amqp.Publishing{
			DeliveryMode: amqp.Persistent, // 持久化消息
			ContentType:  "text/plain",
			Body:         []byte(body),
		},
	)
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s: %s", routingKey, body)
}
Consumer 代码
package main
import (
	"log"
	"os"
	amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}
func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	// 声明 Topic Exchange
	err = ch.ExchangeDeclare(
		"logs_topic", // 交换机名称
		"topic",      // 交换机类型
		true,         // 持久化
		false,        // 自动删除
		false,        // 内部交换机
		false,        // 无等待
		nil,          // 参数
	)
	failOnError(err, "Failed to declare an exchange")
	// 声明队列
	q, err := ch.QueueDeclare(
		"",    // 队列名称(随机生成)
		false, // 持久化
		false, // 自动删除
		true,  // 排他性
		false, // 无等待
		nil,   // 参数
	)
	failOnError(err, "Failed to declare a queue")
	// 绑定队列到交换机
	bindingKeys := os.Args[1:]
	for _, bindingKey := range bindingKeys {
		err = ch.QueueBind(
			q.Name,        // 队列名称
			bindingKey,    // Binding Key
			"logs_topic",  // 交换机名称
			false,         // 无等待
			nil,           // 参数
		)
		failOnError(err, "Failed to bind a queue")
	}
	// 接收消息
	msgs, err := ch.Consume(
		q.Name, // 队列名称
		"",     // 消费者标签
		true,   // 自动确认
		false,  // 排他性
		false,  // 无本地
		false,  // 无等待
		nil,    // 参数
	)
	failOnError(err, "Failed to register a consumer")
	// 处理消息
	forever := make(chan bool)
	go func() {
		for d := range msgs {
			log.Printf(" [x] Received %s: %s", d.RoutingKey, d.Body)
		}
	}()
	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}
5. 运行示例
- 
启动消费者,绑定 *.orange.*和lazy.#:go run consumer.go "*.orange.*" "lazy.#"
- 
启动生产者,发送消息: go run producer.go "quick.orange.rabbit" "This is a quick orange rabbit" go run producer.go "lazy.orange.elephant" "This is a lazy orange elephant" go run producer.go "lazy.brown.fox" "This is a lazy brown fox" go run producer.go "quick.brown.fox" "This is a quick brown fox"
- 
观察消费者日志,验证消息是否被正确路由。 
6. 总结
- Topic Exchange 通过通配符实现了灵活的路由规则。
- 使用 *和#可以匹配复杂的 Routing Key。
- 适用于需要根据多个条件分发消息的场景。
如果有其他问题,欢迎继续提问!
