建表:

-- users holds one row per registered account.
CREATE TABLE users (
    -- Auto-incrementing surrogate primary key.
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    -- Login name; the UNIQUE constraint rejects duplicate usernames.
    username VARCHAR(255) NOT NULL UNIQUE,
    -- Password hash column; required for every row.
    password_hash VARCHAR(255) NOT NULL,
    -- Row creation time; filled in by the database when not supplied.
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

生产者

package main

import (
	"encoding/json"
	"fmt"
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

// User is the registration payload that the producer serializes to JSON
// and publishes to the queue.
type User struct {
	Username string `json:"username"` // encoded under the "username" key
	Password string `json:"password"` // encoded under the "password" key
}

// failOnError is a no-op when err is nil. Otherwise it logs msg followed by
// the error text and terminates the process through log.Fatalf.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}

// main connects to RabbitMQ, declares the durable "register_queue" queue and
// publishes numUsers simulated registration requests to it as JSON.
//
// Any connection, declaration, marshalling or publish error aborts the
// process through failOnError.
func main() {
	// numUsers is the number of simulated registration requests to publish.
	const numUsers = 1000

	// Connect to RabbitMQ.
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// Declare the queue.
	q, err := ch.QueueDeclare(
		"register_queue", // name
		true,             // durable
		false,            // autoDelete
		false,            // exclusive
		false,            // noWait
		nil,              // args
	)
	failOnError(err, "Failed to declare a queue")

	// Simulate the user registration requests.
	for i := 0; i < numUsers; i++ {
		user := User{
			Username: fmt.Sprintf("user%d", i),
			Password: fmt.Sprintf("password%d", i),
		}

		body, err := json.Marshal(user)
		failOnError(err, "Failed to marshal user data")

		err = ch.Publish(
			"",     // exchange (default)
			q.Name, // routing key
			false,  // mandatory
			false,  // immediate
			amqp.Publishing{
				// The queue is durable, so mark the message persistent as
				// well; a transient message would not survive a broker
				// restart even though the queue does.
				DeliveryMode: amqp.Persistent,
				ContentType:  "application/json",
				Body:         body,
			},
		)
		failOnError(err, "Failed to publish a message")

		// Log only the username: the body carries the plaintext password.
		log.Printf(" [x] Sent registration for %s", user.Username)
	}
}

消费者

package main

import (
	"database/sql"
	"encoding/json"
	"log"
	"time"

	_ "github.com/go-sql-driver/mysql"
	amqp "github.com/rabbitmq/amqp091-go"
)

// User is the registration payload decoded from the JSON body of each
// queue message.
type User struct {
	Username string `json:"username"` // decoded from the "username" key
	Password string `json:"password"` // decoded from the "password" key
}

// BufferedMessage pairs a decoded user with the RabbitMQ delivery it came
// from, so the delivery can be acknowledged or rejected once the user has
// been written to the database.
type BufferedMessage struct {
	User User          // decoded message payload
	Msg  amqp.Delivery // originating delivery, used for Ack/Nack
}

// failOnError is a no-op when err is nil. Otherwise it logs msg followed by
// the error text and terminates the process through log.Fatalf.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}

// main consumes registration messages from "register_queue" and writes them
// to MySQL in batches through processBatch.
//
// A batch is flushed as soon as batchSize messages are buffered, and
// otherwise once per second by a ticker. Setup errors abort the process
// through failOnError. The function returns when the delivery channel is
// closed.
func main() {
	// batchSize is the number of buffered messages that triggers an immediate
	// flush. It is also used as the prefetch limit: deliveries are
	// acknowledged only after a flush, so a smaller prefetch would stop the
	// buffer from ever filling.
	const batchSize = 100

	// Connect to RabbitMQ.
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// Limit the number of unacknowledged deliveries in flight.
	err = ch.Qos(
		batchSize, // prefetch count
		0,         // prefetch size
		false,     // global
	)
	failOnError(err, "Failed to set QoS")

	// Declare the queue.
	q, err := ch.QueueDeclare(
		"register_queue", // name
		true,             // durable
		false,            // autoDelete
		false,            // exclusive
		false,            // noWait
		nil,              // args
	)
	failOnError(err, "Failed to declare a queue")

	// Connect to MySQL before consuming. sql.Open only validates its
	// arguments, so Ping is what actually verifies the database is reachable.
	db, err := sql.Open("mysql", "root:123456@tcp(192.168.3.242:3306)/testdb")
	failOnError(err, "Failed to connect to MySQL")
	defer db.Close()
	failOnError(db.Ping(), "Failed to ping MySQL")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer tag (server-generated)
		false,  // autoAck: deliveries are acknowledged manually
		false,  // exclusive
		false,  // noLocal
		false,  // noWait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	// Buffer of messages waiting for the next batch insert.
	buffer := make([]BufferedMessage, 0, batchSize)
	ticker := time.NewTicker(1 * time.Second) // flush leftovers once per second
	defer ticker.Stop()

	// flush hands the buffered messages to processBatch and empties the buffer.
	flush := func() {
		if len(buffer) == 0 {
			return
		}
		processBatch(db, buffer)
		buffer = buffer[:0]
	}

	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				// The channel or connection is gone. Buffered deliveries can
				// no longer be acknowledged, so they are left for the broker
				// to redeliver.
				log.Printf("Delivery channel closed; stopping consumer")
				return
			}

			var user User
			if err := json.Unmarshal(msg.Body, &user); err != nil {
				// A malformed message will never decode, so reject it without
				// requeueing instead of crashing and receiving it again.
				log.Printf("Failed to unmarshal message, discarding: %v", err)
				if nackErr := msg.Nack(false, false); nackErr != nil {
					log.Printf("Failed to nack malformed message: %v", nackErr)
				}
				continue
			}

			buffer = append(buffer, BufferedMessage{User: user, Msg: msg})

			// Flush immediately once a full batch is buffered.
			if len(buffer) >= batchSize {
				flush()
			}

		case <-ticker.C:
			// Timer fired: flush whatever is left in the buffer.
			flush()
		}
	}
}

// processBatch inserts every buffered user inside a single transaction and
// then settles the corresponding RabbitMQ deliveries.
//
// Deliveries are settled only after the transaction outcome is known:
//   - if the transaction cannot be started, prepared or committed, every
//     delivery in the batch is nacked with requeue so none is left
//     unacknowledged;
//   - after a successful commit, deliveries whose insert succeeded are acked
//     and deliveries whose insert failed are nacked with requeue.
//
// Errors are logged; nothing is returned to the caller.
//
// NOTE(review): User.Password is written to the password_hash column exactly
// as received; nothing in this function hashes it. Confirm hashing happens
// upstream, or add it before this is used with real credentials.
func processBatch(db *sql.DB, buffer []BufferedMessage) {
	// requeue nacks one delivery so that the broker redelivers it.
	requeue := func(d *amqp.Delivery) {
		if err := d.Nack(false, true); err != nil {
			log.Printf("Failed to nack message: %v", err)
		}
	}
	// requeueAll nacks the whole batch; used when nothing was persisted.
	requeueAll := func() {
		for i := range buffer {
			requeue(&buffer[i].Msg)
		}
	}

	tx, err := db.Begin()
	if err != nil {
		log.Printf("Failed to begin transaction: %v", err)
		requeueAll()
		return
	}

	stmt, err := tx.Prepare("INSERT INTO users (username, password_hash) VALUES (?, ?) ON DUPLICATE KEY UPDATE username=username")
	if err != nil {
		log.Printf("Failed to prepare statement: %v", err)
		if rbErr := tx.Rollback(); rbErr != nil {
			log.Printf("Failed to roll back transaction: %v", rbErr)
		}
		requeueAll()
		return
	}
	defer stmt.Close()

	// inserted[i] records whether buffer[i] was written successfully.
	inserted := make([]bool, len(buffer))
	for i := range buffer {
		u := &buffer[i].User
		if _, err := stmt.Exec(u.Username, u.Password); err != nil {
			log.Printf("Failed to insert user: %v", err)
			continue
		}
		inserted[i] = true
	}

	// A failed Commit already ends the transaction, so no Rollback is needed;
	// the rows were not persisted and the whole batch must be redelivered.
	if err := tx.Commit(); err != nil {
		log.Printf("Failed to commit transaction: %v", err)
		requeueAll()
		return
	}

	log.Printf(" [x] Processed batch of %d users", len(buffer))

	// The data is durable now, so it is safe to settle the deliveries.
	for i := range buffer {
		if !inserted[i] {
			requeue(&buffer[i].Msg)
			continue
		}
		if err := buffer[i].Msg.Ack(false); err != nil {
			log.Printf("Failed to ack message: %v", err)
		}
	}
}

我们也可以用 goroutine 进行优化

package main

import (
	"database/sql"
	"encoding/json"
	"log"
	"sync"
	"time"

	_ "github.com/go-sql-driver/mysql"
	amqp "github.com/rabbitmq/amqp091-go"
)

// User is the registration payload decoded from the JSON body of each
// queue message.
type User struct {
	Username string `json:"username"` // decoded from the "username" key
	Password string `json:"password"` // decoded from the "password" key
}

// BufferedMessage pairs a decoded user with the RabbitMQ delivery it came
// from, so the delivery can be acknowledged or rejected once the user has
// been written to the database.
type BufferedMessage struct {
	User User          // decoded message payload
	Msg  amqp.Delivery // originating delivery, used for Ack/Nack
}

// failOnError is a no-op when err is nil. Otherwise it logs msg followed by
// the error text and terminates the process through log.Fatalf.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}

// main connects to RabbitMQ and MySQL, starts numWorkers consumer goroutines
// that share the channel and the database handle, and waits for all of them
// to finish.
//
// Setup errors abort the process through failOnError.
func main() {
	// Connect to RabbitMQ.
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// Prefetch limit. Workers acknowledge deliveries only after flushing a
	// batch, so the limit must be at least the worker batch size (100);
	// with a smaller value a worker's buffer could never fill and every
	// flush would wait for the ticker.
	err = ch.Qos(
		100,   // prefetch count
		0,     // prefetch size
		false, // global
	)
	failOnError(err, "Failed to set QoS")

	// Connect to MySQL. sql.Open only validates its arguments, so Ping is
	// what actually verifies the database is reachable before workers start.
	db, err := sql.Open("mysql", "root:123456@tcp(192.168.3.242:3306)/testdb")
	failOnError(err, "Failed to connect to MySQL")
	defer db.Close()
	failOnError(db.Ping(), "Failed to ping MySQL")

	// Start the consumer workers.
	numWorkers := 5
	var wg sync.WaitGroup

	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(ch, db, &wg, i)
	}

	// Block until every worker has returned.
	wg.Wait()
}

// worker registers a consumer on "register_queue" and writes the received
// users to MySQL in batches through processBatch.
//
// A batch is flushed as soon as batchSize messages are buffered, and
// otherwise once per second by a ticker. The worker returns, signalling wg,
// when its delivery channel is closed. Queue declaration and consumer
// registration errors abort the whole process through failOnError.
//
// Parameters:
//   - ch: AMQP channel used to declare the queue and consume from it.
//   - db: database handle passed on to processBatch.
//   - wg: wait group marked done when the worker returns.
//   - workerID: identifier used only in this worker's log lines.
func worker(ch *amqp.Channel, db *sql.DB, wg *sync.WaitGroup, workerID int) {
	defer wg.Done()

	// batchSize is the number of buffered messages that triggers an
	// immediate flush.
	const batchSize = 100

	// Declare the queue.
	q, err := ch.QueueDeclare(
		"register_queue", true, false, false, false, nil,
	)
	failOnError(err, "Failed to declare a queue")

	// Manual acknowledgement: autoAck is false.
	msgs, err := ch.Consume(
		q.Name, "", false, false, false, false, nil,
	)
	failOnError(err, "Failed to register a consumer")

	// Buffer of messages waiting for the next batch insert.
	buffer := make([]BufferedMessage, 0, batchSize)
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	// flush hands the buffered messages to processBatch and empties the buffer.
	flush := func() {
		if len(buffer) == 0 {
			return
		}
		processBatch(db, buffer)
		buffer = buffer[:0]
	}

	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				// The channel or connection is gone. Buffered deliveries can
				// no longer be acknowledged, so they are left for the broker
				// to redeliver.
				log.Printf("[Worker %d] Delivery channel closed; stopping", workerID)
				return
			}

			var user User
			if err := json.Unmarshal(msg.Body, &user); err != nil {
				// A malformed message will never decode, so reject it without
				// requeueing instead of terminating every worker.
				log.Printf("[Worker %d] Failed to unmarshal message, discarding: %v", workerID, err)
				if nackErr := msg.Nack(false, false); nackErr != nil {
					log.Printf("[Worker %d] Failed to nack malformed message: %v", workerID, nackErr)
				}
				continue
			}

			buffer = append(buffer, BufferedMessage{User: user, Msg: msg})

			// Flush immediately once a full batch is buffered.
			if len(buffer) >= batchSize {
				flush()
			}

		case <-ticker.C:
			// Timer fired: flush whatever is left in the buffer.
			flush()
		}
	}
}

// processBatch inserts every buffered user inside a single transaction and
// then settles the corresponding RabbitMQ deliveries.
//
// Deliveries are settled only after the transaction outcome is known:
//   - if the transaction cannot be started, prepared or committed, every
//     delivery in the batch is nacked with requeue so none is left
//     unacknowledged;
//   - after a successful commit, deliveries whose insert succeeded are acked
//     and deliveries whose insert failed are nacked with requeue.
//
// Errors are logged; nothing is returned to the caller.
//
// NOTE(review): User.Password is written to the password_hash column exactly
// as received; nothing in this function hashes it. Confirm hashing happens
// upstream, or add it before this is used with real credentials.
func processBatch(db *sql.DB, buffer []BufferedMessage) {
	// requeue nacks one delivery so that the broker redelivers it.
	requeue := func(d *amqp.Delivery) {
		if err := d.Nack(false, true); err != nil {
			log.Printf("Failed to nack message: %v", err)
		}
	}
	// requeueAll nacks the whole batch; used when nothing was persisted.
	requeueAll := func() {
		for i := range buffer {
			requeue(&buffer[i].Msg)
		}
	}

	tx, err := db.Begin()
	if err != nil {
		log.Printf("Failed to begin transaction: %v", err)
		requeueAll()
		return
	}

	stmt, err := tx.Prepare("INSERT INTO users (username, password_hash) VALUES (?, ?) ON DUPLICATE KEY UPDATE username=username")
	if err != nil {
		log.Printf("Failed to prepare statement: %v", err)
		if rbErr := tx.Rollback(); rbErr != nil {
			log.Printf("Failed to roll back transaction: %v", rbErr)
		}
		requeueAll()
		return
	}
	defer stmt.Close()

	// inserted[i] records whether buffer[i] was written successfully.
	inserted := make([]bool, len(buffer))
	for i := range buffer {
		u := &buffer[i].User
		if _, err := stmt.Exec(u.Username, u.Password); err != nil {
			log.Printf("Failed to insert user: %v", err)
			continue
		}
		inserted[i] = true
	}

	// A failed Commit already ends the transaction, so no Rollback is needed;
	// the rows were not persisted and the whole batch must be redelivered.
	if err := tx.Commit(); err != nil {
		log.Printf("Failed to commit transaction: %v", err)
		requeueAll()
		return
	}

	log.Printf(" [Worker] Processed batch of %d users", len(buffer))

	// The data is durable now, so it is safe to settle the deliveries.
	for i := range buffer {
		if !inserted[i] {
			requeue(&buffer[i].Msg)
			continue
		}
		if err := buffer[i].Msg.Ack(false); err != nil {
			log.Printf("Failed to ack message: %v", err)
		}
	}
}