Topic Exchange 是 RabbitMQ 中最灵活的一种交换机类型,它允许通过 通配符 的方式匹配 Routing KeyBinding Key,从而实现更复杂的消息路由规则。以下是关于 Topic Exchange 的详细说明:


1. Topic Exchange 的核心概念

  • Routing Key:生产者发送消息时指定的路由键。
  • Binding Key:队列绑定到交换机时指定的绑定键。
  • 通配符
    • *(星号):匹配一个单词。
    • #(井号):匹配零个或多个单词。

2. Topic Exchange 的匹配规则

  • Routing KeyBinding Key 是由 . 分隔的单词组成的字符串。
  • 通配符可以用于 Binding Key 中,以实现灵活的匹配。

示例

Binding Key Routing Key 是否匹配
quick.orange.rabbit quick.orange.rabbit
quick.orange.* quick.orange.rabbit
quick.orange.* quick.orange.fox
quick.orange.* quick.brown.fox
quick.# quick.orange.rabbit
quick.# quick.brown.fox
#.rabbit quick.orange.rabbit
*.orange.* quick.orange.rabbit
*.orange.* lazy.orange.elephant
*.orange.* quick.brown.fox

3. Topic Exchange 的使用场景

  • 日志系统:将不同级别和来源的日志路由到不同的队列。
  • 通知系统:根据用户类型和通知类型分发消息。
  • 订单系统:根据订单状态和地区分发消息。

4. 代码实现

Producer 代码

package main

import (
	"log"
	"os"
	"strings"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明 Topic Exchange
	err = ch.ExchangeDeclare(
		"logs_topic", // 交换机名称
		"topic",      // 交换机类型
		true,         // 持久化
		false,        // 自动删除
		false,        // 内部交换机
		false,        // 无等待
		nil,          // 参数
	)
	failOnError(err, "Failed to declare an exchange")

	// 获取 Routing Key 和消息内容
	routingKey := os.Args[1]
	body := strings.Join(os.Args[2:], " ")

	// 发送消息
	err = ch.Publish(
		"logs_topic", // 交换机名称
		routingKey,   // Routing Key
		false,        // 强制
		false,        // 立即
		amqp.Publishing{
			DeliveryMode: amqp.Persistent, // 持久化消息
			ContentType:  "text/plain",
			Body:         []byte(body),
		},
	)
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s: %s", routingKey, body)
}

Consumer 代码

package main

import (
	"log"
	"os"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明 Topic Exchange
	err = ch.ExchangeDeclare(
		"logs_topic", // 交换机名称
		"topic",      // 交换机类型
		true,         // 持久化
		false,        // 自动删除
		false,        // 内部交换机
		false,        // 无等待
		nil,          // 参数
	)
	failOnError(err, "Failed to declare an exchange")

	// 声明队列
	q, err := ch.QueueDeclare(
		"",    // 队列名称(随机生成)
		false, // 持久化
		false, // 自动删除
		true,  // 排他性
		false, // 无等待
		nil,   // 参数
	)
	failOnError(err, "Failed to declare a queue")

	// 绑定队列到交换机
	bindingKeys := os.Args[1:]
	for _, bindingKey := range bindingKeys {
		err = ch.QueueBind(
			q.Name,        // 队列名称
			bindingKey,    // Binding Key
			"logs_topic",  // 交换机名称
			false,         // 无等待
			nil,           // 参数
		)
		failOnError(err, "Failed to bind a queue")
	}

	// 接收消息
	msgs, err := ch.Consume(
		q.Name, // 队列名称
		"",     // 消费者标签
		true,   // 自动确认
		false,  // 排他性
		false,  // 无本地
		false,  // 无等待
		nil,    // 参数
	)
	failOnError(err, "Failed to register a consumer")

	// 处理消息
	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf(" [x] Received %s: %s", d.RoutingKey, d.Body)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

5. 运行示例

  1. 启动消费者,绑定 *.orange.*lazy.#

    go run consumer.go "*.orange.*" "lazy.#"
    
  2. 启动生产者,发送消息:

    go run producer.go "quick.orange.rabbit" "This is a quick orange rabbit"
    go run producer.go "lazy.orange.elephant" "This is a lazy orange elephant"
    go run producer.go "lazy.brown.fox" "This is a lazy brown fox"
    go run producer.go "quick.brown.fox" "This is a quick brown fox"
    
  3. 观察消费者日志,验证消息是否被正确路由。


6. 总结

  • Topic Exchange 通过通配符实现了灵活的路由规则。
  • 使用 *# 可以匹配复杂的 Routing Key
  • 适用于需要根据多个条件分发消息的场景。

如果有其他问题,欢迎继续提问!