Golang 基础之并发基本同步原语
sync.Cond
条件变量
Cond类型原型
type Cond struct {
// L 是在观察或改变状态时保持的
L Locker
// 包含过滤或未导出的字段
}
func NewCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
Cond 实现了一个条件变量,用于等待或宣布事件发生时 goroutine 的交汇点。 在这个定义中,“事件”是指两个或更多的goroutine之间的任何信号,仅指事件发生了,不包含其他任何信息。 通常,你可能想要在收到某个 goroutine 信号前令其处于等待状态。
sync.Cond
是 Go 语言中用于协调多个 goroutine 的条件变量,常用于在共享资源状态变化时通知等待的 goroutine。以下是实际使用案例及详细说明:
案例:生产者-消费者模型
生产者向队列中添加数据,消费者从队列中取出数据。当队列为空时,消费者等待;当队列有数据时,生产者通知消费者。
代码实现
package main
import (
"fmt"
"sync"
"time"
)
type Queue struct {
items []int
cond *sync.Cond
}
func NewQueue() *Queue {
q := &Queue{
items: []int{},
cond: sync.NewCond(&sync.Mutex{}),
}
return q
}
// 生产者:向队列添加数据并通知消费者
func (q *Queue) Add(item int) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.items = append(q.items, item)
fmt.Printf("Produced: %d\n", item)
q.cond.Signal() // 通知一个等待的消费者
}
// 消费者:从队列取出数据(队列空时阻塞等待)
func (q *Queue) Get() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.items) == 0 {
fmt.Println("Consumer waiting...")
q.cond.Wait() // 释放锁并阻塞,唤醒后重新加锁
}
item := q.items[0]
q.items = q.items[1:]
fmt.Printf("Consumed: %d\n", item)
return item
}
func main() {
q := NewQueue()
// 启动消费者
go func() {
for {
q.Get()
time.Sleep(500 * time.Millisecond) // 模拟处理耗时
}
}()
// 生产者生产数据
for i := 1; i <= 5; i++ {
q.Add(i)
time.Sleep(200 * time.Millisecond) // 模拟生产间隔
}
time.Sleep(3 * time.Second) // 等待程序运行
}
关键点解释
-
条件变量初始化
通过sync.NewCond(&sync.Mutex{})
创建条件变量,底层需要一个互斥锁。 -
生产者逻辑
- 加锁修改共享资源(队列)。
- 添加数据后调用
Signal()
通知一个等待的消费者(或Broadcast()
通知所有消费者)。
-
消费者逻辑
- 加锁后检查条件(队列是否为空)。
- 如果条件不满足,调用
Wait()
:- 释放锁,允许其他 goroutine 修改共享资源。
- 阻塞直到被
Signal()
或Broadcast()
唤醒。 - 唤醒后重新加锁,继续检查条件(需用
for
循环防止虚假唤醒)。
使用场景
-
资源池管理
当 goroutine 需要等待资源(如数据库连接)可用时,条件变量可以高效协调资源的分配。 -
事件触发
多个 goroutine 等待某个事件(如系统初始化完成),事件发生后通过Broadcast()
通知所有等待者。 -
任务调度
工作协程在任务队列为空时等待,主协程添加任务后唤醒工作协程。
注意事项
- 避免虚假唤醒:始终在
for
循环中检查条件,而不是if
语句。 - 锁的粒度:确保在调用
Wait()
、Signal()
或Broadcast()
时持有锁。 - 性能优化:
Signal()
唤醒一个等待者,Broadcast()
唤醒所有等待者,根据场景选择。
通过合理使用 sync.Cond
,可以高效解决复杂的 goroutine 同步问题。
sync.Cond
在 Go 中是一个强大的工具,用于在多个 goroutine 之间高效协调共享资源的状态变化。除了经典的生产者-消费者模型,它还可以解决许多复杂的同步问题。以下是更多实际应用场景及代码示例:
1. 资源池管理(连接池、对象池)
当资源(如数据库连接)数量有限时,sync.Cond
可以协调资源的分配。如果资源暂时不可用,请求的 goroutine 可以阻塞等待,直到资源被释放。
代码示例:数据库连接池
type ConnectionPool struct {
connections []*DBConn
cond *sync.Cond
maxSize int
}
func NewConnectionPool(maxSize int) *ConnectionPool {
return &ConnectionPool{
cond: sync.NewCond(&sync.Mutex{}),
maxSize: maxSize,
}
}
// 获取连接(如果池为空则阻塞等待)
func (p *ConnectionPool) Acquire() *DBConn {
p.cond.L.Lock()
defer p.cond.L.Unlock()
// 等待直到池中有可用连接
for len(p.connections) == 0 {
fmt.Println("Waiting for connection...")
p.cond.Wait()
}
// 取出一个连接
conn := p.connections[0]
p.connections = p.connections[1:]
return conn
}
// 释放连接(归还到池中)
func (p *ConnectionPool) Release(conn *DBConn) {
p.cond.L.Lock()
defer p.cond.L.Unlock()
// 如果池未满,归还连接并通知等待者
if len(p.connections) < p.maxSize {
p.connections = append(p.connections, conn)
p.cond.Signal() // 通知一个等待的 goroutine
}
}
场景说明:
- 当连接池为空时,
Acquire()
调用Wait()
阻塞,直到其他协程释放连接并调用Signal()
。 - 释放连接时,如果池未满,归还后通知等待的协程。
2. 任务调度(工作池模式)
多个工作协程从任务队列中获取任务执行,当队列为空时,协程阻塞等待新任务的到来。
代码示例:动态任务分发
type TaskQueue struct {
tasks []func()
cond *sync.Cond
}
func NewTaskQueue() *TaskQueue {
return &TaskQueue{
cond: sync.NewCond(&sync.Mutex{}),
}
}
// 工作协程:不断处理任务
func (q *TaskQueue) Worker(id int) {
for {
q.cond.L.Lock()
for len(q.tasks) == 0 {
q.cond.Wait() // 等待新任务
}
task := q.tasks[0]
q.tasks = q.tasks[1:]
q.cond.L.Unlock()
fmt.Printf("Worker %d processing task\n", id)
task() // 执行任务
}
}
// 添加任务并唤醒一个工作协程
func (q *TaskQueue) AddTask(task func()) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.tasks = append(q.tasks, task)
q.cond.Signal() // 通知一个等待的 Worker
}
func main() {
q := NewTaskQueue()
// 启动 3 个工作协程
for i := 1; i <= 3; i++ {
go q.Worker(i)
}
// 添加 5 个任务
for i := 1; i <= 5; i++ {
taskID := i
q.AddTask(func() {
fmt.Printf("Task %d done\n", taskID)
})
}
time.Sleep(1 * time.Second)
}
场景说明:
- 工作协程在任务队列为空时阻塞,避免空转消耗 CPU。
- 添加任务时唤醒一个空闲协程,实现高效的任务调度。
3. 事件触发(等待初始化完成)
多个协程需要等待某个初始化操作完成后才能继续执行(如配置加载、服务启动)。
代码示例:等待系统初始化
type System struct {
isReady bool
cond *sync.Cond
}
func NewSystem() *System {
return &System{
cond: sync.NewCond(&sync.Mutex{}),
}
}
// 初始化完成时调用,通知所有等待者
func (s *System) MarkReady() {
s.cond.L.Lock()
defer s.cond.L.Unlock()
s.isReady = true
s.cond.Broadcast() // 唤醒所有等待的协程
}
// 其他协程调用此方法等待初始化完成
func (s *System) WaitUntilReady() {
s.cond.L.Lock()
defer s.cond.L.Unlock()
for !s.isReady {
s.cond.Wait() // 阻塞直到 isReady 为 true
}
fmt.Println("System is ready!")
}
func main() {
sys := NewSystem()
// 启动 3 个协程等待系统就绪
for i := 0; i < 3; i++ {
go func(id int) {
sys.WaitUntilReady()
fmt.Printf("Goroutine %d starts working\n", id)
}(i)
}
// 模拟初始化耗时
time.Sleep(2 * time.Second)
sys.MarkReady()
time.Sleep(1 * time.Second)
}
场景说明:
- 多个协程调用
WaitUntilReady()
阻塞,直到主协程调用MarkReady()
。 - 使用
Broadcast()
一次性唤醒所有等待的协程。
4. 限流控制(并发请求限制)
限制同时处理的请求数量,当达到阈值时,新请求阻塞等待,直到有资源释放。
代码示例:并发请求限流器
type RateLimiter struct {
current int
max int
cond *sync.Cond
}
func NewRateLimiter(maxConcurrent int) *RateLimiter {
return &RateLimiter{
cond: sync.NewCond(&sync.Mutex{}),
max: maxConcurrent,
}
}
// 请求进入时调用,如果超过限制则阻塞
func (r *RateLimiter) Acquire() {
r.cond.L.Lock()
defer r.cond.L.Unlock()
for r.current >= r.max {
r.cond.Wait() // 等待直到有资源释放
}
r.current++
}
// 请求完成时释放资源
func (r *RateLimiter) Release() {
r.cond.L.Lock()
defer r.cond.L.Unlock()
r.current--
r.cond.Signal() // 通知一个等待的请求
}
func main() {
limiter := NewRateLimiter(2) // 允许同时处理 2 个请求
for i := 1; i <= 5; i++ {
go func(id int) {
limiter.Acquire()
defer limiter.Release()
fmt.Printf("Request %d starts\n", id)
time.Sleep(1 * time.Second)
fmt.Printf("Request %d done\n", id)
}(i)
}
time.Sleep(5 * time.Second)
}
场景说明:
- 当并发请求数超过
max
时,新请求阻塞在Acquire()
。 - 请求完成后调用
Release()
,唤醒一个等待的请求。
5. 多阶段任务协调
多个协程需要按特定顺序执行任务(如阶段 A 完成后,阶段 B 才能开始)。
代码示例:阶段任务协调
type Coordinator struct {
phase int
cond *sync.Cond
}
func NewCoordinator() *Coordinator {
return &Coordinator{
cond: sync.NewCond(&sync.Mutex{}),
}
}
// 等待直到进入目标阶段
func (c *Coordinator) WaitForPhase(targetPhase int) {
c.cond.L.Lock()
defer c.cond.L.Unlock()
for c.phase < targetPhase {
c.cond.Wait()
}
}
// 推进到下一阶段并通知所有等待者
func (c *Coordinator) NextPhase() {
c.cond.L.Lock()
defer c.cond.L.Unlock()
c.phase++
c.cond.Broadcast() // 唤醒所有等待此阶段的协程
}
func main() {
coord := NewCoordinator()
// 阶段 1 的协程
go func() {
coord.WaitForPhase(1)
fmt.Println("Phase 1 tasks running")
}()
// 阶段 2 的协程
go func() {
coord.WaitForPhase(2)
fmt.Println("Phase 2 tasks running")
}()
time.Sleep(1 * time.Second)
coord.NextPhase() // 推进到阶段 1
time.Sleep(1 * time.Second)
coord.NextPhase() // 推进到阶段 2
time.Sleep(1 * time.Second)
}
总结
sync.Cond
的核心应用场景是 需要等待某个条件成立时的高效阻塞与唤醒,常见于以下模式:
- 资源争用:连接池、对象池、限流器。
- 任务协调:生产者-消费者、工作池、多阶段任务。
- 事件通知:系统初始化、状态变更。
关键注意事项:
- 始终在
for
循环中检查条件(避免虚假唤醒)。 - 确保在调用
Wait()
、Signal()
、Broadcast()
时持有锁。 - 根据场景选择
Signal()
(唤醒一个)或Broadcast()
(唤醒所有)。
合理使用 sync.Cond
可以避免轮询(如 time.Sleep
)带来的性能损耗,使并发控制更加高效和优雅。
检查队列是否为空的条件要用for循环来包裹q.cond.Wait()?
在 Go 的 sync.Cond
中,使用 for
循环包裹 q.cond.Wait()
是为了避免虚假唤醒(Spurious Wakeup)和条件状态可能在唤醒后再次失效的问题。这是条件变量(Condition Variable)的经典设计模式,原因如下:
1. 虚假唤醒(Spurious Wakeup)
操作系统底层的条件变量实现(如 POSIX 的 pthread_cond_wait
)允许没有明确调用 Signal
/Broadcast
时,Wait
也可能被唤醒。这种现象称为“虚假唤醒”。
即使没有其他协程通知,Wait()
也可能意外返回。如果此时用 if
检查条件,会导致协程在条件未满足时继续执行,引发逻辑错误。
例如:
- 消费者协程因虚假唤醒从
Wait()
返回,但此时队列仍然是空的。 - 若用
if
检查,消费者会错误地尝试从空队列取数据,导致 panic。
2. 共享状态的竞争
即使没有虚假唤醒,条件可能在唤醒后再次失效。例如:
- 队列中有 1 个元素,消费者 A 和 B 同时在
Wait()
。 - 生产者调用
Signal()
唤醒消费者 A。 - 消费者 A 被唤醒,但在它重新获取锁之前,另一个消费者 C 突然抢到锁并取走了元素。
- 此时队列再次为空,消费者 A 必须重新检查条件(
len(q.items) == 0
),否则会尝试从空队列取数据。
如果使用 if
,消费者 A 会直接执行后续逻辑,导致错误;而 for
会强制它重新检查条件。
3. 设计规范
Go 的 sync.Cond
文档明确要求:调用 Wait
时,必须在循环中检查条件。这是条件变量的标准使用模式。
Because the first thing a
Wait
does is unlock theLocker
, it is typical for the caller to have no knowledge of whether the condition is true at the time of theWait
. The condition must be rechecked after theWait
returns.
—— Go 官方文档
代码对比
错误写法(使用 if
)
q.cond.L.Lock()
if len(q.items) == 0 { // ❌ 使用 if
q.cond.Wait()
}
// 执行时可能条件已不满足!
item := q.items[0]
...
正确写法(使用 for
)
q.cond.L.Lock()
for len(q.items) == 0 { // ✅ 使用 for
q.cond.Wait()
}
// 确保条件满足后才执行
item := q.items[0]
...
总结
for
循环确保每次从Wait()
返回后重新检查条件,防止虚假唤醒或竞争导致的逻辑错误。- 这是条件变量的通用最佳实践,不仅适用于 Go,也适用于其他语言(如 C++、Java)。
在你的生产者-消费者模型中,for
循环保证了消费者协程只有在队列确实有数据时才会继续执行,从而避免对空队列操作引发 panic。