介绍

  1. 什么是 Routing?
    Routing 是指消息从交换机传递到队列的规则。它决定了哪些队列会接收到消息。路由规则通常由以下两个因素决定:

交换机类型:不同的交换机类型支持不同的路由规则。
Binding Key:队列绑定到交换机时指定的路由规则。
2. Routing 的核心组件
交换机(Exchange)
交换机负责接收生产者发送的消息,并根据路由规则将消息传递到一个或多个队列。
RabbitMQ 提供了四种核心的交换机类型:Direct、Fanout、Topic 和 Headers。
Binding Key
当队列绑定到交换机时,可以指定一个 Binding Key。
这个 Binding Key 用于匹配消息的 Routing Key,从而决定消息是否会被路由到该队列。
Routing Key
生产者在发送消息时,可以为消息指定一个 Routing Key。
交换机会根据 Routing Key 和 Binding Key 的匹配规则将消息分发到队列。
3. Routing 的工作流程
生产者发送消息:

生产者将消息发送到交换机,并指定一个 Routing Key。
示例:Routing Key = “error”。
交换机路由消息:

交换机会根据 Routing Key 和 Binding Key 的匹配规则,将消息分发到对应的队列。
示例:交换机将 Routing Key = “error” 的消息分发到绑定 Binding Key = “error” 的队列。
消费者接收消息:

消费者从队列中获取消息并处理。

代码:

producer.go

package main

import (
	"log"
	"os"
	"strings"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}
func main() {
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	err = ch.ExchangeDeclare(
		"logs_direct",
		"direct",
		true,
		false,
		false,
		false,
		nil,
	)
	failOnError(err, "Failed to declare an exchange")

	severity := os.Args[1]
	body := strings.Join(os.Args[2:], " ")
	err = ch.Publish(
		"logs_direct",
		severity, // routing key
		false,
		false,
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			ContentType:  "text/plain",
			Body:         []byte(body),
		},
	)
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s: %s", severity, body)
}

consumer.go

package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明 Direct Exchange
	err = ch.ExchangeDeclare(
		"logs_direct", // 交换机名称
		"direct",      // 交换机类型
		true,          // 持久化
		false,         // 自动删除
		false,         // 内部交换机
		false,         // 无等待
		nil,           // 参数
	)
	failOnError(err, "Failed to declare an exchange")

	// 声明队列
	q, err := ch.QueueDeclare(
		"",    // 队列名称(随机生成)
		false, // 持久化
		false, // 自动删除
		true,  // 排他性
		false, // 无等待
		nil,   // 参数
	)
	failOnError(err, "Failed to declare a queue")

	// 绑定队列到交换机
	severities := []string{"info", "error"}
	for _, severity := range severities {
		err = ch.QueueBind(
			q.Name,        // 队列名称
			severity,      // binding key
			"logs_direct", // 交换机名称
			false,         // 无等待
			nil,           // 参数
		)
		failOnError(err, "Failed to bind a queue")
	}

	// 接收消息
	msgs, err := ch.Consume(
		q.Name, // 队列名称
		"",     // 消费者标签
		true,   // 自动确认
		false,  // 排他性
		false,  // 无本地
		false,  // 无等待
		nil,    // 参数
	)
	failOnError(err, "Failed to register a consumer")

	// 处理消息
	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf(" [x] Received %s: %s", d.RoutingKey, d.Body)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}