Golang 通道

通道机制:让并发计算通过通讯来共享内存

Go也支持几种传统的数据同步技术,但是只有通道为一等公民。 通道是Go中的一种类型,所以我们可以无需引进任何代码包就可以使用通道。 几种传统的数据同步技术提供在sync和sync/atomic标准库包中。

通道类型和值

通道可以是双向的,也可以是单向的

  • chan T 表示一个元素类型为T的双向通道类型。编译器允许从此类型的值中接收或者向此类型的值中发送数据

  • chan <- T 表示一个元素类型为T的单向发送通道类型,编译器允许向此类型的值中发送数据,但是不允许从此类型的值中接收数据

  • <- chan T 表示一个元素类型为T的单向接收通道类型,编译器允许从此类型的值中接收数据,但是不允许向此类型的值中发送数据

  • 双向通道chan T的值可以被隐式转换为单向通道类型chan<- T<-chan T,但反之不行(即使显式也不行)。 类型chan<- T<-chan T的值也不能相互转换。

每个通道有一个容量属性。

一个容量为0的通道称为非缓冲通道

一个容量不为0的通道称为一个缓冲通道

通道类型的零值也使用预声明的nil来表示。 一个非零通道值必须通过内置的make函数来创建。 比如make(chan int, 10)将创建一个元素类型为int的通道值。 第二个参数指定了欲创建的通道的容量。此第二个实参是可选的,它的默认值为0。

通道值的比较

所有通道类型均为可比较类型。

当一个通道值被赋给另一个通道值后,这两个通道值将共享相同的底层部分。 换句话说,这两个通道引用着同一个底层的内部通道对象。 比较这两个通道的结果为true。

package main

import "fmt"

func main() {
	// 创建一个带有缓冲区的通道
	ch1 := make(chan int, 1)

	// 将 ch1 赋值给 ch2
	ch2 := ch1

	// 向通道 ch1 发送一个值
	ch1 <- 42

	// 从通道 ch2 接收该值
	value := <-ch2

	fmt.Println("接收到的值是:", value)

	// 比较两个通道
	fmt.Println("ch1 和 ch2 是相同的通道:", ch1 == ch2)
}

通道操作

<-在左边是输出,<-在右边是输入

Go中有五种通道相关的操作。

  1. 调用内置函数close来关闭一个通道:

    close(Ch)
    

    传给close函数调用的实参必须为一个通道值,并且此通道值不能为单向接收的。

    package main
    
    func main() {
    	// 创建一个带有缓冲区的通道
    	ch := make(chan int, 1)
    	var chRecv <-chan int = ch
    	close(chRecv)
    
    }
    
    

  2. 使用下面的语法向通道ch发送一个值v:

    ch <- v
    
    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    var wg sync.WaitGroup
    
    func main() {
    	ch := make(chan int, 2)
    	ch <- 12
    	ch <- 13
    	wg.Add(1)
    	go func() {
    		for num := range ch {
    			fmt.Println(num)
    		}
    		wg.Done()
    	}()
    	close(ch)
    	wg.Wait()
    }
    
    

  3. 使用下面的语法可以向通道ch取出一个值

    <-ch
    
    package main
    
    import "fmt"
    
    func main() {
    	ch := make(chan int, 3)
    	ch <- 1
    	ch <- 2
    	ch <- 3
    	fmt.Println(<-ch)
    	fmt.Println(<-ch)
    	fmt.Println(<-ch)
    }
    
    

    如果一个通道操作不永久阻塞,它总会返回至少一个值,此值的类型为通道ch的元素类型。

  4. 查询一个通道的容量:

    cap(ch)
    

其中cap是一个已经在容器类型(第18章)一文中介绍过的内置函数。 cap的返回值的类型为内置类型int。

package main

import "fmt"

func main() {
	ch := make(chan int, 3)
	fmt.Println(cap(ch))
}

  1. 查询一个通道的长度:

    len(ch)
    
    package main
    
    import "fmt"
    
    func main() {
    	ch := make(chan int, 3)
    	fmt.Println(cap(ch))
    	fmt.Println(len(ch))
    	ch <- 12
    	fmt.Println(len(ch))
    }
    
    

    其中len是一个已经在容器类型(第18章)一文中介绍过的内置函数。 len的返回值的类型也为内置类型int。 一个通道的长度是指当前有多少个已被发送到此通道但还未被接收出去的元素值。

Go中大多数的基本操作都是未同步的。换句话说,它们都不是并发安全的。 这些操作包括赋值、传参、和各种容器值操作等。 但是,上面列出的五种通道相关的操作都已经同步过了,因此它们可以在并发协程中安全运行而无需其它同步操作。注意:通道的赋值和其它类型值的赋值一样,是未同步的。 同样,将刚从一个通道接收出来的值赋给另一个值也是未同步的。如果被查询的通道为一个nil零值通道,则cap和len函数调用都返回0。 这两个操作是如此简单,所以后面将不再对它们进行详解。 事实上,这两个操作在实践中很少使用。

通道操作详解

为了让解释简单清楚,在本文后续部分,通道将被归为三类:

  1. 零值(nil)通道;
  2. 非零值但已关闭的通道;
  3. 非零值并且尚未关闭的通道。

下表简单地描述了三种通道操作施加到三类通道的结果。

对于上表中的五种未打上标的情形,规则很简单:

  • 关闭一个nil通道或者一个已经关闭的通道将产生一个恐慌。
  • 向一个已关闭的通道发送数据也将导致一个恐慌。
  • 向一个nil通道发送数据或者从一个nil通道接收数据将使当前协程永久阻塞。

我们可以认为一个通道内部维护了三个队列(均可被视为先进先出队列):

  1. 接收数据协程队列,此队列是一个没有长度限制的链表。 此队列中的协程均处于阻塞状态,它们正等待着从此通道接收数据。

  2. 发送数据协程队列.此队列也是一个没有长度限制的链表。 此队列中的协程亦均处于阻塞状态,它们正等待着向此通道发送数据。 此队列中的每个协程将要发送的值(或者此值的指针,取决于具体编译器实现)和此协程一起存储在此队列中。

  3. 数据缓冲队列。这是一个循环队列(绝对先进先出),它的长度为此通道的容量。此队列中存放的值的类型都为此通道的元素类型。 如果此队列中当前存放的值的个数已经达到此通道的容量,则我们说此通道已经处于满槽状态。 如果此队列中当前存放的值的个数为零,则我们说此通道处于空槽状态。 对于一个非缓冲通道(容量为零),它总是同时处于满槽状态和空槽状态。

每个通道内部维护着一个互斥锁用来在各种通道操作中防止数据竞争。

这里必须解释一下了,上面的单向发送通道指的是只写,也就是说,只能往里面写入数据,但是不能从里面读取数据

单向接收通道,指的是外面的变量来接收这个通道里的值,也就是只读,只能从里面读取数据,但是不能向里面写入数据

那么发送数据协程和接收数据协程又分别是干嘛的呢?

接收数据协程:它是从通道里面往出读数据的

发送数据协程: 它是向通道里面输送数据的

几种协程

在通道这个数据结构中,主要涉及到以下几种协程:

  1. 发送数据协程:尝试向通道发送数据的协程。如果通道的缓冲区未满或有接收数据协程等待,它可以非阻塞地发送数据;否则,它将进入阻塞状态,直到有接收数据协程取走数据。
  2. 接收数据协程:尝试从通道接收数据的协程。如果通道有数据可用或有发送数据协程等待,它可以非阻塞地接收数据;否则,它将进入阻塞状态,直到有新的数据可用。
  3. 关闭通道的协程:尝试关闭通道的协程。在通道关闭时,它会处理接收数据协程队列和发送数据协程队列,并触发相应的行为(如返回零值或产生恐慌)。

select-case

在Go语言的 select 语句中,每个 case 语句只有在能够从通道中读取到值或者能够向通道发送值的时候才会执行。如果某个 case 对应的通道操作会阻塞,那么 select 会跳过这个 case 并尝试下一个 case。如果所有的 case 都会阻塞,select 就会阻塞,直到某个 case 能够继续执行。

select 语句的执行顺序

select 语句的执行顺序并不是顺序执行,而是伪随机地选择一个可以执行的 case。具体细节如下:

  1. 可执行的 case:如果有多个 case 是可执行的(即通道已准备好),Go 会从中伪随机选择一个执行。
  2. 全部阻塞:如果所有 case 都会阻塞,则 select 自身也会阻塞,直到其中一个 case 可以执行。
  3. default 语句:如果有 default 语句且所有其他 case 都不能执行,则 default 语句将被执行。

通道锁顺序

每个通道内部维护一个互斥锁,用于确保并发安全。通道操作涉及到锁的获取和释放,具体如下:

  1. 发送操作

    • 尝试获取通道的锁。
    • 如果通道的接收协程队列不为空,从队列中取出一个接收协程,并将数据发送给它。
    • 如果通道的缓冲区未满,将数据放入缓冲区。
    • 如果上述条件都不满足,当前发送协程进入阻塞状态,等待接收协程唤醒。
  2. 接收操作

    • 尝试获取通道的锁。
    • 如果通道的缓冲区不为空,从缓冲区取出一个值。
    • 如果通道的发送协程队列不为空,从队列中取出一个发送协程,接收其发送的数据。
    • 如果上述条件都不满足,当前接收协程进入阻塞状态,等待发送协程唤醒。
  3. 关闭操作

    • 尝试获取通道的锁。
    • 将接收队列中的所有接收协程从队列中移出,并向它们发送通道元素类型的零值。
    • 将发送队列中的所有发送协程从队列中移出,并触发恐慌。

通道使用的例子

返回单向接收通道做为函数返回结果

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// 返回单向接收通道做为函数返回结果
func longTimeRequest() <-chan int32 {
	r := make(chan int32)
	go func() {
		fmt.Println("模仿负载")
		time.Sleep(time.Second * 3)
		num := rand.Int31n(100)
		r <- num
	}()
	return r
}

func calculate_sum(a, b int32) int32 {
	return a * b
}

func main() {
	a, b := longTimeRequest(), longTimeRequest()
	fmt.Println(calculate_sum(<-a, <-b))
}

将单向发送通道类型用做函数实参

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func longTimeRequest(r chan<- int32) {
	time.Sleep(time.Second * 3)
	num := rand.Int31n(100)
	r <- num
}
func calculateSum(a, b int32) int32 {
	return a*a + b*b
}

func main() {
	ra, rb := make(chan int32), make(chan int32)
	go longTimeRequest(ra)
	go longTimeRequest(rb)
	result := calculateSum(<-ra, <-rb)
	fmt.Println(result)

}

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func longTimeRequest(r chan<- int32) {
	time.Sleep(time.Second * 3)
	num := rand.Int31n(100)
	r <- num
}
func calculateSum(a, b int32) int32 {
	return a*a + b*b
}

func main() {
	//ra, rb := make(chan int32), make(chan int32)
	r := make(chan int32, 2)
	go longTimeRequest(r)
	go longTimeRequest(r)
	result := calculateSum(<-r, <-r)
	fmt.Println(result)

}

采用最快回应

有时候,一份数据可能同时从多个数据源获取。这些数据源将返回相同的数据。 因为各种因素,这些数据源的回应速度参差不一,甚至某个特定数据源的多次回应速度之间也可能相差很大。 同时从多个数据源获取一份相同的数据可以有效保障低延迟。我们只需采用最快的回应并舍弃其它较慢回应。

注意:如果有N个数据源,为了防止被舍弃的回应对应的协程永久阻塞,则传输数据用的通道必须为一个容量至少为N-1的缓冲通道。

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func source(r chan<- int32) {
	ra, rb := rand.Int31(), rand.Intn(4)+1
	time.Sleep(time.Duration(rb) * time.Second)
	r <- ra

}
func main() {
	start := time.Now()
	r := make(chan int32, 5)
	for i := 0; i < cap(r); i++ {
		go source(r)
	}
	result := <-r
	elapsed := time.Since(start)
	fmt.Println("result is:", result)
	fmt.Println("used:", elapsed)

}

使用通道实现通知

通知可以被看作是特殊的请求/回应用例。在一个通知用例中,我们并不关心回应的值,我们只关心回应是否已发生。 所以我们常常使用空结构体类型struct{}来做为通道的元素类型,因为空结构体类型的尺寸为零,能够节省一些内存(虽然常常很少量)。

向一个通道发送一个值来实现单对单通知

我们已知道,如果一个通道中无值可接收,则此通道上的下一个接收操作将阻塞到另一个协程发送一个值到此通道为止。 所以一个协程可以向此通道发送一个值来通知另一个等待着从此通道接收数据的协程。

在下面这个例子中,通道done被用来做为一个信号通道来实现单对单通知。

package main

import (
	"crypto/rand"
	"fmt"
	"os"
	"sort"
	"time"
)

func calculate_action(done chan<- struct{}) {
	time.Sleep(time.Second * 3)
	fmt.Println("计算完成")
	done <- struct{}{}
}
func main() {
	start := time.Now()
	values := make([]byte, 32*1024*1024)
	if _, err := rand.Read(values); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	done := make(chan struct{}, 2)
	// 排序协程
	go func() {
		sort.Slice(values, func(i, j int) bool {
			return values[i] < values[j]
		})
		fmt.Println("生成随机byte数组内容完成")
		done <- struct{}{}
	}()
	// 计算协程 模拟耗时
	go calculate_action(done)

	for i := 0; i < cap(done); i++ {
		<-done
	}
	end := time.Since(start).Seconds()
	fmt.Println("任务已完成,耗时:", end)

}

多对单和单对多通知

略微扩展一下上面两个用例,我们可以很轻松地实现多对单和单对多通知。

package main

import (
	"log"
	"time"
)

type T struct{}

func worker(id int, ready <-chan T, done chan<- T) {
	<-ready
	log.Printf("worker %d is ready\n", id)
	// 模拟工作负载
	time.Sleep(time.Second * time.Duration(id+1))
	log.Printf("worker %d is done\n", id)
	// 通知已完成
	done <- T{}
}

func main() {
	ready, done := make(chan T), make(chan T, 3)

	// 启动3个worker协程
	for i := 1; i <= 3; i++ {
		go worker(i, ready, done)
	}

	// 向所有worker发送准备信号
	for i := 0; i < 3; i++ {
		time.Sleep(time.Second)
		ready <- T{}
	}

	// 等待所有worker完成
	for i := 0; i < 3; i++ {
		<-done
	}

	log.Println("所有任务已完成")
}

主协程通知子协程开始,子协程通知主协程完成

事实上,上例中展示的多对单和单对多通知实现方式在实践中用的并不多。 在实践中,我们多使用sync.WaitGroup来实现多对单通知,使用关闭一个通道的方式来实现单对多通知(详见下一个用例)。

package main

import (
	"log"
	"sync"
	"time"
)

var wg sync.WaitGroup

func worker(id int, wg *sync.WaitGroup) {
	defer wg.Done()
	log.Printf("worker %d is starting\n", id) // 模拟工作负载
	time.Sleep(time.Second * time.Duration(id+1))
	log.Printf("worker %d is done\n", id)
}
func main() {
	wokerCount := 3
	for i := 1; i <= wokerCount; i++ {
		wg.Add(1)
		go worker(i, &wg)
	}
	wg.Wait()
	log.Println("所有任务已完成")
}

通过关闭一个通道来实现群发通知

上一个用例中的单对多通知实现在实践中很少用,因为通过关闭一个通道的方式在来实现单对多通知的方式更简单。 我们已经知道,从一个已关闭的通道可以接收到无穷个值,我们可以利用这一特性来实现群发通知。

我们可以把上一个例子中的三个数据发送操作ready <- struct{}{}替换为一个通道关闭操作close(ready)来达到同样的单对多通知效果。

package main

import (
	"log"
	"time"
)

type T struct{}

func worker(id int, ready <-chan T, done chan<- T) {
	<-ready
	log.Printf("worker %d is ready\n", id)
	// 模拟工作负载
	time.Sleep(time.Second * time.Duration(id+1))
	log.Printf("worker %d is done\n", id)
	// 通知已完成
	done <- T{}
}

func main() {
	ready, done := make(chan T), make(chan T, 3)

	// 启动3个worker协程
	for i := 1; i <= 3; i++ {
		go worker(i, ready, done)
	}

	// 向所有worker发送准备信号
	close(ready)

	// 等待所有worker完成
	for i := 0; i < 3; i++ {
		<-done
	}

	log.Println("所有任务已完成")
}

当然,我们也可以通过关闭一个通道来实现单对单通知。事实上,关闭通道是实践中用得最多通知实现方式。
从一个已关闭的通道可以接收到无穷个值这一特性也将被用在很多其它在后面将要介绍的用例中。 实际上,这一特性被广泛地使用于标准库包中。比如,context标准库包使用了此特性来传达操作取消消息。

定时通知(timer)

用通道实现一个一次性的定时通知器是很简单的。 下面是一个自定义实现:

package main

import (
	"fmt"
	"time"
)

func AfterDuration(d time.Duration) <-chan struct{} {
	c := make(chan struct{})
	go func() {
		time.Sleep(d * time.Second)
		c <- struct{}{}
	}()
	return c
}
func main() {
	fmt.Println("hi")
	<-AfterDuration(1)
	fmt.Println("nice")
	<-AfterDuration(1)
	fmt.Println("nicesss")
}

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("hi")
	<-time.After(2 * time.Second)
	fmt.Println("nice")
	<-time.After(2 * time.Second)
	fmt.Println("nicesss")

}

https://www.geeksforgeeks.org/time-after-function-in-golang-with-examples/

time.After使用场景

  • 超时控制:在网络请求或长时间运行的操作中,使用 time.After 可以设置超时限制。
  • 定时任务:通过 time.After 可以在一定的延迟后执行某些操作。

time.After一般用来做超时提示,比如

// Select statement 
    select { 
  
    // Using case statement to receive 
    // or send operation on channel and 
    // calling After() method with its 
    // parameter 
    case <-time.After(3 * time.Second): 
  
        // Printed when timed out 
        fmt.Println("Time Out!") 
    } 

将通道用做互斥锁(mutex)

上面的某个例子提到了容量为1的缓冲通道可以用做一次性二元信号量 。 事实上,容量为1的缓冲通道也可以用做多次性二元信号量(即互斥锁)尽管这样的互斥锁效率不如sync标准库包中提供的互斥锁高效。

互斥:是指散步在不同任务之间的若干程序片断,当某个任务运行其中一个程序片段时,其它任务就不能运行它们之中的任一程序片段,只能等到该任务运行完这个程序片段后才可以运行。最基本的场景就是:一个公共资源同一时刻只能被一个进程或线程使用,多个进程或线程不能同时使用公共资源。

同步:是指散步在不同任务之间的若干程序片断,它们的运行必须严格按照规定的某种先后次序来运行,这种先后次序依赖于要完成的特定的任务。最基本的场景就是:两个或两个以上的进程或线程在运行过程中协同步调,按预定的先后次序运行。比如 A 任务的运行依赖于 B 任务产生的数据。

显然,同步是一种更为复杂的互斥,而互斥是一种特殊的同步。也就是说互斥是两个任务之间不可以同时运行,他们会相互排斥,必须等待一个线程运行完毕,另一个才能运行,而同步也是不能同时运行,但他是必须要安照某种次序来运行相应的线程(也是一种互斥)!因此互斥具有唯一性和排它性,但互斥并不限制任务的运行顺序,即任务是无序的,而同步的任务之间则有顺序关系。