Golang 中的并发

关于什么是并发,竞争和死锁等在这里就不讨论了。Golang 中实现并发系统的主要工具就是 goroutine。

1 Goroutine

Goroutine 是一个独立于程序其余部分执行的函数,它的基本元素是一个函数。每个函数都可以成为 goroutine,只需要在函数调用之前添加 go 关键字。每个程序都存在一个 goroutine,那就是 main goroutine。

来看一个例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main() {
fmt.Println("Launch goroutine")
go printNumber()
time.Sleep(1 * time.Minute)
}

func printNumber() {
i := 0
for {
time.Sleep(1 * time.Second)
i++
fmt.Println(i)
}
}

虽然 main 函数睡眠了1分钟,但是 printNumber 函数还是会照常执行。

2 Channel

2.1 定义

Channel 是一种类型化的通信管道。Goroutine 可以通过通道相互通信。通道可以看作是两个 goroutine 之间的数据管道。此管道只能支持特定类型。

2.2 Channel 类型

Channel 分为 3 种类型,分别是:

  • 只能发送,语法为:chan <- T(T 为 channel 所支持的类型,下同)
  • 只能接收,语法为:<- chan T
  • 双向,语法为:chan T
    channel的三种类型

2.3 Channel 的初始化

Channel 同样使用 Go 内建的 make 函数进行初始化。

1
2
3
4
5
// 1
ch := make(chan int)

// 2
ch := make(chan int, 3)

第一种方式创建了一个 int 类型的不带缓冲的 channel,它等价于 make(chan int, 0),第二种方式创建了一个 int 类型缓冲大小为 3 的 channel。

可以通过 len 函数获取 channel 中的元素个数,cap 函数则可以获取到 channel 的缓冲区大小。

2.4 使用 channel 发送数据

使用 <- 来表示将数据发送到这个 channel,这个符号代表数据从右边流动到左边。

1
2
ch := make(chan int, 3)
ch <- 42

上面的代码初始化了一个缓存为3的 channel,然后将 42 发送到这个 channel。

发送数据需要注意一些特性:

  1. channel 和表达式在通信之前执行;
  2. 仅在 channel 打开的时候可以发送,如果是 close 的时候发送则会引起程序 panic;
  3. 如果 channel 为 nil,向其发送数据会永久阻塞程序执行。

2.5 关闭 channel

使用内建的 close 函数来关闭 channel,关闭 channel 意味着不再有数据发送从这个 channel 发送。

  • 无法关闭一个仅接收的 channel;
  • 无法向一个关闭的 channel 发送数据;
  • 无法关闭一个已经关闭的 channel;
  • 可以从一个已经关闭的 channel 接收数据。
    1
    2
    3
    ch := make(chan int, 3)
    ch <- 42
    close(ch)

2.6 接收 channel 的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ch := make(chan int, 3)
ch <- 42
ch <- 41

// 1
val := <- ch
fmt.Println(val)

// 2
val, ok := <- ch
if !ok {
fmt.Println("Channel is empty or closed.")
} else {
fmt.Println(val)
}

在方式 1 中,使用变量 val 来接收 channel 中的数据,输出为 42。
方式 2 中,使用了两个变量来接收 channel 的数据,ok 用来判断 channel 是否为空或者是否已关闭。ok 为 true 则 val 能正常接收到数据,ok 为 false 则表示 channel 为空或已关闭。本例中,输出为 42。

2.7 无缓冲 Channel

当向无缓冲的 channel 发送数据时,goroutine 会被阻塞,直到数据被另一个 goroutine 接收。这是因为 channel 没有缓冲来存储发送的数据,因此当前 goroutine 阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"log"
"time"
)

func main() {
ch := make(chan int)
go printNumber(ch)

log.Println("Launch goroutine")
ch <- 42
log.Println("Received")

time.Sleep(time.Minute * 1)
}

func printNumber(ch chan int) {
time.Sleep(time.Second * 3)
val := <-ch
log.Println(val)
}

输出如下:

1
2
3
2022/09/09 08:51:23 Launch goroutine
2022/09/09 08:51:26 Received
2022/09/09 08:51:26 42

可以看到,因为 channel 没有缓冲,并且接收数据的 goroutine 睡眠了 3 秒,因此发送数据的 channel 阻塞了 3 秒才继续执行。

2.8 缓冲 channel

将前一节的 ch := make(chan int) 修改为 ch := make(chan int, 1) ,执行后输出如下:

1
2
3
2022/09/09 08:59:50 Launch goroutine 
2022/09/09 08:59:50 Received
2022/09/09 08:59:53 42

可以看到,带有缓冲的 channel 因为有缓冲区,因此在缓冲区还未满的时候发送数据不会被阻塞。

2.9 Channel 用例

2.9.1 线程同步

无缓冲 channel 可用来实现两个 goroutine 之间的同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"time"
)

func main() {
syncCh := make(chan bool)
go func() {
process2()
syncCh <- true
}()
process1()
<- syncCh
}

func process2() {
time.Sleep(time.Second * 1)
}

func process1() {
time.Sleep(time.Second * 3)
}

接收操作 <-syncCh 一直阻塞,直到另一个 goroutine 完成。为了表示它已经完成,第二个 goroutine 将在通道中发送值 true

2.10 死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import (
"log"
)

func main() {
ch := make(chan int, 1)
go receive(ch)
log.Println("waiting for reception...")
ch <- 45
ch <- 58
ch <- 100
}

func receive(c chan int) {
smth := <-c
log.Println("has received something", smth)
}

初始化一个缓冲大小为 1 的 channel,然后将45,58,100 三个数字传给 channel,receive 函数中会接收45,58可以存储在缓冲中,而 100 则因为没有多余的空间存储,所以会阻塞main goroutine 而无限地等待下去。

2.11 Select 语句

  • Select 语句类似于其它语言中的 switch case ,只不过是用于通信操作。
  • Select 语句中有多个 case 和可选的 default 实现。
  • 第一个非阻塞选项将会被选中。
  • 如果有两个或更多的未阻塞 case,则采用统一伪随机选择一个。
  • 如果所有 case 都被阻塞,则会执行 default
    select 执行逻辑

2.12 Wait groups

Wait group 是标准库提供的一种同步工具,它可以用在等待一组 goroutine 完成它们的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
var waitGroup sync.WaitGroup
// wait counter +10
waitGroup.Add(10)
for i := 0; i < 10; i++ {
go concurrentTask(i, &waitGroup)
}
// 阻塞当前 goroutine,直到wait counter为0
waitGroup.Wait()
}

func concurrentTask(taskId int, waitGroup *sync.WaitGroup) {
time.Sleep(time.Millisecond * 100)
// 将wait counter -1
waitGroup.Done()
}

2.13 Mutexes

Mutex 是一种同步工具,Mutex 是 Mutual Exclusion 的缩写。当两件事情不能同时发生的时候我们称之为互斥。可以使用互斥来避免数据竞争。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var number int

func main() {
var group sync.WaitGroup
group.Add(10)
for i := 0; i < 10; i++ {
go increment(&group)
}
group.Wait()
}

func increment(group *sync.WaitGroup) {
for j := 0; j < 100; j++ {
number++
log.Println(number)
}
group.Done()
}

以上代码开了10个 goroutine,每个goroutine对变量number执行100次+1。但是执行结果很可能并不是我们所期待的1000。这是因为产生了data race,有 goroutine 同时修改了 number,导致最终数据不正确。

只要将代码稍加修改,也就是加了一把互斥锁就能正确执行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
var number int
var mutext sync.Mutex

func main() {
var group sync.WaitGroup
group.Add(10)
for i := 0; i < 10; i++ {
go increment(&group)
}
group.Wait()
}

func increment(group *sync.WaitGroup) {
for j := 0; j < 100; j++ {
mutex.Lock()
number++
mutex.Unlock()mutex
log.Println(number)
}
group.Done()
}