在 RabbitMQ 中,PublishWithContext 方法用于将消息发送到指定的交换机或队列。以下是 PublishWithContext 方法的参数详细说明:


方法签名

func (ch *Channel) PublishWithContext(
    ctx context.Context,         // 上下文,用于控制超时或取消
    exchange string,           // 交换机名称
    key string,                // 路由键
    mandatory bool,            // 是否强制
    immediate bool,            // 是否立即
    msg Publishing,            // 消息内容
) error

参数详解

  1. ctx context.Context

    • 上下文,用于控制方法的超时或取消。
    • 如果上下文超时或被取消,方法会立即返回错误。
    • 例如,可以使用 context.WithTimeout 设置超时时间。
  2. exchange string

    • 交换机的名称。
    • 如果为空字符串 "",表示使用默认交换机。
    • 默认交换机会将消息路由到与 key 匹配的队列。
  3. key string

    • 路由键,用于决定消息的路由规则。
    • 如果使用默认交换机,key 必须是队列的名称。
    • 如果使用其他交换机,key 需要与交换机的绑定规则匹配。
  4. mandatory bool

    • 是否强制。
    • 如果设置为 true,当消息无法路由到任何队列时,RabbitMQ 会返回一个 basic.return 消息。
    • 如果设置为 false,无法路由的消息会被直接丢弃。
  5. immediate bool

    • 是否立即。
    • 如果设置为 true,当消息无法立即被消费者接收时,RabbitMQ 会返回一个 basic.return 消息。
    • 如果设置为 false,消息会被存储在队列中,等待消费者接收。
    • 注意:RabbitMQ 3.0 及以上版本已弃用此参数,设置为 false 即可。
  6. msg Publishing

    • 消息内容,是一个 amqp.Publishing 结构体,包含以下字段:
      • ContentType string:消息的内容类型(如 "text/plain")。
      • Body []byte:消息的实际内容(字节数组)。
      • 其他可选字段:
        • DeliveryMode uint8:消息的持久化模式(2 表示持久化,1 表示非持久化)。
        • Headers Table:消息的头部信息。
        • Priority uint8:消息的优先级。
        • CorrelationId string:关联 ID。
        • ReplyTo string:回复队列名称。
        • Expiration string:消息的过期时间。
        • MessageId string:消息的唯一 ID。
        • Timestamp time.Time:消息的时间戳。
        • Type string:消息的类型。
        • UserId string:用户 ID。
        • AppId string:应用程序 ID。

示例代码

以下是一个完整的示例,展示了如何使用 PublishWithContext 发送消息:

package main

import (
	"context"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// 连接 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 创建 Channel
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明队列
	q, err := ch.QueueDeclare(
		"hello", // 队列名称
		false,   // 是否持久化
		false,   // 是否自动删除
		false,   // 是否独占
		false,   // 是否非阻塞
		nil,     // 额外参数
	)
	failOnError(err, "Failed to declare a queue")

	// 创建带有超时的 Context
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// 发送消息
	body := "Hello, RabbitMQ!"
	err = ch.PublishWithContext(ctx,
		"",     // 使用默认交换机
		q.Name, // 路由键(队列名称)
		false,  // 不强制
		false,  // 不立即
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		},
	)
	failOnError(err, "Failed to publish a message")
	log.Printf("[x] Sent %s\n", body)
}

参数组合示例

  1. 发送到默认交换机

    err = ch.PublishWithContext(ctx,
        "",     // 默认交换机
        "hello", // 路由键(队列名称)
        false,  // 不强制
        false,  // 不立即
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte("Hello, RabbitMQ!"),
        },
    )
    
  2. 发送到指定交换机

    err = ch.PublishWithContext(ctx,
        "my_exchange", // 交换机名称
        "routing_key", // 路由键
        false,         // 不强制
        false,         // 不立即
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte("Hello, RabbitMQ!"),
        },
    )
    
  3. 发送持久化消息

    err = ch.PublishWithContext(ctx,
        "",     // 默认交换机
        "hello", // 路由键(队列名称)
        false,  // 不强制
        false,  // 不立即
        amqp.Publishing{
            ContentType: "text/plain",
            DeliveryMode: amqp.Persistent, // 持久化消息
            Body:        []byte("Hello, RabbitMQ!"),
        },
    )
    

总结

  • exchange:指定交换机名称,默认交换机为空字符串 ""
  • key:路由键,用于决定消息的路由规则。
  • mandatory:是否强制,设置为 true 时,无法路由的消息会返回错误。
  • immediate:是否立即,RabbitMQ 3.0 及以上版本已弃用,建议设置为 false
  • msg:消息内容,包含 BodyContentType 等字段。