Golang 基础之并发基本同步原语

sync.Cond

条件变量

Cond类型原型

type Cond struct {
  // L 是在观察或改变状态时保持的
  L Locker
  // 包含过滤或未导出的字段
}
​
func NewCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()

Cond 实现了一个条件变量,用于等待或宣布事件发生时 goroutine 的交汇点。 在这个定义中,“事件”是指两个或更多的goroutine之间的任何信号,仅指事件发生了,不包含其他任何信息。 通常,你可能想要在收到某个 goroutine 信号前令其处于等待状态。

sync.Cond 是 Go 语言中用于协调多个 goroutine 的条件变量,常用于在共享资源状态变化时通知等待的 goroutine。以下是实际使用案例及详细说明:


案例:生产者-消费者模型

生产者向队列中添加数据,消费者从队列中取出数据。当队列为空时,消费者等待;当队列有数据时,生产者通知消费者。

代码实现

package main

import (
    "fmt"
    "sync"
    "time"
)

type Queue struct {
    items []int
    cond  *sync.Cond
}

func NewQueue() *Queue {
    q := &Queue{
        items: []int{},
        cond:  sync.NewCond(&sync.Mutex{}),
    }
    return q
}

// 生产者:向队列添加数据并通知消费者
func (q *Queue) Add(item int) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    q.items = append(q.items, item)
    fmt.Printf("Produced: %d\n", item)
    q.cond.Signal() // 通知一个等待的消费者
}

// 消费者:从队列取出数据(队列空时阻塞等待)
func (q *Queue) Get() int {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    for len(q.items) == 0 {
        fmt.Println("Consumer waiting...")
        q.cond.Wait() // 释放锁并阻塞,唤醒后重新加锁
    }
    item := q.items[0]
    q.items = q.items[1:]
    fmt.Printf("Consumed: %d\n", item)
    return item
}

func main() {
    q := NewQueue()

    // 启动消费者
    go func() {
        for {
            q.Get()
            time.Sleep(500 * time.Millisecond) // 模拟处理耗时
        }
    }()

    // 生产者生产数据
    for i := 1; i <= 5; i++ {
        q.Add(i)
        time.Sleep(200 * time.Millisecond) // 模拟生产间隔
    }

    time.Sleep(3 * time.Second) // 等待程序运行
}

关键点解释

  1. 条件变量初始化
    通过 sync.NewCond(&sync.Mutex{}) 创建条件变量,底层需要一个互斥锁。

  2. 生产者逻辑

    • 加锁修改共享资源(队列)。
    • 添加数据后调用 Signal() 通知一个等待的消费者(或 Broadcast() 通知所有消费者)。
  3. 消费者逻辑

    • 加锁后检查条件(队列是否为空)。
    • 如果条件不满足,调用 Wait()
      • 释放锁,允许其他 goroutine 修改共享资源。
      • 阻塞直到被 Signal()Broadcast() 唤醒。
      • 唤醒后重新加锁,继续检查条件(需用 for 循环防止虚假唤醒)。

使用场景

  1. 资源池管理
    当 goroutine 需要等待资源(如数据库连接)可用时,条件变量可以高效协调资源的分配。

  2. 事件触发
    多个 goroutine 等待某个事件(如系统初始化完成),事件发生后通过 Broadcast() 通知所有等待者。

  3. 任务调度
    工作协程在任务队列为空时等待,主协程添加任务后唤醒工作协程。


注意事项

  • 避免虚假唤醒:始终在 for 循环中检查条件,而不是 if 语句。
  • 锁的粒度:确保在调用 Wait()Signal()Broadcast() 时持有锁。
  • 性能优化Signal() 唤醒一个等待者,Broadcast() 唤醒所有等待者,根据场景选择。

通过合理使用 sync.Cond,可以高效解决复杂的 goroutine 同步问题。

sync.Cond 在 Go 中是一个强大的工具,用于在多个 goroutine 之间高效协调共享资源的状态变化。除了经典的生产者-消费者模型,它还可以解决许多复杂的同步问题。以下是更多实际应用场景及代码示例:


1. 资源池管理(连接池、对象池)

当资源(如数据库连接)数量有限时,sync.Cond 可以协调资源的分配。如果资源暂时不可用,请求的 goroutine 可以阻塞等待,直到资源被释放。

代码示例:数据库连接池

type ConnectionPool struct {
    connections []*DBConn
    cond        *sync.Cond
    maxSize     int
}

func NewConnectionPool(maxSize int) *ConnectionPool {
    return &ConnectionPool{
        cond:    sync.NewCond(&sync.Mutex{}),
        maxSize: maxSize,
    }
}

// 获取连接(如果池为空则阻塞等待)
func (p *ConnectionPool) Acquire() *DBConn {
    p.cond.L.Lock()
    defer p.cond.L.Unlock()

    // 等待直到池中有可用连接
    for len(p.connections) == 0 {
        fmt.Println("Waiting for connection...")
        p.cond.Wait()
    }

    // 取出一个连接
    conn := p.connections[0]
    p.connections = p.connections[1:]
    return conn
}

// 释放连接(归还到池中)
func (p *ConnectionPool) Release(conn *DBConn) {
    p.cond.L.Lock()
    defer p.cond.L.Unlock()

    // 如果池未满,归还连接并通知等待者
    if len(p.connections) < p.maxSize {
        p.connections = append(p.connections, conn)
        p.cond.Signal() // 通知一个等待的 goroutine
    }
}

场景说明

  • 当连接池为空时,Acquire() 调用 Wait() 阻塞,直到其他协程释放连接并调用 Signal()
  • 释放连接时,如果池未满,归还后通知等待的协程。

2. 任务调度(工作池模式)

多个工作协程从任务队列中获取任务执行,当队列为空时,协程阻塞等待新任务的到来。

代码示例:动态任务分发

type TaskQueue struct {
    tasks []func()
    cond  *sync.Cond
}

func NewTaskQueue() *TaskQueue {
    return &TaskQueue{
        cond: sync.NewCond(&sync.Mutex{}),
    }
}

// 工作协程:不断处理任务
func (q *TaskQueue) Worker(id int) {
    for {
        q.cond.L.Lock()
        for len(q.tasks) == 0 {
            q.cond.Wait() // 等待新任务
        }
        task := q.tasks[0]
        q.tasks = q.tasks[1:]
        q.cond.L.Unlock()

        fmt.Printf("Worker %d processing task\n", id)
        task() // 执行任务
    }
}

// 添加任务并唤醒一个工作协程
func (q *TaskQueue) AddTask(task func()) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    q.tasks = append(q.tasks, task)
    q.cond.Signal() // 通知一个等待的 Worker
}

func main() {
    q := NewTaskQueue()
    // 启动 3 个工作协程
    for i := 1; i <= 3; i++ {
        go q.Worker(i)
    }
    // 添加 5 个任务
    for i := 1; i <= 5; i++ {
        taskID := i
        q.AddTask(func() {
            fmt.Printf("Task %d done\n", taskID)
        })
    }
    time.Sleep(1 * time.Second)
}

场景说明

  • 工作协程在任务队列为空时阻塞,避免空转消耗 CPU。
  • 添加任务时唤醒一个空闲协程,实现高效的任务调度。

3. 事件触发(等待初始化完成)

多个协程需要等待某个初始化操作完成后才能继续执行(如配置加载、服务启动)。

代码示例:等待系统初始化

type System struct {
    isReady bool
    cond    *sync.Cond
}

func NewSystem() *System {
    return &System{
        cond: sync.NewCond(&sync.Mutex{}),
    }
}

// 初始化完成时调用,通知所有等待者
func (s *System) MarkReady() {
    s.cond.L.Lock()
    defer s.cond.L.Unlock()
    s.isReady = true
    s.cond.Broadcast() // 唤醒所有等待的协程
}

// 其他协程调用此方法等待初始化完成
func (s *System) WaitUntilReady() {
    s.cond.L.Lock()
    defer s.cond.L.Unlock()
    for !s.isReady {
        s.cond.Wait() // 阻塞直到 isReady 为 true
    }
    fmt.Println("System is ready!")
}

func main() {
    sys := NewSystem()
    // 启动 3 个协程等待系统就绪
    for i := 0; i < 3; i++ {
        go func(id int) {
            sys.WaitUntilReady()
            fmt.Printf("Goroutine %d starts working\n", id)
        }(i)
    }
    // 模拟初始化耗时
    time.Sleep(2 * time.Second)
    sys.MarkReady()
    time.Sleep(1 * time.Second)
}

场景说明

  • 多个协程调用 WaitUntilReady() 阻塞,直到主协程调用 MarkReady()
  • 使用 Broadcast() 一次性唤醒所有等待的协程。

4. 限流控制(并发请求限制)

限制同时处理的请求数量,当达到阈值时,新请求阻塞等待,直到有资源释放。

代码示例:并发请求限流器

type RateLimiter struct {
    current int
    max     int
    cond    *sync.Cond
}

func NewRateLimiter(maxConcurrent int) *RateLimiter {
    return &RateLimiter{
        cond: sync.NewCond(&sync.Mutex{}),
        max:  maxConcurrent,
    }
}

// 请求进入时调用,如果超过限制则阻塞
func (r *RateLimiter) Acquire() {
    r.cond.L.Lock()
    defer r.cond.L.Unlock()
    for r.current >= r.max {
        r.cond.Wait() // 等待直到有资源释放
    }
    r.current++
}

// 请求完成时释放资源
func (r *RateLimiter) Release() {
    r.cond.L.Lock()
    defer r.cond.L.Unlock()
    r.current--
    r.cond.Signal() // 通知一个等待的请求
}

func main() {
    limiter := NewRateLimiter(2) // 允许同时处理 2 个请求
    for i := 1; i <= 5; i++ {
        go func(id int) {
            limiter.Acquire()
            defer limiter.Release()
            fmt.Printf("Request %d starts\n", id)
            time.Sleep(1 * time.Second)
            fmt.Printf("Request %d done\n", id)
        }(i)
    }
    time.Sleep(5 * time.Second)
}

场景说明

  • 当并发请求数超过 max 时,新请求阻塞在 Acquire()
  • 请求完成后调用 Release(),唤醒一个等待的请求。

5. 多阶段任务协调

多个协程需要按特定顺序执行任务(如阶段 A 完成后,阶段 B 才能开始)。

代码示例:阶段任务协调

type Coordinator struct {
    phase   int
    cond    *sync.Cond
}

func NewCoordinator() *Coordinator {
    return &Coordinator{
        cond: sync.NewCond(&sync.Mutex{}),
    }
}

// 等待直到进入目标阶段
func (c *Coordinator) WaitForPhase(targetPhase int) {
    c.cond.L.Lock()
    defer c.cond.L.Unlock()
    for c.phase < targetPhase {
        c.cond.Wait()
    }
}

// 推进到下一阶段并通知所有等待者
func (c *Coordinator) NextPhase() {
    c.cond.L.Lock()
    defer c.cond.L.Unlock()
    c.phase++
    c.cond.Broadcast() // 唤醒所有等待此阶段的协程
}

func main() {
    coord := NewCoordinator()
    // 阶段 1 的协程
    go func() {
        coord.WaitForPhase(1)
        fmt.Println("Phase 1 tasks running")
    }()
    // 阶段 2 的协程
    go func() {
        coord.WaitForPhase(2)
        fmt.Println("Phase 2 tasks running")
    }()

    time.Sleep(1 * time.Second)
    coord.NextPhase() // 推进到阶段 1
    time.Sleep(1 * time.Second)
    coord.NextPhase() // 推进到阶段 2
    time.Sleep(1 * time.Second)
}

总结

sync.Cond 的核心应用场景是 需要等待某个条件成立时的高效阻塞与唤醒,常见于以下模式:

  1. 资源争用:连接池、对象池、限流器。
  2. 任务协调:生产者-消费者、工作池、多阶段任务。
  3. 事件通知:系统初始化、状态变更。

关键注意事项

  • 始终在 for 循环中检查条件(避免虚假唤醒)。
  • 确保在调用 Wait()Signal()Broadcast() 时持有锁。
  • 根据场景选择 Signal()(唤醒一个)或 Broadcast()(唤醒所有)。

合理使用 sync.Cond 可以避免轮询(如 time.Sleep)带来的性能损耗,使并发控制更加高效和优雅。

检查队列是否为空的条件要用for循环来包裹q.cond.Wait()?

在 Go 的 sync.Cond 中,使用 for 循环包裹 q.cond.Wait() 是为了避免虚假唤醒(Spurious Wakeup)条件状态可能在唤醒后再次失效的问题。这是条件变量(Condition Variable)的经典设计模式,原因如下:


1. 虚假唤醒(Spurious Wakeup)

操作系统底层的条件变量实现(如 POSIX 的 pthread_cond_wait)允许没有明确调用 Signal/Broadcast 时,Wait 也可能被唤醒。这种现象称为“虚假唤醒”。
即使没有其他协程通知,Wait() 也可能意外返回。如果此时用 if 检查条件,会导致协程在条件未满足时继续执行,引发逻辑错误。

例如:

  • 消费者协程因虚假唤醒从 Wait() 返回,但此时队列仍然是空的。
  • 若用 if 检查,消费者会错误地尝试从空队列取数据,导致 panic。

2. 共享状态的竞争

即使没有虚假唤醒,条件可能在唤醒后再次失效。例如:

  1. 队列中有 1 个元素,消费者 A 和 B 同时在 Wait()
  2. 生产者调用 Signal() 唤醒消费者 A。
  3. 消费者 A 被唤醒,但在它重新获取锁之前,另一个消费者 C 突然抢到锁并取走了元素。
  4. 此时队列再次为空,消费者 A 必须重新检查条件(len(q.items) == 0),否则会尝试从空队列取数据。

如果使用 if,消费者 A 会直接执行后续逻辑,导致错误;而 for 会强制它重新检查条件。


3. 设计规范

Go 的 sync.Cond 文档明确要求:调用 Wait 时,必须在循环中检查条件。这是条件变量的标准使用模式。

Because the first thing a Wait does is unlock the Locker, it is typical for the caller to have no knowledge of whether the condition is true at the time of the Wait. The condition must be rechecked after the Wait returns.
—— Go 官方文档


代码对比

错误写法(使用 if

q.cond.L.Lock()
if len(q.items) == 0 { // ❌ 使用 if
    q.cond.Wait()
}
// 执行时可能条件已不满足!
item := q.items[0]
...

正确写法(使用 for

q.cond.L.Lock()
for len(q.items) == 0 { // ✅ 使用 for
    q.cond.Wait()
}
// 确保条件满足后才执行
item := q.items[0]
...

总结

  • for 循环确保每次从 Wait() 返回后重新检查条件,防止虚假唤醒或竞争导致的逻辑错误。
  • 这是条件变量的通用最佳实践,不仅适用于 Go,也适用于其他语言(如 C++、Java)。

在你的生产者-消费者模型中,for 循环保证了消费者协程只有在队列确实有数据时才会继续执行,从而避免对空队列操作引发 panic。