package rabbitmq
import (
"context"
"fmt"
"log"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type ExchangeTypes = string
var (
ExchangeTypeDirect ExchangeTypes = "direct"
ExchangeTypeFanout ExchangeTypes = "fanout"
ExchangeTypeTopic ExchangeTypes = "topic"
ExchangeTypeHeaders ExchangeTypes = "headers"
)
// ConnectionConfiguration 包含连接 RabbitMQ 服务器所需的配置信息
type ConnectionConfiguration struct {
IP string // RabbitMQ 服务器的 IP 地址
Port string // RabbitMQ 服务器的端口号
Username string // 用于连接 RabbitMQ 服务器的用户名
Password string // 用于连接 RabbitMQ 服务器的密码
}
// ExchangeConfiguration 包含声明交换机所需的配置信息
type ExchangeConfiguration struct {
Name string // 交换机名称
ExchangeType ExchangeTypes // 交换机类型(direct、fanout、topic、headers 等)
Durable bool // 是否持久化
AutoDelete bool // 是否自动删除
Internal bool // 是否为内部交换机
NoWait bool // 是否不等待服务器响应
Args amqp.Table // 其他参数
}
// QueueConfiguration 包含声明队列所需的配置信息
type QueueConfiguration struct {
Name string // 队列名称
Durable bool // 是否持久化
AutoDelete bool // 是否自动删除
Exclusive bool // 是否独占
NoWait bool // 是否不等待服务器响应
Args amqp.Table // 其他参数
}
// Config 整合了连接、交换机、队列等相关配置信息
type Config struct {
ConnectionConfiguration ConnectionConfiguration // 连接配置
ExchangeConfiguration ExchangeConfiguration // 交换机配置
QueueConfiguration QueueConfiguration // 队列配置
RoutingKey string // 路由键
ReconnectDelay time.Duration // 重连间隔时间
PrefetchCount int // 消费者预取数
AutoAck bool // 是否自动确认
}
// RabbitMQ 结构体包含了与 RabbitMQ 交互所需的配置、连接、通道及内部状态
type RabbitMQ struct {
config *Config
conn *amqp.Connection
channel *amqp.Channel
done chan struct{}
mu sync.Mutex // 用于保护 conn、channel 等共享资源
closeOnce sync.Once // 确保 Close() 只执行一次
reconnectWG sync.WaitGroup
}
// NewRabbitMQ 创建一个新的 RabbitMQ 实例
func NewRabbitMQ(cfg *Config) *RabbitMQ {
if cfg.ReconnectDelay == 0 {
cfg.ReconnectDelay = 5 * time.Second
}
// 如果预取数小于 0,则设为 0
if cfg.PrefetchCount < 0 {
cfg.PrefetchCount = 0
}
return &RabbitMQ{
config: cfg,
done: make(chan struct{}),
}
}
// buildURL 构建连接 RabbitMQ 服务器的 URL
func (r *RabbitMQ) buildURL() string {
return fmt.Sprintf("amqp://%s:%s@%s:%s/",
r.config.ConnectionConfiguration.Username,
r.config.ConnectionConfiguration.Password,
r.config.ConnectionConfiguration.IP,
r.config.ConnectionConfiguration.Port)
}
// closeResources 关闭与 RabbitMQ 相关的资源
func (r *RabbitMQ) closeResources() {
if r.channel != nil {
if err := r.channel.Close(); err != nil {
log.Printf("关闭通道失败: %v", err)
}
}
if r.conn != nil {
if err := r.conn.Close(); err != nil {
log.Printf("关闭连接失败: %v", err)
}
}
}
// setupExchangeAndQueue 声明交换机、队列以及队列绑定,并设置 QoS
func (r *RabbitMQ) setupExchangeAndQueue(ch *amqp.Channel) error {
if r.config.ExchangeConfiguration.Name != "" {
err := ch.ExchangeDeclare(
r.config.ExchangeConfiguration.Name,
r.config.ExchangeConfiguration.ExchangeType,
r.config.ExchangeConfiguration.Durable,
r.config.ExchangeConfiguration.AutoDelete,
r.config.ExchangeConfiguration.Internal,
r.config.ExchangeConfiguration.NoWait,
r.config.ExchangeConfiguration.Args,
)
if err != nil {
return fmt.Errorf("声明交换机 %s 失败: %w", r.config.ExchangeConfiguration.Name, err)
}
}
if r.config.QueueConfiguration.Name != "" {
q, err := ch.QueueDeclare(
r.config.QueueConfiguration.Name,
r.config.QueueConfiguration.Durable,
r.config.QueueConfiguration.AutoDelete,
r.config.QueueConfiguration.Exclusive,
r.config.QueueConfiguration.NoWait,
r.config.QueueConfiguration.Args,
)
if err != nil {
return fmt.Errorf("声明队列 %s 失败: %w", r.config.QueueConfiguration.Name, err)
}
if r.config.ExchangeConfiguration.Name != "" {
err = ch.QueueBind(
q.Name,
r.config.RoutingKey,
r.config.ExchangeConfiguration.Name,
false,
nil,
)
if err != nil {
return fmt.Errorf("队列绑定失败,队列 %s, 路由键 %s: %w", q.Name, r.config.RoutingKey, err)
}
}
}
if !r.config.AutoAck && r.config.PrefetchCount > 0 {
err := ch.Qos(
r.config.PrefetchCount,
0,
false,
)
if err != nil {
return fmt.Errorf("设置 QoS 失败: %w", err)
}
}
return nil
}
// Connect 连接到 RabbitMQ 服务器,并初始化交换机、队列等设置。
// 成功后会启动后台 goroutine 监控连接断开,自动重连。
func (r *RabbitMQ) Connect() error {
r.mu.Lock()
defer r.mu.Unlock()
url := r.buildURL()
conn, err := amqp.Dial(url)
if err != nil {
return fmt.Errorf("连接 RabbitMQ 失败: %w", err)
}
ch, err := conn.Channel()
if err != nil {
conn.Close()
return fmt.Errorf("打开通道失败: %w", err)
}
if err = r.setupExchangeAndQueue(ch); err != nil {
ch.Close()
conn.Close()
return err
}
r.conn = conn
r.channel = ch
log.Println("RabbitMQ 连接建立成功")
// 启动连接监控,自动重连
go r.handleReconnect()
return nil
}
// handleReconnect 监控连接断开,并尝试自动重连
func (r *RabbitMQ) handleReconnect() {
r.reconnectWG.Add(1)
defer r.reconnectWG.Done()
// 监听连接关闭事件
closeChan := r.conn.NotifyClose(make(chan *amqp.Error))
err, ok := <-closeChan
if !ok {
// 如果 closeChan 被正常关闭,则退出
log.Println("RabbitMQ 连接正常关闭")
return
}
log.Printf("RabbitMQ 连接断开: %v", err)
// 尝试重连
for {
select {
case <-r.done:
log.Println("接收到关闭信号,不再重连")
return
default:
log.Println("尝试重新连接 RabbitMQ...")
r.mu.Lock()
conn, errDial := amqp.Dial(r.buildURL())
if errDial != nil {
r.mu.Unlock()
log.Printf("重连失败 (dial): %v", errDial)
time.Sleep(r.config.ReconnectDelay)
continue
}
ch, errCh := conn.Channel()
if errCh != nil {
conn.Close()
r.mu.Unlock()
log.Printf("重连失败 (打开通道): %v", errCh)
time.Sleep(r.config.ReconnectDelay)
continue
}
if errSetup := r.setupExchangeAndQueue(ch); errSetup != nil {
ch.Close()
conn.Close()
r.mu.Unlock()
log.Printf("重连失败 (设置交换机/队列): %v", errSetup)
time.Sleep(r.config.ReconnectDelay)
continue
}
r.conn = conn
r.channel = ch
r.mu.Unlock()
log.Println("RabbitMQ 重连成功")
// 重连成功后重新启动连接监控
go r.handleReconnect()
return
}
}
}
// Publish 向指定的交换机发送消息,支持并发调用。
func (r *RabbitMQ) Publish(message string) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.channel == nil {
return fmt.Errorf("通道未打开")
}
err := r.channel.Publish(
r.config.ExchangeConfiguration.Name,
r.config.RoutingKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
},
)
if err != nil {
return fmt.Errorf("发布消息失败: %w", err)
}
return nil
}
// Consume 从指定队列消费消息,并返回消息通道,支持通过 context 取消消费。
func (r *RabbitMQ) Consume(ctx context.Context) (<-chan amqp.Delivery, error) {
r.mu.Lock()
defer r.mu.Unlock()
if r.channel == nil {
return nil, fmt.Errorf("通道未打开")
}
msgs, err := r.channel.Consume(
r.config.QueueConfiguration.Name,
"",
r.config.AutoAck,
false,
false,
false,
nil,
)
if err != nil {
return nil, fmt.Errorf("启动消费失败: %w", err)
}
out := make(chan amqp.Delivery)
// 开启一个 goroutine,通过 context 管理消费生命周期
go func() {
defer close(out)
for {
select {
case d, ok := <-msgs:
if !ok {
return
}
out <- d
case <-ctx.Done():
return
case <-r.done:
return
}
}
}()
return out, nil
}
// Ack 手动确认消息已处理
func (r *RabbitMQ) Ack(delivery amqp.Delivery) error {
return delivery.Ack(false)
}
// Nack 手动拒绝消息,可以选择是否重新入队
func (r *RabbitMQ) Nack(delivery amqp.Delivery) error {
return delivery.Nack(false, true)
}
// Close 关闭与 RabbitMQ 的连接、通道,并停止自动重连
func (r *RabbitMQ) Close() {
r.closeOnce.Do(func() {
close(r.done)
r.mu.Lock()
defer r.mu.Unlock()
r.closeResources()
// 等待重连监控 goroutine 退出
r.reconnectWG.Wait()
log.Println("RabbitMQ 连接已关闭")
})
}
example
producer
package main
import (
"log"
"rabbitmq_learn/rabbitmq"
"time"
)
func main() {
// 配置RabbitMQ连接信息
config := &rabbitmq.Config{
ConnectionConfiguration: rabbitmq.ConnectionConfiguration{
IP: "192.168.3.242",
Port: "5672",
Username: "guest",
Password: "123456",
},
ExchangeConfiguration: rabbitmq.ExchangeConfiguration{
Name: "logs_direct",
ExchangeType: rabbitmq.ExchangeTypeDirect,
Durable: true,
AutoDelete: false,
Internal: false,
NoWait: false,
Args: nil,
},
QueueConfiguration: rabbitmq.QueueConfiguration{
Name: "my_queue",
Durable: true,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
},
RoutingKey: "my_routing_key",
ReconnectDelay: 5 * time.Second,
PrefetchCount: 10,
AutoAck: false,
}
// 创建RabbitMQ实例
rabbit := rabbitmq.NewRabbitMQ(config)
// 连接到RabbitMQ
err := rabbit.Connect()
if err != nil {
log.Fatalf("连接RabbitMQ失败: %v", err)
}
defer rabbit.Close()
// 发送消息
message := "Hello, RabbitMQ!"
err = rabbit.Publish(message)
if err != nil {
log.Fatalf("发布消息失败: %v", err)
}
log.Println("消息已成功发送")
}
consumer.go
package main
import (
"context"
"log"
"rabbitmq_learn/rabbitmq"
"time"
)
func main() {
// 配置RabbitMQ连接信息
config := &rabbitmq.Config{
ConnectionConfiguration: rabbitmq.ConnectionConfiguration{
IP: "192.168.3.242",
Port: "5672",
Username: "guest",
Password: "123456",
},
ExchangeConfiguration: rabbitmq.ExchangeConfiguration{
Name: "logs_direct",
ExchangeType: rabbitmq.ExchangeTypeDirect,
Durable: true,
AutoDelete: false,
Internal: false,
NoWait: false,
Args: nil,
},
QueueConfiguration: rabbitmq.QueueConfiguration{
Name: "my_queue",
Durable: true,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
},
RoutingKey: "my_routing_key",
ReconnectDelay: 5 * time.Second,
PrefetchCount: 10,
AutoAck: false,
}
// 创建RabbitMQ实例
rabbit := rabbitmq.NewRabbitMQ(config)
// 连接到RabbitMQ
err := rabbit.Connect()
if err != nil {
log.Fatalf("连接RabbitMQ失败: %v", err)
}
defer rabbit.Close()
// 消费消息
msgs, err := rabbit.Consume(context.Background())
if err != nil {
log.Fatalf("消费消息失败: %v", err)
}
go func() {
for d := range msgs {
log.Printf("接收到消息: %s", d.Body)
err := rabbit.Ack(d)
if err != nil {
log.Printf("确认消息失败: %v", err)
}
}
}()
select {}
}