介绍
- 什么是 Routing?
Routing 是指消息从交换机传递到队列的规则。它决定了哪些队列会接收到消息。路由规则通常由以下两个因素决定:
交换机类型:不同的交换机类型支持不同的路由规则。
Binding Key:队列绑定到交换机时指定的路由规则。
2. Routing 的核心组件
交换机(Exchange)
交换机负责接收生产者发送的消息,并根据路由规则将消息传递到一个或多个队列。
RabbitMQ 提供了四种核心的交换机类型:Direct、Fanout、Topic 和 Headers。
Binding Key
当队列绑定到交换机时,可以指定一个 Binding Key。
这个 Binding Key 用于匹配消息的 Routing Key,从而决定消息是否会被路由到该队列。
Routing Key
生产者在发送消息时,可以为消息指定一个 Routing Key。
交换机会根据 Routing Key 和 Binding Key 的匹配规则将消息分发到队列。
3. Routing 的工作流程
生产者发送消息:
生产者将消息发送到交换机,并指定一个 Routing Key。
示例:Routing Key = “error”。
交换机路由消息:
交换机会根据 Routing Key 和 Binding Key 的匹配规则,将消息分发到对应的队列。
示例:交换机将 Routing Key = “error” 的消息分发到绑定 Binding Key = “error” 的队列。
消费者接收消息:
消费者从队列中获取消息并处理。
代码:
producer.go
package main
import (
"log"
"os"
"strings"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_direct",
"direct",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare an exchange")
severity := os.Args[1]
body := strings.Join(os.Args[2:], " ")
err = ch.Publish(
"logs_direct",
severity, // routing key
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
},
)
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s: %s", severity, body)
}
consumer.go
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接到 RabbitMQ
conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明 Direct Exchange
err = ch.ExchangeDeclare(
"logs_direct", // 交换机名称
"direct", // 交换机类型
true, // 持久化
false, // 自动删除
false, // 内部交换机
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to declare an exchange")
// 声明队列
q, err := ch.QueueDeclare(
"", // 队列名称(随机生成)
false, // 持久化
false, // 自动删除
true, // 排他性
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to declare a queue")
// 绑定队列到交换机
severities := []string{"info", "error"}
for _, severity := range severities {
err = ch.QueueBind(
q.Name, // 队列名称
severity, // binding key
"logs_direct", // 交换机名称
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to bind a queue")
}
// 接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标签
true, // 自动确认
false, // 排他性
false, // 无本地
false, // 无等待
nil, // 参数
)
failOnError(err, "Failed to register a consumer")
// 处理消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] Received %s: %s", d.RoutingKey, d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}