package rabbitmq

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

type ExchangeTypes = string

var (
	ExchangeTypeDirect  ExchangeTypes = "direct"
	ExchangeTypeFanout  ExchangeTypes = "fanout"
	ExchangeTypeTopic   ExchangeTypes = "topic"
	ExchangeTypeHeaders ExchangeTypes = "headers"
)

// ConnectionConfiguration 包含连接 RabbitMQ 服务器所需的配置信息
type ConnectionConfiguration struct {
	IP       string // RabbitMQ 服务器的 IP 地址
	Port     string // RabbitMQ 服务器的端口号
	Username string // 用于连接 RabbitMQ 服务器的用户名
	Password string // 用于连接 RabbitMQ 服务器的密码
}

// ExchangeConfiguration 包含声明交换机所需的配置信息
type ExchangeConfiguration struct {
	Name         string        // 交换机名称
	ExchangeType ExchangeTypes // 交换机类型(direct、fanout、topic、headers 等)
	Durable      bool          // 是否持久化
	AutoDelete   bool          // 是否自动删除
	Internal     bool          // 是否为内部交换机
	NoWait       bool          // 是否不等待服务器响应
	Args         amqp.Table    // 其他参数
}

// QueueConfiguration 包含声明队列所需的配置信息
type QueueConfiguration struct {
	Name       string     // 队列名称
	Durable    bool       // 是否持久化
	AutoDelete bool       // 是否自动删除
	Exclusive  bool       // 是否独占
	NoWait     bool       // 是否不等待服务器响应
	Args       amqp.Table // 其他参数
}

// Config 整合了连接、交换机、队列等相关配置信息
type Config struct {
	ConnectionConfiguration ConnectionConfiguration // 连接配置
	ExchangeConfiguration   ExchangeConfiguration   // 交换机配置
	QueueConfiguration      QueueConfiguration      // 队列配置
	RoutingKey              string                  // 路由键
	ReconnectDelay          time.Duration           // 重连间隔时间
	PrefetchCount           int                     // 消费者预取数
	AutoAck                 bool                    // 是否自动确认
}

// RabbitMQ 结构体包含了与 RabbitMQ 交互所需的配置、连接、通道及内部状态
type RabbitMQ struct {
	config      *Config
	conn        *amqp.Connection
	channel     *amqp.Channel
	done        chan struct{}
	mu          sync.Mutex // 用于保护 conn、channel 等共享资源
	closeOnce   sync.Once  // 确保 Close() 只执行一次
	reconnectWG sync.WaitGroup
}

// NewRabbitMQ 创建一个新的 RabbitMQ 实例
func NewRabbitMQ(cfg *Config) *RabbitMQ {
	if cfg.ReconnectDelay == 0 {
		cfg.ReconnectDelay = 5 * time.Second
	}
	// 如果预取数小于 0,则设为 0
	if cfg.PrefetchCount < 0 {
		cfg.PrefetchCount = 0
	}
	return &RabbitMQ{
		config: cfg,
		done:   make(chan struct{}),
	}
}

// buildURL 构建连接 RabbitMQ 服务器的 URL
func (r *RabbitMQ) buildURL() string {
	return fmt.Sprintf("amqp://%s:%s@%s:%s/",
		r.config.ConnectionConfiguration.Username,
		r.config.ConnectionConfiguration.Password,
		r.config.ConnectionConfiguration.IP,
		r.config.ConnectionConfiguration.Port)
}

// closeResources 关闭与 RabbitMQ 相关的资源
func (r *RabbitMQ) closeResources() {
	if r.channel != nil {
		if err := r.channel.Close(); err != nil {
			log.Printf("关闭通道失败: %v", err)
		}
	}
	if r.conn != nil {
		if err := r.conn.Close(); err != nil {
			log.Printf("关闭连接失败: %v", err)
		}
	}
}

// setupExchangeAndQueue 声明交换机、队列以及队列绑定,并设置 QoS
func (r *RabbitMQ) setupExchangeAndQueue(ch *amqp.Channel) error {
	if r.config.ExchangeConfiguration.Name != "" {
		err := ch.ExchangeDeclare(
			r.config.ExchangeConfiguration.Name,
			r.config.ExchangeConfiguration.ExchangeType,
			r.config.ExchangeConfiguration.Durable,
			r.config.ExchangeConfiguration.AutoDelete,
			r.config.ExchangeConfiguration.Internal,
			r.config.ExchangeConfiguration.NoWait,
			r.config.ExchangeConfiguration.Args,
		)
		if err != nil {
			return fmt.Errorf("声明交换机 %s 失败: %w", r.config.ExchangeConfiguration.Name, err)
		}
	}
	if r.config.QueueConfiguration.Name != "" {
		q, err := ch.QueueDeclare(
			r.config.QueueConfiguration.Name,
			r.config.QueueConfiguration.Durable,
			r.config.QueueConfiguration.AutoDelete,
			r.config.QueueConfiguration.Exclusive,
			r.config.QueueConfiguration.NoWait,
			r.config.QueueConfiguration.Args,
		)
		if err != nil {
			return fmt.Errorf("声明队列 %s 失败: %w", r.config.QueueConfiguration.Name, err)
		}
		if r.config.ExchangeConfiguration.Name != "" {
			err = ch.QueueBind(
				q.Name,
				r.config.RoutingKey,
				r.config.ExchangeConfiguration.Name,
				false,
				nil,
			)
			if err != nil {
				return fmt.Errorf("队列绑定失败,队列 %s, 路由键 %s: %w", q.Name, r.config.RoutingKey, err)
			}
		}
	}
	if !r.config.AutoAck && r.config.PrefetchCount > 0 {
		err := ch.Qos(
			r.config.PrefetchCount,
			0,
			false,
		)
		if err != nil {
			return fmt.Errorf("设置 QoS 失败: %w", err)
		}
	}
	return nil
}

// Connect 连接到 RabbitMQ 服务器,并初始化交换机、队列等设置。
// 成功后会启动后台 goroutine 监控连接断开,自动重连。
func (r *RabbitMQ) Connect() error {
	r.mu.Lock()
	defer r.mu.Unlock()

	url := r.buildURL()
	conn, err := amqp.Dial(url)
	if err != nil {
		return fmt.Errorf("连接 RabbitMQ 失败: %w", err)
	}

	ch, err := conn.Channel()
	if err != nil {
		conn.Close()
		return fmt.Errorf("打开通道失败: %w", err)
	}

	if err = r.setupExchangeAndQueue(ch); err != nil {
		ch.Close()
		conn.Close()
		return err
	}

	r.conn = conn
	r.channel = ch
	log.Println("RabbitMQ 连接建立成功")

	// 启动连接监控,自动重连
	go r.handleReconnect()
	return nil
}

// handleReconnect 监控连接断开,并尝试自动重连
func (r *RabbitMQ) handleReconnect() {
	r.reconnectWG.Add(1)
	defer r.reconnectWG.Done()

	// 监听连接关闭事件
	closeChan := r.conn.NotifyClose(make(chan *amqp.Error))
	err, ok := <-closeChan
	if !ok {
		// 如果 closeChan 被正常关闭,则退出
		log.Println("RabbitMQ 连接正常关闭")
		return
	}
	log.Printf("RabbitMQ 连接断开: %v", err)

	// 尝试重连
	for {
		select {
		case <-r.done:
			log.Println("接收到关闭信号,不再重连")
			return
		default:
			log.Println("尝试重新连接 RabbitMQ...")
			r.mu.Lock()
			conn, errDial := amqp.Dial(r.buildURL())
			if errDial != nil {
				r.mu.Unlock()
				log.Printf("重连失败 (dial): %v", errDial)
				time.Sleep(r.config.ReconnectDelay)
				continue
			}
			ch, errCh := conn.Channel()
			if errCh != nil {
				conn.Close()
				r.mu.Unlock()
				log.Printf("重连失败 (打开通道): %v", errCh)
				time.Sleep(r.config.ReconnectDelay)
				continue
			}
			if errSetup := r.setupExchangeAndQueue(ch); errSetup != nil {
				ch.Close()
				conn.Close()
				r.mu.Unlock()
				log.Printf("重连失败 (设置交换机/队列): %v", errSetup)
				time.Sleep(r.config.ReconnectDelay)
				continue
			}
			r.conn = conn
			r.channel = ch
			r.mu.Unlock()
			log.Println("RabbitMQ 重连成功")
			// 重连成功后重新启动连接监控
			go r.handleReconnect()
			return
		}
	}
}

// Publish 向指定的交换机发送消息,支持并发调用。
func (r *RabbitMQ) Publish(message string) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.channel == nil {
		return fmt.Errorf("通道未打开")
	}
	err := r.channel.Publish(
		r.config.ExchangeConfiguration.Name,
		r.config.RoutingKey,
		false,
		false,
		amqp.Publishing{
			ContentType:  "text/plain",
			Body:         []byte(message),
			DeliveryMode: amqp.Persistent,
			Timestamp:    time.Now(),
		},
	)
	if err != nil {
		return fmt.Errorf("发布消息失败: %w", err)
	}
	return nil
}

// Consume 从指定队列消费消息,并返回消息通道,支持通过 context 取消消费。
func (r *RabbitMQ) Consume(ctx context.Context) (<-chan amqp.Delivery, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.channel == nil {
		return nil, fmt.Errorf("通道未打开")
	}
	msgs, err := r.channel.Consume(
		r.config.QueueConfiguration.Name,
		"",
		r.config.AutoAck,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return nil, fmt.Errorf("启动消费失败: %w", err)
	}

	out := make(chan amqp.Delivery)
	// 开启一个 goroutine,通过 context 管理消费生命周期
	go func() {
		defer close(out)
		for {
			select {
			case d, ok := <-msgs:
				if !ok {
					return
				}
				out <- d
			case <-ctx.Done():
				return
			case <-r.done:
				return
			}
		}
	}()
	return out, nil
}

// Ack 手动确认消息已处理
func (r *RabbitMQ) Ack(delivery amqp.Delivery) error {
	return delivery.Ack(false)
}

// Nack 手动拒绝消息,可以选择是否重新入队
func (r *RabbitMQ) Nack(delivery amqp.Delivery) error {
	return delivery.Nack(false, true)
}

// Close 关闭与 RabbitMQ 的连接、通道,并停止自动重连
func (r *RabbitMQ) Close() {
	r.closeOnce.Do(func() {
		close(r.done)
		r.mu.Lock()
		defer r.mu.Unlock()
		r.closeResources()
		// 等待重连监控 goroutine 退出
		r.reconnectWG.Wait()
		log.Println("RabbitMQ 连接已关闭")
	})
}

example


producer

package main

import (
	"log"
	"rabbitmq_learn/rabbitmq"
	"time"
)

func main() {
	// 配置RabbitMQ连接信息
	config := &rabbitmq.Config{
		ConnectionConfiguration: rabbitmq.ConnectionConfiguration{
			IP:       "192.168.3.242",
			Port:     "5672",
			Username: "guest",
			Password: "123456",
		},
		ExchangeConfiguration: rabbitmq.ExchangeConfiguration{
			Name:         "logs_direct",
			ExchangeType: rabbitmq.ExchangeTypeDirect,
			Durable:      true,
			AutoDelete:   false,
			Internal:     false,
			NoWait:       false,
			Args:         nil,
		},
		QueueConfiguration: rabbitmq.QueueConfiguration{
			Name:       "my_queue",
			Durable:    true,
			AutoDelete: false,
			Exclusive:  false,
			NoWait:     false,
			Args:       nil,
		},
		RoutingKey:     "my_routing_key",
		ReconnectDelay: 5 * time.Second,
		PrefetchCount:  10,
		AutoAck:        false,
	}

	// 创建RabbitMQ实例
	rabbit := rabbitmq.NewRabbitMQ(config)

	// 连接到RabbitMQ
	err := rabbit.Connect()
	if err != nil {
		log.Fatalf("连接RabbitMQ失败: %v", err)
	}
	defer rabbit.Close()

	// 发送消息
	message := "Hello, RabbitMQ!"
	err = rabbit.Publish(message)
	if err != nil {
		log.Fatalf("发布消息失败: %v", err)
	}

	log.Println("消息已成功发送")
}

consumer.go

package main

import (
	"context"
	"log"
	"rabbitmq_learn/rabbitmq"
	"time"
)

func main() {
	// 配置RabbitMQ连接信息
	config := &rabbitmq.Config{
		ConnectionConfiguration: rabbitmq.ConnectionConfiguration{
			IP:       "192.168.3.242",
			Port:     "5672",
			Username: "guest",
			Password: "123456",
		},
		ExchangeConfiguration: rabbitmq.ExchangeConfiguration{
			Name:         "logs_direct",
			ExchangeType: rabbitmq.ExchangeTypeDirect,
			Durable:      true,
			AutoDelete:   false,
			Internal:     false,
			NoWait:       false,
			Args:         nil,
		},
		QueueConfiguration: rabbitmq.QueueConfiguration{
			Name:       "my_queue",
			Durable:    true,
			AutoDelete: false,
			Exclusive:  false,
			NoWait:     false,
			Args:       nil,
		},
		RoutingKey:     "my_routing_key",
		ReconnectDelay: 5 * time.Second,
		PrefetchCount:  10,
		AutoAck:        false,
	}

	// 创建RabbitMQ实例
	rabbit := rabbitmq.NewRabbitMQ(config)

	// 连接到RabbitMQ
	err := rabbit.Connect()
	if err != nil {
		log.Fatalf("连接RabbitMQ失败: %v", err)
	}
	defer rabbit.Close()

	// 消费消息
	msgs, err := rabbit.Consume(context.Background())
	if err != nil {
		log.Fatalf("消费消息失败: %v", err)
	}

	go func() {
		for d := range msgs {
			log.Printf("接收到消息: %s", d.Body)
			err := rabbit.Ack(d)
			if err != nil {
				log.Printf("确认消息失败: %v", err)
			}
		}
	}()

	select {}
}