

Send
package main
import (
"context"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建 Channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"hello", // 队列名称
false, // 是否持久化
false, // 是否自动删除
false, // 是否独占
false, // 是否非阻塞
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")
// 创建带有超时的 Context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 发送消息
body := "Hello, RabbitMQ!"
err = ch.PublishWithContext(ctx,
"", // 使用默认交换机
q.Name, // 路由键(队列名称)
false, // 不强制
false, // 不立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
},
)
failOnError(err, "Failed to publish a message")
log.Printf("[x] Sent %s\n", body)
}
Receiver
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建 Channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"hello", // 队列名称
false, // 是否持久化
false, // 是否自动删除
false, // 是否独占
false, // 是否非阻塞
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")
// 消费消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者名称(空字符串表示自动生成)
true, // 是否自动确认消息
false, // 是否独占
false, // 是否阻塞
false, // 是否非阻塞
nil, // 额外参数
)
failOnError(err, "Failed to register a consumer")
// 循环消费消息
go func() {
for msg := range msgs {
log.Printf("Received a message: %s", msg.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
select {}
}