RabbitMQ 可靠性传递

image-20250211155159481

image-20250211155234108

image-20250211155252659

在 RabbitMQ 中,可靠性传递 是指确保消息从生产者发送到消费者过程中不会丢失。为了实现这一点,RabbitMQ 提供了多种机制来保证消息的可靠性传递。以下是实现 RabbitMQ 可靠性传递的关键策略:


1. 消息持久化

消息持久化是确保消息在 RabbitMQ 服务器重启后不会丢失的关键机制。

实现方式

  • 队列持久化
    在声明队列时,将 durable 参数设置为 true

    q, err := ch.QueueDeclare(
        "my_queue", // 队列名称
        true,       // 持久化
        false,      // 自动删除
        false,      // 排他性
        false,      // 无等待
        nil,        // 参数
    )
    
  • 消息持久化
    在发送消息时,将 DeliveryMode 设置为 amqp.Persistent

    err = ch.Publish(
        "my_exchange", // 交换机名称
        "my_key",      // Routing Key
        false,         // 强制
        false,         // 立即
        amqp.Publishing{
            DeliveryMode: amqp.Persistent, // 持久化消息
            ContentType:  "text/plain",
            Body:         []byte("Hello, World!"),
        },
    )
    

注意事项

  • 只有队列和消息都持久化时,才能确保消息在服务器重启后不丢失。
  • 持久化会降低性能,因为消息需要写入磁盘。

2. 生产者确认(Publisher Confirms)

生产者确认机制可以确保消息成功到达 RabbitMQ 服务器。

实现方式

  • 开启生产者确认模式:

    err = ch.Confirm(false) // 非批量确认
    
  • 监听确认消息:

    confirm := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
    go func() {
        for c := range confirm {
            if c.Ack {
                log.Println("Message confirmed")
            } else {
                log.Println("Message not confirmed")
            }
        }
    }()
    

注意事项

  • 如果消息未确认,生产者可以重试发送。
  • 批量确认模式可以提高性能,但会增加复杂性。

3. 消费者确认(Consumer Acknowledgements)

消费者确认机制可以确保消息被消费者成功处理。

实现方式

  • 手动确认模式:
    在消费消息时,将 autoAck 参数设置为 false,并在处理完消息后手动确认。

    msgs, err := ch.Consume(
        "my_queue", // 队列名称
        "",         // 消费者标签
        false,      // 手动确认
        false,      // 排他性
        false,      // 无本地
        false,      // 无等待
        nil,        // 参数
    )
    
    for d := range msgs {
        // 处理消息
        log.Printf("Received: %s", d.Body)
        d.Ack(false) // 手动确认
    }
    

注意事项

  • 如果消费者未确认消息,RabbitMQ 会将消息重新投递给其他消费者。
  • 使用 d.Nack(false, true) 可以拒绝消息并重新入队。

4. 高可用性(HA)

通过集群和镜像队列实现 RabbitMQ 的高可用性。

实现方式

  • 集群:将多个 RabbitMQ 节点组成集群,实现负载均衡和故障转移。

  • 镜像队列:将队列镜像到多个节点,确保即使某个节点宕机,消息也不会丢失。

    rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'
    

注意事项

  • 镜像队列会增加网络和存储开销。
  • 集群需要合理规划节点数量和网络拓扑。

5. 消息重试机制

在消费者处理失败时,可以通过重试机制确保消息最终被处理。

实现方式

  • 死信队列(DLX):将处理失败的消息路由到死信队列,稍后重新处理。

    args := amqp.Table{
        "x-dead-letter-exchange": "my_dlx_exchange",
    }
    q, err := ch.QueueDeclare(
        "my_queue", // 队列名称
        true,       // 持久化
        false,      // 自动删除
        false,      // 排他性
        false,      // 无等待
        args,       // 参数
    )
    
  • 重试队列:将失败的消息发送到重试队列,设置 TTL(Time-To-Live)后重新投递。


6. 监控与告警

通过监控 RabbitMQ 的状态,及时发现和处理问题。

实现方式

  • 使用 RabbitMQ 的管理插件(Management Plugin)监控队列、交换机和连接状态。
  • 设置告警规则,如队列积压、消费者断开等。

7. 总结

为了实现 RabbitMQ 的可靠性传递,需要综合使用以下机制:

  1. 消息持久化:确保消息在服务器重启后不丢失。
  2. 生产者确认:确保消息成功到达 RabbitMQ。
  3. 消费者确认:确保消息被成功处理。
  4. 高可用性:通过集群和镜像队列避免单点故障。
  5. 消息重试:通过死信队列和重试队列处理失败消息。
  6. 监控与告警:及时发现和处理问题。

生产者确认机制

生产者代码

package main

import (
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明队列
	q, err := ch.QueueDeclare(
		"my_queue", // 队列名称
		true,       // 持久化
		false,      // 自动删除
		false,      // 排他性
		false,      // 无等待
		nil,        // 参数
	)
	failOnError(err, "Failed to declare a queue")

	// 开启生产者确认模式
	err = ch.Confirm(false) // false 表示非批量确认
	failOnError(err, "Failed to enable publisher confirms")

	// 监听确认消息
	confirm := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
	go func() {
		for c := range confirm {
			if c.Ack {
				log.Println("Message confirmed")
			} else {
				log.Println("Message not confirmed")
			}
		}
	}()

	// 发送消息
	body := "Hello, RabbitMQ!"
	err = ch.Publish(
		"",     // 交换机名称(默认交换机)
		q.Name, // 队列名称
		false,  // 强制
		false,  // 立即
		amqp.Publishing{
			DeliveryMode: amqp.Persistent, // 持久化消息
			ContentType:  "text/plain",
			Body:         []byte(body),
		},
	)
	failOnError(err, "Failed to publish a message")

	log.Printf(" [x] Sent %s", body)

	// 等待确认
	time.Sleep(2 * time.Second)
}

消费者代码:

package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}
func main() {
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	q, err := ch.QueueDeclare(
		"my_queue",
		true,
		false,
		false,
		false,
		nil,
	)
	failOnError(err, "Failed to declare a queue")
	msgs, err := ch.Consume(
		q.Name,
		"",
		false,
		false,
		false,
		false,
		nil,
	)
	failOnError(err, "Failed to register a consumer")
	var forever chan struct{}
	go func() {
		for msg := range msgs {
			log.Printf("Received a message: %s", msg.Body)
			msg.Ack(false)
		}
	}()
	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever

}


d.Nack(false, true) 是 RabbitMQ 中消费者拒绝消息并决定如何处理该消息的方法。它允许消费者显式地拒绝消息,并可以选择是否将消息重新放回队列。


d.Nack 方法的作用

  • 拒绝消息:消费者明确表示无法处理该消息。
  • 控制消息行为:决定是否将消息重新放回队列,或者直接丢弃。

d.Nack 方法的参数

d.Nack(multiple, requeue)
  1. multiple(布尔值):

    • false:只拒绝当前消息。
    • true:拒绝当前消息及其之前所有未确认的消息。
  2. requeue(布尔值):

    • true:将消息重新放回队列,RabbitMQ 会重新投递该消息(可能投递给其他消费者)。
    • false:直接丢弃消息(如果配置了死信队列,消息会被路由到死信队列)。

d.Nack(false, true) 的用法

  • 场景
    消费者无法处理当前消息,但希望 RabbitMQ 将消息重新放回队列,以便其他消费者可以处理。

  • 代码示例

    msgs, err := ch.Consume(
        "my_queue", // 队列名称
        "",         // 消费者标签
        false,      // 手动确认
        false,      // 排他性
        false,      // 无本地
        false,      // 无等待
        nil,        // 参数
    )
    failOnError(err, "Failed to register a consumer")
    
    for d := range msgs {
        // 模拟消息处理失败
        log.Printf("Received a message: %s", d.Body)
        if shouldRejectMessage(d.Body) {
            log.Println("Message rejected, requeueing...")
            d.Nack(false, true) // 拒绝消息并重新放回队列
            continue
        }
    
        // 处理消息
        log.Println("Message processed")
        d.Ack(false) // 手动确认消息
    }
    

d.Nack(false, true) 的行为

  1. 消费者接收到消息。
  2. 如果消息处理失败,调用 d.Nack(false, true)
    • false:只拒绝当前消息。
    • true:将消息重新放回队列。
  3. RabbitMQ 会将消息重新投递给其他消费者。

d.Nack(false, false) 的用法

  • 场景
    消费者无法处理当前消息,且不希望消息重新放回队列。如果配置了死信队列,消息会被路由到死信队列。

  • 代码示例

    msgs, err := ch.Consume(
        "my_queue", // 队列名称
        "",         // 消费者标签
        false,      // 手动确认
        false,      // 排他性
        false,      // 无本地
        false,      // 无等待
        nil,        // 参数
    )
    failOnError(err, "Failed to register a consumer")
    
    for d := range msgs {
        // 模拟消息处理失败
        log.Printf("Received a message: %s", d.Body)
        if shouldRejectMessage(d.Body) {
            log.Println("Message rejected, discarding...")
            d.Nack(false, false) // 拒绝消息并丢弃
            continue
        }
    
        // 处理消息
        log.Println("Message processed")
        d.Ack(false) // 手动确认消息
    }
    

d.Nack(false, false) 的行为

  1. 消费者接收到消息。
  2. 如果消息处理失败,调用 d.Nack(false, false)
    • false:只拒绝当前消息。
    • false:直接丢弃消息。
  3. 如果配置了死信队列,消息会被路由到死信队列。

总结

  • d.Nack(false, true):拒绝消息并重新放回队列,适合临时性错误。
  • d.Nack(false, false):拒绝消息并丢弃,适合无法处理的错误(结合死信队列使用)。

根据业务需求选择合适的拒绝方式,可以有效管理消息的处理流程。如果有其他问题,欢迎继续提问!