Topic Exchange 是 RabbitMQ 中最灵活的一种交换机类型,它允许通过 通配符 的方式匹配 Routing Key 和 Binding Key,从而实现更复杂的消息路由规则。以下是关于 Topic Exchange 的详细说明:
1. Topic Exchange 的核心概念
- Routing Key:生产者发送消息时指定的路由键。
- Binding Key:队列绑定到交换机时指定的绑定键。
- 通配符:
*
(星号):匹配一个单词。#
(井号):匹配零个或多个单词。
2. Topic Exchange 的匹配规则
- Routing Key 和 Binding Key 是由
.
分隔的单词组成的字符串。 - 通配符可以用于 Binding Key 中,以实现灵活的匹配。
示例
Binding Key | Routing Key | 是否匹配 |
---|---|---|
quick.orange.rabbit |
quick.orange.rabbit |
是 |
quick.orange.* |
quick.orange.rabbit |
是 |
quick.orange.* |
quick.orange.fox |
是 |
quick.orange.* |
quick.brown.fox |
否 |
quick.# |
quick.orange.rabbit |
是 |
quick.# |
quick.brown.fox |
是 |
#.rabbit |
quick.orange.rabbit |
是 |
*.orange.* |
quick.orange.rabbit |
是 |
*.orange.* |
lazy.orange.elephant |
是 |
*.orange.* |
quick.brown.fox |
否 |
3. Topic Exchange 的使用场景
- 日志系统:将不同级别和来源的日志路由到不同的队列。
- 通知系统:根据用户类型和通知类型分发消息。
- 订单系统:根据订单状态和地区分发消息。
4. 代码实现
Producer 代码
package main
import (
"log"
"os"
"strings"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接到 RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明 Topic Exchange
err = ch.ExchangeDeclare(
"logs_topic", // 交换机名称
"topic", // 交换机类型
true, // 持久化
false, // 自动删除
false, // 内部交换机
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to declare an exchange")
// 获取 Routing Key 和消息内容
routingKey := os.Args[1]
body := strings.Join(os.Args[2:], " ")
// 发送消息
err = ch.Publish(
"logs_topic", // 交换机名称
routingKey, // Routing Key
false, // 强制
false, // 立即
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 持久化消息
ContentType: "text/plain",
Body: []byte(body),
},
)
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s: %s", routingKey, body)
}
Consumer 代码
package main
import (
"log"
"os"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接到 RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明 Topic Exchange
err = ch.ExchangeDeclare(
"logs_topic", // 交换机名称
"topic", // 交换机类型
true, // 持久化
false, // 自动删除
false, // 内部交换机
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to declare an exchange")
// 声明队列
q, err := ch.QueueDeclare(
"", // 队列名称(随机生成)
false, // 持久化
false, // 自动删除
true, // 排他性
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to declare a queue")
// 绑定队列到交换机
bindingKeys := os.Args[1:]
for _, bindingKey := range bindingKeys {
err = ch.QueueBind(
q.Name, // 队列名称
bindingKey, // Binding Key
"logs_topic", // 交换机名称
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to bind a queue")
}
// 接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标签
true, // 自动确认
false, // 排他性
false, // 无本地
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to register a consumer")
// 处理消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] Received %s: %s", d.RoutingKey, d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
5. 运行示例
-
启动消费者,绑定
*.orange.*
和lazy.#
:go run consumer.go "*.orange.*" "lazy.#"
-
启动生产者,发送消息:
go run producer.go "quick.orange.rabbit" "This is a quick orange rabbit" go run producer.go "lazy.orange.elephant" "This is a lazy orange elephant" go run producer.go "lazy.brown.fox" "This is a lazy brown fox" go run producer.go "quick.brown.fox" "This is a quick brown fox"
-
观察消费者日志,验证消息是否被正确路由。
6. 总结
- Topic Exchange 通过通配符实现了灵活的路由规则。
- 使用
*
和#
可以匹配复杂的 Routing Key。 - 适用于需要根据多个条件分发消息的场景。
如果有其他问题,欢迎继续提问!