在 RabbitMQ 中,PublishWithContext
方法用于将消息发送到指定的交换机或队列。以下是 PublishWithContext
方法的参数详细说明:
方法签名
func (ch *Channel) PublishWithContext(
ctx context.Context, // 上下文,用于控制超时或取消
exchange string, // 交换机名称
key string, // 路由键
mandatory bool, // 是否强制
immediate bool, // 是否立即
msg Publishing, // 消息内容
) error
参数详解
-
ctx context.Context
- 上下文,用于控制方法的超时或取消。
- 如果上下文超时或被取消,方法会立即返回错误。
- 例如,可以使用
context.WithTimeout
设置超时时间。
-
exchange string
- 交换机的名称。
- 如果为空字符串
""
,表示使用默认交换机。 - 默认交换机会将消息路由到与
key
匹配的队列。
-
key string
- 路由键,用于决定消息的路由规则。
- 如果使用默认交换机,
key
必须是队列的名称。 - 如果使用其他交换机,
key
需要与交换机的绑定规则匹配。
-
mandatory bool
- 是否强制。
- 如果设置为
true
,当消息无法路由到任何队列时,RabbitMQ 会返回一个basic.return
消息。 - 如果设置为
false
,无法路由的消息会被直接丢弃。
-
immediate bool
- 是否立即。
- 如果设置为
true
,当消息无法立即被消费者接收时,RabbitMQ 会返回一个basic.return
消息。 - 如果设置为
false
,消息会被存储在队列中,等待消费者接收。 - 注意:RabbitMQ 3.0 及以上版本已弃用此参数,设置为
false
即可。
-
msg Publishing
- 消息内容,是一个
amqp.Publishing
结构体,包含以下字段:ContentType string
:消息的内容类型(如"text/plain"
)。Body []byte
:消息的实际内容(字节数组)。- 其他可选字段:
DeliveryMode uint8
:消息的持久化模式(2
表示持久化,1
表示非持久化)。Headers Table
:消息的头部信息。Priority uint8
:消息的优先级。CorrelationId string
:关联 ID。ReplyTo string
:回复队列名称。Expiration string
:消息的过期时间。MessageId string
:消息的唯一 ID。Timestamp time.Time
:消息的时间戳。Type string
:消息的类型。UserId string
:用户 ID。AppId string
:应用程序 ID。
- 消息内容,是一个
示例代码
以下是一个完整的示例,展示了如何使用 PublishWithContext
发送消息:
package main
import (
"context"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建 Channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"hello", // 队列名称
false, // 是否持久化
false, // 是否自动删除
false, // 是否独占
false, // 是否非阻塞
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")
// 创建带有超时的 Context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 发送消息
body := "Hello, RabbitMQ!"
err = ch.PublishWithContext(ctx,
"", // 使用默认交换机
q.Name, // 路由键(队列名称)
false, // 不强制
false, // 不立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
},
)
failOnError(err, "Failed to publish a message")
log.Printf("[x] Sent %s\n", body)
}
参数组合示例
-
发送到默认交换机
err = ch.PublishWithContext(ctx, "", // 默认交换机 "hello", // 路由键(队列名称) false, // 不强制 false, // 不立即 amqp.Publishing{ ContentType: "text/plain", Body: []byte("Hello, RabbitMQ!"), }, )
-
发送到指定交换机
err = ch.PublishWithContext(ctx, "my_exchange", // 交换机名称 "routing_key", // 路由键 false, // 不强制 false, // 不立即 amqp.Publishing{ ContentType: "text/plain", Body: []byte("Hello, RabbitMQ!"), }, )
-
发送持久化消息
err = ch.PublishWithContext(ctx, "", // 默认交换机 "hello", // 路由键(队列名称) false, // 不强制 false, // 不立即 amqp.Publishing{ ContentType: "text/plain", DeliveryMode: amqp.Persistent, // 持久化消息 Body: []byte("Hello, RabbitMQ!"), }, )
总结
exchange
:指定交换机名称,默认交换机为空字符串""
。key
:路由键,用于决定消息的路由规则。mandatory
:是否强制,设置为true
时,无法路由的消息会返回错误。immediate
:是否立即,RabbitMQ 3.0 及以上版本已弃用,建议设置为false
。msg
:消息内容,包含Body
、ContentType
等字段。