建表:
-- Table of registered users; the consumer programs in this document insert into it.
CREATE TABLE users (
-- Auto-incrementing surrogate primary key.
id BIGINT AUTO_INCREMENT PRIMARY KEY,
-- Login name; the UNIQUE constraint rejects duplicate registrations.
username VARCHAR(255) NOT NULL UNIQUE,
-- Presumably meant to hold a password hash (per the column name) -- confirm that writers hash before inserting.
password_hash VARCHAR(255) NOT NULL,
-- Row creation time, filled in by the server when not supplied.
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
生产者
package main
import (
"encoding/json"
"fmt"
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
// User is the registration payload that the producer serializes to JSON and
// publishes to the queue.
type User struct {
	// Username is encoded under the JSON key "username".
	Username string `json:"username"`
	// Password is encoded under the JSON key "password".
	Password string `json:"password"`
}
// failOnError is a no-op when err is nil. Otherwise it logs msg followed by
// the error and terminates the process via log.Fatalf.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}
// main connects to RabbitMQ, declares the durable "register_queue" queue and
// publishes numUsers simulated registration requests to it as JSON messages.
// Any failure terminates the process through failOnError.
func main() {
	// Number of simulated registration requests to publish.
	const numUsers = 1000

	// Connect to RabbitMQ.
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// Declare the queue.
	q, err := ch.QueueDeclare(
		"register_queue", // queue name
		true,             // durable
		false,            // auto-delete
		false,            // exclusive
		false,            // no-wait
		nil,              // arguments
	)
	failOnError(err, "Failed to declare a queue")

	for i := 0; i < numUsers; i++ {
		user := User{
			Username: fmt.Sprintf("user%d", i),
			Password: fmt.Sprintf("password%d", i),
		}
		body, err := json.Marshal(user)
		failOnError(err, "Failed to marshal user data")

		err = ch.Publish(
			"",     // exchange (default exchange)
			q.Name, // routing key
			false,  // mandatory
			false,  // immediate
			amqp.Publishing{
				// The queue is declared durable; the messages must also be
				// marked persistent, otherwise they are dropped when the
				// broker restarts.
				DeliveryMode: amqp.Persistent,
				ContentType:  "application/json",
				Body:         body,
			},
		)
		failOnError(err, "Failed to publish a message")
		log.Printf(" [x] Sent %s", body)
	}
}
消费者
package main
import (
"database/sql"
"encoding/json"
"log"
"time"
_ "github.com/go-sql-driver/mysql"
amqp "github.com/rabbitmq/amqp091-go"
)
// User is the registration payload decoded from the JSON body of each queue
// message.
type User struct {
	// Username is read from the JSON key "username".
	Username string `json:"username"`
	// Password is read from the JSON key "password".
	Password string `json:"password"`
}
// BufferedMessage pairs a decoded user with the RabbitMQ delivery it came
// from, so the delivery can be acknowledged once the user has been handled.
type BufferedMessage struct {
	// User is the payload decoded from the delivery body.
	User User
	// Msg is the originating delivery, kept for Ack/Nack.
	Msg amqp.Delivery
}
// failOnError aborts the program with a log line of the form "msg: err" when
// err is non-nil, and returns immediately when err is nil.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}
// main consumes registration messages from "register_queue", buffers them and
// writes them to MySQL in batches through processBatch. A batch is flushed
// when it reaches batchSize messages or when the one-second ticker fires,
// whichever comes first. main returns when the delivery channel is closed.
func main() {
	// Number of messages inserted per batch. It is also used as the prefetch
	// count: with manual acknowledgements the broker never delivers more than
	// the prefetch count of unacked messages, so a prefetch smaller than the
	// batch size would make the size-triggered flush unreachable.
	const batchSize = 100

	// Connect to RabbitMQ.
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// Limit the number of unacknowledged deliveries in flight.
	err = ch.Qos(
		batchSize, // prefetch count
		0,         // prefetch size
		false,     // global
	)
	failOnError(err, "Failed to set QoS")

	// Declare the queue.
	q, err := ch.QueueDeclare(
		"register_queue", // queue name
		true,             // durable
		false,            // auto-delete
		false,            // exclusive
		false,            // no-wait
		nil,              // arguments
	)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer tag
		false,  // auto-ack off: deliveries are acknowledged manually
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // arguments
	)
	failOnError(err, "Failed to register a consumer")

	// Connect to MySQL. sql.Open only validates its arguments, so Ping is
	// needed to find out at startup whether the database is reachable.
	db, err := sql.Open("mysql", "root:123456@tcp(192.168.3.242:3306)/testdb")
	failOnError(err, "Failed to connect to MySQL")
	defer db.Close()
	failOnError(db.Ping(), "Failed to connect to MySQL")

	// Buffer of messages waiting for the next batch insert. processBatch runs
	// synchronously, so the backing array can be reused after each flush.
	buffer := make([]BufferedMessage, 0, batchSize)

	// Flush a partially filled buffer once per second.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				// The delivery channel was closed (channel or connection
				// gone). Buffered deliveries can no longer be acknowledged
				// on it, so stop instead of spinning on zero-value messages.
				log.Printf("Delivery channel closed, stopping consumer")
				return
			}

			var user User
			if err := json.Unmarshal(msg.Body, &user); err != nil {
				// A malformed message will never decode; reject it without
				// requeueing so it cannot crash the consumer on every
				// redelivery.
				log.Printf("Failed to unmarshal message, discarding: %v", err)
				if nackErr := msg.Nack(false, false); nackErr != nil {
					log.Printf("Failed to nack message: %v", nackErr)
				}
				continue
			}

			buffer = append(buffer, BufferedMessage{User: user, Msg: msg})

			// Flush immediately once a full batch has accumulated.
			if len(buffer) >= batchSize {
				processBatch(db, buffer)
				buffer = buffer[:0]
			}
		case <-ticker.C:
			// Timer fired: flush whatever is left in the buffer.
			if len(buffer) > 0 {
				processBatch(db, buffer)
				buffer = buffer[:0]
			}
		}
	}
}
// processBatch inserts every user in buffer into the users table inside one
// transaction and then settles the corresponding RabbitMQ deliveries.
//
// Deliveries are acknowledged only after the transaction has committed, so a
// failed commit can never drop a message that was not stored. If the
// transaction cannot be started, prepared or committed, every delivery in the
// batch is nacked with requeue so the broker redelivers it. Rows whose
// individual insert fails are nacked with requeue after the commit.
//
// NOTE(review): User.Password is written to the password_hash column as
// received; nothing in this function hashes it. Confirm that hashing happens
// upstream, or add it before storing.
func processBatch(db *sql.DB, buffer []BufferedMessage) {
	if len(buffer) == 0 {
		return
	}

	// requeueAll hands the whole batch back to the broker.
	requeueAll := func() {
		for i := range buffer {
			if err := buffer[i].Msg.Nack(false, true); err != nil {
				log.Printf("Failed to nack message: %v", err)
			}
		}
	}

	tx, err := db.Begin()
	if err != nil {
		log.Printf("Failed to begin transaction: %v", err)
		requeueAll()
		return
	}

	stmt, err := tx.Prepare("INSERT INTO users (username, password_hash) VALUES (?, ?) ON DUPLICATE KEY UPDATE username=username")
	if err != nil {
		log.Printf("Failed to prepare statement: %v", err)
		if rbErr := tx.Rollback(); rbErr != nil {
			log.Printf("Failed to roll back transaction: %v", rbErr)
		}
		requeueAll()
		return
	}
	defer stmt.Close()

	// Split the batch by insert outcome; settlement is deferred until the
	// commit result is known.
	succeeded := make([]amqp.Delivery, 0, len(buffer))
	var failed []amqp.Delivery
	for i := range buffer {
		item := &buffer[i]
		if _, err := stmt.Exec(item.User.Username, item.User.Password); err != nil {
			log.Printf("Failed to insert user: %v", err)
			failed = append(failed, item.Msg)
			continue
		}
		succeeded = append(succeeded, item.Msg)
	}

	// A failed Commit already ends the transaction, so no Rollback follows.
	if err := tx.Commit(); err != nil {
		log.Printf("Failed to commit transaction: %v", err)
		requeueAll()
		return
	}

	// The rows are stored; it is now safe to acknowledge them.
	for _, msg := range succeeded {
		if err := msg.Ack(false); err != nil {
			log.Printf("Failed to ack message: %v", err)
		}
	}
	log.Printf(" [x] Processed batch of %d users", len(buffer))

	// Requeue the messages whose insert failed.
	for _, msg := range failed {
		if err := msg.Nack(false, true); err != nil {
			log.Printf("Failed to nack message: %v", err)
		}
	}
}
我们也可以用 goroutine 进行优化：
package main
import (
"database/sql"
"encoding/json"
"log"
"sync"
"time"
_ "github.com/go-sql-driver/mysql"
amqp "github.com/rabbitmq/amqp091-go"
)
// User holds the two fields carried by a registration message.
type User struct {
	// Username maps to the JSON key "username".
	Username string `json:"username"`
	// Password maps to the JSON key "password".
	Password string `json:"password"`
}
// BufferedMessage keeps a decoded user together with the delivery that
// carried it, so the delivery can be acked or nacked after the batch insert.
type BufferedMessage struct {
	// User is the decoded message payload.
	User User
	// Msg is the RabbitMQ delivery the payload was read from.
	Msg amqp.Delivery
}
// failOnError returns silently for a nil err. For a non-nil err it logs
// "msg: err" and exits the process through log.Fatalf.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}
// main opens the RabbitMQ channel and the MySQL handle, starts numWorkers
// worker goroutines that share them, and blocks until every worker returns.
func main() {
	const (
		// Number of concurrent consumer goroutines.
		numWorkers = 5
		// Unacknowledged deliveries allowed per consumer. It must be at least
		// the batch size the workers use (100): deliveries are acked only
		// after a batch is stored, so a smaller prefetch would keep a batch
		// from ever filling up and leave flushing to the ticker alone.
		prefetchCount = 100
	)

	// Connect to RabbitMQ.
	conn, err := amqp.Dial("amqp://guest:123456@192.168.3.242:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// Limit the number of unacknowledged deliveries in flight.
	err = ch.Qos(
		prefetchCount, // prefetch count
		0,             // prefetch size
		false,         // global
	)
	failOnError(err, "Failed to set QoS")

	// Connect to MySQL. sql.Open only validates its arguments, so Ping is
	// needed to find out at startup whether the database is reachable.
	db, err := sql.Open("mysql", "root:123456@tcp(192.168.3.242:3306)/testdb")
	failOnError(err, "Failed to connect to MySQL")
	defer db.Close()
	failOnError(db.Ping(), "Failed to connect to MySQL")

	// Start the consumers and wait for all of them to finish.
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(ch, db, &wg, i)
	}
	wg.Wait()
}
// worker registers a consumer for "register_queue" on ch, buffers incoming
// registrations and passes them to processBatch when the buffer reaches
// batchSize or when the one-second ticker fires. It calls wg.Done when it
// returns, which happens once the delivery channel is closed. workerID is
// used only to tag log output.
func worker(ch *amqp.Channel, db *sql.DB, wg *sync.WaitGroup, workerID int) {
	defer wg.Done()

	// Declare the queue (durable, not auto-deleted, not exclusive).
	q, err := ch.QueueDeclare("register_queue", true, false, false, false, nil)
	failOnError(err, "Failed to declare a queue")

	// Consume with manual acknowledgements (autoAck=false).
	msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil)
	failOnError(err, "Failed to register a consumer")

	// Buffer for the next batch insert. processBatch runs synchronously, so
	// the backing array can be reused after each flush.
	const batchSize = 100
	buffer := make([]BufferedMessage, 0, batchSize)

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				// The delivery channel was closed (channel or connection
				// gone). Buffered deliveries can no longer be acknowledged
				// on it, so stop instead of spinning on zero-value messages.
				log.Printf(" [Worker %d] Delivery channel closed, stopping", workerID)
				return
			}

			var user User
			if err := json.Unmarshal(msg.Body, &user); err != nil {
				// A malformed message will never decode; reject it without
				// requeueing so it cannot take the whole process down on
				// every redelivery.
				log.Printf(" [Worker %d] Failed to unmarshal message, discarding: %v", workerID, err)
				if nackErr := msg.Nack(false, false); nackErr != nil {
					log.Printf(" [Worker %d] Failed to nack message: %v", workerID, nackErr)
				}
				continue
			}

			buffer = append(buffer, BufferedMessage{User: user, Msg: msg})

			// Flush immediately once a full batch has accumulated.
			if len(buffer) >= batchSize {
				processBatch(db, buffer)
				buffer = buffer[:0]
			}
		case <-ticker.C:
			// Periodic flush of a partially filled buffer.
			if len(buffer) > 0 {
				processBatch(db, buffer)
				buffer = buffer[:0]
			}
		}
	}
}
// processBatch writes every user in buffer to the users table within a single
// transaction and then settles the matching RabbitMQ deliveries.
//
// Acknowledgements are sent only after the commit succeeds; acking earlier
// would lose messages whenever the commit fails. When the transaction cannot
// be begun, prepared or committed, the entire batch is nacked with requeue so
// the deliveries do not stay unacknowledged. Rows whose own insert fails are
// nacked with requeue once the rest of the batch has been committed.
//
// NOTE(review): User.Password goes into the password_hash column unchanged;
// this function performs no hashing. Confirm it is hashed before it gets
// here, or hash it before storing.
func processBatch(db *sql.DB, buffer []BufferedMessage) {
	if len(buffer) == 0 {
		return
	}

	// requeueBatch returns every delivery of the batch to the broker.
	requeueBatch := func() {
		for i := range buffer {
			if err := buffer[i].Msg.Nack(false, true); err != nil {
				log.Printf("Failed to nack message: %v", err)
			}
		}
	}

	tx, err := db.Begin()
	if err != nil {
		log.Printf("Failed to begin transaction: %v", err)
		requeueBatch()
		return
	}

	stmt, err := tx.Prepare("INSERT INTO users (username, password_hash) VALUES (?, ?) ON DUPLICATE KEY UPDATE username=username")
	if err != nil {
		log.Printf("Failed to prepare statement: %v", err)
		if rbErr := tx.Rollback(); rbErr != nil {
			log.Printf("Failed to roll back transaction: %v", rbErr)
		}
		requeueBatch()
		return
	}
	defer stmt.Close()

	// Record the outcome of each insert; deliveries are settled only after
	// the commit result is known.
	stored := make([]amqp.Delivery, 0, len(buffer))
	var failedMessages []amqp.Delivery
	for i := range buffer {
		entry := &buffer[i]
		if _, err := stmt.Exec(entry.User.Username, entry.User.Password); err != nil {
			log.Printf("Failed to insert user: %v", err)
			failedMessages = append(failedMessages, entry.Msg)
			continue
		}
		stored = append(stored, entry.Msg)
	}

	// A failed Commit already ends the transaction, so no Rollback follows.
	if err := tx.Commit(); err != nil {
		log.Printf("Failed to commit transaction: %v", err)
		requeueBatch()
		return
	}

	// The rows are durable now; acknowledge them.
	for _, msg := range stored {
		if err := msg.Ack(false); err != nil {
			log.Printf("Failed to ack message: %v", err)
		}
	}
	log.Printf(" [Worker] Processed batch of %d users", len(buffer))

	// Requeue the messages whose insert failed.
	for _, msg := range failedMessages {
		if err := msg.Nack(false, true); err != nil {
			log.Printf("Failed to nack message: %v", err)
		}
	}
}