producer

package main

import (
	"log"
	"os"
	"strings"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	package main

import (
	"log"
	"os"
	"strconv"
	"strings"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明队列
	q, err := ch.QueueDeclare(
		"task_queue", // 队列名称
		true,         // 持久化
		false,        // 自动删除
		false,        // 排他性
		false,        // 无等待
		nil,          // 参数
	)
	failOnError(err, "Failed to declare a queue")

	// 发送消息
	body := bodyFrom(os.Args)
	for i := 1; i <= 10; i++ {
		err = ch.Publish(
			"",     // 交换机
			q.Name, // 队列名称
			false,  // 强制
			false,  // 立即
			amqp.Publishing{
				DeliveryMode: amqp.Persistent, // 消息持久化
				ContentType:  "text/plain",
				Body:         []byte(body + strconv.Itoa(i)),
			})
		failOnError(err, "Failed to publish a message")
		log.Printf(" [x] Sent %s", body+strconv.Itoa(i))
	}
}

// 从命令行参数中获取消息内容
func bodyFrom(args []string) string {
	var s string
	if len(args) < 2 || args[1] == "" {
		s = "hello"
	} else {
		s = strings.Join(args[1:], " ")
	}
	return s
}


worker

package main

import (
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明队列
	q, err := ch.QueueDeclare(
		"task_queue", // 队列名称
		true,         // 持久化
		false,        // 自动删除
		false,        // 排他性
		false,        // 无等待
		nil,          // 参数
	)
	failOnError(err, "Failed to declare a queue")

	// 设置公平分发
	err = ch.Qos(
		1,     // 预取计数
		0,     // 预取大小
		false, // 全局
	)
	failOnError(err, "Failed to set QoS")

	// 接收消息
	msgs, err := ch.Consume(
		q.Name, // 队列名称
		"",     // 消费者标签
		true,   // 自动确认
		false,  // 排他性
		false,  // 无本地
		false,  // 无等待
		nil,    // 参数
	)
	failOnError(err, "Failed to register a consumer")

	// 处理消息
	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
			// 模拟任务处理
			time.Sleep(time.Second)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

消息确认

package main

import (
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明队列
	q, err := ch.QueueDeclare(
		"task_queue", // 队列名称
		true,         // 持久化
		false,        // 自动删除
		false,        // 排他性
		false,        // 无等待
		nil,          // 参数
	)
	failOnError(err, "Failed to declare a queue")

	// 接收消息
	msgs, err := ch.Consume(
		q.Name, // 队列名称
		"",     // 消费者标签
		false,  // 自动确认
		false,  // 排他性
		false,  // 无本地
		false,  // 无等待
		nil,    // 参数
	)
	failOnError(err, "Failed to register a consumer")

	// 处理消息
	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
			// 模拟任务处理
			time.Sleep(time.Second)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

我们修改上面的代码,让自动确认参数为false


可以看到消费过的消息没有被删除,这是因为worker没有确认消费,所以producer不会删除消息

我们用d.Ack手动确认

package main

import (
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明队列
	q, err := ch.QueueDeclare(
		"task_queue", // 队列名称
		true,         // 持久化
		false,        // 自动删除
		false,        // 排他性
		false,        // 无等待
		nil,          // 参数
	)
	failOnError(err, "Failed to declare a queue")

	// 接收消息
	msgs, err := ch.Consume(
		q.Name, // 队列名称
		"",     // 消费者标签
		false,  // 自动确认
		false,  // 排他性
		false,  // 无本地
		false,  // 无等待
		nil,    // 参数
	)
	failOnError(err, "Failed to register a consumer")

	// 处理消息
	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
			// 模拟任务处理
			time.Sleep(time.Second)
			// 手动确认
			d.Ack(false)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

这样就没问题了

公平分派

没有设置qos前:

设置QOS后:

producer.go

package main

import (
	"log"
	"strconv"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	// 声明队列
	q, err := ch.QueueDeclare(
		"task_queue", // 队列名称
		true,         // 持久化
		false,        // 自动删除
		false,        // 排他性
		false,        // 无等待
		nil,          // 参数
	)
	failOnError(err, "Failed to declare a queue")
	// 发送 10 条消息,奇数消息模拟为重任务,偶数消息模拟为轻任务
	for i := 1; i <= 10; i++ {
		body := "Task " + strconv.Itoa(i)
		if i%2 == 1 {
			body += " (Heavy)"
		} else {
			body += " (Light)"
		}

		err = ch.Publish(
			"",     // 交换机
			q.Name, // 路由键
			false,  // 强制
			false,  // 立即
			amqp.Publishing{
				DeliveryMode: amqp.Persistent, // 持久化消息
				ContentType:  "text/plain",
				Body:         []byte(body),
			},
		)
		failOnError(err, "Failed to publish a message")
		log.Printf(" [x] Sent %s", body)
	}
}

worker.go

package main

import (
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	err = ch.Qos(1, 0, false)
	failOnError(err, "Failed to set QoS")
	// 声明队列
	q, err := ch.QueueDeclare(
		"task_queue", // 队列名称
		true,         // 持久化
		false,        // 自动删除
		false,        // 排他性
		false,        // 无等待
		nil,          // 参数
	)
	failOnError(err, "Failed to declare a queue")

	// 接收消息
	msgs, err := ch.Consume(
		q.Name, // 队列名称
		"",     // 消费者标签
		false,  // 自动确认
		false,  // 排他性
		false,  // 无本地
		false,  // 无等待
		nil,    // 参数
	)
	failOnError(err, "Failed to register a consumer")

	// 处理消息
	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
			// 模拟任务处理
			time.Sleep(time.Second)
			// 手动确认
			d.Ack(false)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}


疑问

开启两个worker后,他们会轮流消费消息,为什么?

是的,这是 RabbitMQ 的默认行为!当你有多个消费者(Worker)订阅同一个队列时,RabbitMQ 会以 轮询(Round-robin) 的方式将消息分发给这些消费者。也就是说,消息会依次分发给每个消费者,从而实现负载均衡。


1. 为什么会出现轮流消费的现象?

RabbitMQ 的设计目标之一是公平分发消息。默认情况下,它会将消息依次分发给所有消费者,确保每个消费者都能平等地处理任务。例如:

  • 如果有两个消费者(Worker1 和 Worker2),消息会依次分发给 Worker1、Worker2、Worker1、Worker2……
  • 这种机制适用于大多数场景,尤其是在任务处理时间相近的情况下。

2. 如何验证这种现象?

你可以运行以下步骤来验证:

  1. 启动两个消费者(Worker):
    go run worker.go
    go run worker.go
    
  2. 启动生产者并发送多条消息:
    go run producer.go Message 1
    go run producer.go Message 2
    go run producer.go Message 3
    go run producer.go Message 4
    
  3. 观察消费者的输出:
    • Worker1 会处理 Message 1 和 Message 3。
    • Worker2 会处理 Message 2 和 Message 4。

3. 如何避免消息分发不均?

默认的轮询分发方式在某些场景下可能会导致问题,例如:

  • 如果某些消息处理时间较长,而其他消息处理时间较短,可能会导致某些消费者忙碌,而其他消费者空闲。

为了解决这个问题,可以使用 公平分发(Fair Dispatch) 机制,确保 RabbitMQ 不会一次性将大量消息分发给同一个消费者。具体实现如下:

设置公平分发

在消费者代码中,调用 Qos 方法设置预取计数(Prefetch Count):

err = ch.Qos(
    1,     // 预取计数(每次只分发一条消息)
    0,     // 预取大小(不限制消息大小)
    false, // 全局设置(false 表示仅对当前消费者生效)
)
failOnError(err, "Failed to set QoS")

作用

  • Qos(1, 0, false) 表示 RabbitMQ 每次只会向消费者分发一条消息,直到消费者处理完当前消息并确认后,才会分发下一条消息。
  • 这样可以确保消息分发更加公平,避免某些消费者过载。

4. 完整示例代码(公平分发)

以下是启用公平分发的消费者代码:

package main

import (
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	// 连接到 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明队列
	q, err := ch.QueueDeclare(
		"task_queue", // 队列名称
		true,        // 持久化
		false,       // 自动删除
		false,       // 排他性
		false,       // 无等待
		nil,         // 参数
	)
	failOnError(err, "Failed to declare a queue")

	// 设置公平分发
	err = ch.Qos(
		1,     // 预取计数
		0,     // 预取大小
		false, // 全局设置
	)
	failOnError(err, "Failed to set QoS")

	// 接收消息
	msgs, err := ch.Consume(
		q.Name, // 队列名称
		"",     // 消费者标签
		false,  // 自动确认(设置为 false,启用手动确认)
		false,  // 排他性
		false,  // 无本地
		false,  // 无等待
		nil,    // 参数
	)
	failOnError(err, "Failed to register a consumer")

	// 处理消息
	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
			// 模拟任务处理
			time.Sleep(time.Second)
			log.Printf("Done")
			d.Ack(false) // 手动确认消息
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

5. 公平分发的作用

  • 每个消费者一次只处理一条消息,直到确认完成后才会接收下一条消息。
  • 确保消息分发更加公平,避免某些消费者过载。

6. 总结

  • RabbitMQ 默认以轮询的方式将消息分发给多个消费者。
  • 如果需要更公平的分发机制,可以通过 Qos 方法设置预取计数,确保每个消费者一次只处理一条消息。
  • 公平分发适用于任务处理时间不均匀的场景,避免某些消费者过载。