协程与并发

本文最后更新于:2021年9月20日 上午

主要整理了一下go语言并发的知识点

协程与并发

1. 什么是并发

并发是指在一个系统中拥有多个计算,这些计算有同时执行的特性,而且他们之间有着潜在的交互,因此系统可进行的运行路径会有多个,而且结果有不确定性

2. 为什么需要并发

  1. 可以充分利用多核CPU的计算能力
  2. 程序设计在某些情况下会更简单
  3. 程序响应会更快

3. 并发的缺点

  1. 使用并发会使得程序遇到很多匪夷所思的问题,需要设计人员更强的功底
  2. 频繁的上下文切换

Q: 为什么并发能够提升程序的运行速度,频繁的上下文切换时间不是反而增加了代码的运行时间嘛

A: 这个考虑在大部分情况下是正确的,频繁的使用并发不一定会得到性能的提升,但是在考虑到单个线程有阻塞时,这个时候程序必需要等待线程阻塞结束而不能运行其它代码,这时才是并发的用武之地。

4. Go并发模型

常见的并发编程有多种模型,主要有多线程、消息传递等,Go语言是消息传递并发模型的集大成者,它将基于CSP模型的并发编程内置到了语言中。通过一个go关键字就可以轻易启动一个Goroutine,不同的Goroutine之间使用channel通信

4.1 Goroutine

4.1.1 Goroutine和系统线程的区别

Goroutine是GO语言独有的并发体,是一种轻量级的线程,它由go关键字启动,goroutine使用了比系统级线程更小的栈,goroutine一般只会以一个2K或者4K大小的栈启动,当栈空间不足时自动扩容,但是系统级栈一般会有一个固定大小的栈(一般默认为2MB)。

同时Go的运行时还包括了自己的调度器,这个调度器只关注单独Go程序的Goroutine,Goroutine使用了半抢占的协作调度,只有当当前Goroutine发生阻塞时才会导致调度,这样做减少了上下文切换的时间。

4.1.2 go语句和Goroutine

一条go语句就相当于一个函数的并发执行,go语句由go关键字和表达式组成。当go语句执行时,其中go函数会被放到单独放在一个goroutine中,在这之后,该go函数的执行会独立于当前goroutine执行。

让我们来看一个简单的例子

1
2
3
4
5
6
7
8
// Hello,World并发版本
package main

import "fmt"

func main() {
go fmt.Println("Hello,World")
}

我们很希望能够打印出Hello,World字符,但是这行内容实际上并不会出现,因为系统在执行go语句时系统只会把go函数封装在一个Goroutine并放在Goroutine队列中,但是函数什么时候会运行还需要看调度器的具体调度情况(基本上永远不会执行),然后main函数就执行结束退出。

不完美的解决方案

1
2
3
4
5
6
7
8
9
// Hello,World并发不完美的解决方案版本
package main

import "fmt"

func main() {
go fmt.Println("Hello,World")
time.Sleep(time.Millisecond) // 等待1ms
}

time.Sleep()包会使得当前goroutine暂停一段时间,并且会使得调度器调度其它goroutine执行,加上这段代码之后,Hello,World终于能够正常打印,但是这种解决方案总是不完美的,当我们需要运行一段大于1ms的程序时,这种方法就会失效,因此我们还需要对这个方法进行改良

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Hello,World最终版
package main

import (
"fmt"
"sync"
)

func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("Hello,World")
}()
wg.Wait()
}

在这里我们使用sync.WaitGroup去控制goroutine的执行顺序,wg.Add(1)添加一个计数,wg.Done()减少一个技术,wg.Wait()等待waitGroup计数为0时才能进行下一步操作。在此我们才能算是正确打印出了Hello,World

4.2 基于Channel的通信

Channel通信是在Goroutine之间进行同步的主要方法。它是go语言预定义的关键字之一,在同一个时刻,仅有一个goroutine能向一个通道发送值,同时也仅有一个goroutine能从它那里接受值,已被接收的值会立即在通道内被删除,在通道中,各个值都是严格按照发送顺序排列的,通道相当于一个FIFO的阻塞消息队列。

4.2.1 channel基本操作

4.2.1.1 channel声明

channel属于引用类型,一个channel类型的声明如下

1
2
3
var object chan T   // 双向通道
var object chan<- T // 只能用发送值的通道
var object <-chan T // 只能用于接收值的通道

为什么需要单向通道交由之后思考,这里不做说明

4.2.1.2 channel的初始化

正因为它是引用类型,所有channel在初始化之前它的值一定是nil,初始化通道方法如下

1
2
var object = make(chan int,20) // 声明并初始化了一个通道,其缓冲区大小为20
var object = make(chan int) // 声明并初始化了一个无缓冲区的通道

一个无缓冲的通道在通道内有一个值之后无法再往通道放入一个值,这个操作会被阻塞直到有goroutine读取这个通道的值

4.2.1.3 接受channel值

接受一个channel值方法如果

1
2
elem := <-intChan
elem,ok := <-intChan

这两行代码都可以从intChan中读取一个值,第二个写法和第一种写法的不同在于:如果在接收操作之前或者过程中该通道被关闭了,那么该操作会立即结束,并且变量elem会被赋予该元素通道类型的零值,由于通道本身就存在零值情况,这里使用了一个值代表两种含义(未读取到或者本身就是零),第二个参数就是为了防止这种状况,如果该通道是被关闭而结束的,该值为false,否则为true。

注意:不要试图从一个未被初始化的通道中读取值

4.2.1.4 发送channel值

发送语句如下

1
intChan <- 1
4.2.1.5 关闭channel

关闭代码如下

1
close(intChan)

不要试图往一个已关闭的通道里面写入值,会引发Panic

4.2.2 for语句和channel

for语句使用其range子句可以持续不断的从一个通道内接收元素值,直到通道被关闭。如果for语句读取的是单向通道中的发送通道会导致编译错误,而试图从一个未被初始化的通道中接受元素会导致当前goroutine永久阻塞,同样的,如果通道内没有任何元素值的话也会导致阻塞。

我们来看一段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Hello,World 并发高级一点的版本
package main

import (
"fmt"
"sync"
)

var strChan = make(chan string, 10)

var wg sync.WaitGroup

func main() {
wg.Add(2)
go send()
go receive()
wg.Wait()
}

func receive() {
defer wg.Done()
for elem := range strChan {
fmt.Printf(elem)
}
}

func send() {
defer wg.Done()
for _, elem := range []string{"H", "e", "l", "l", "o", ",", "W", "o", "r", "l", "d"} {
strChan <- elem
}
close(strChan)
}

4.2.3 select语句

select语句是一种仅能用于通道发送和接收操作的专用语句,一条select语句执行时会选择其中一个分支并执行,select语句书写如下

1
2
3
4
5
6
7
8
select {
case <- intChan:
// do something
case -> strChan:
// do something
default:
// do something
}

在执行一条select语句时,运行时系统会自上而下的判断每一个case中的发送或者接收操作是否可以立即执行(指的是当前goroutine不会因为这个操作而被阻塞),这个判断还需要依据通道的具体情况而定,只要发现有一个case上的判断时肯定的,该case就会被选择。

如果有多个case满足条件,那么运行时系统就会通过一个伪随机算法来选中一个case

如下代码在多次运行下会随机答应1或者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import "fmt"

func main() {
var intChan1 = make(chan int, 1)
var intChan2 = make(chan int, 1)
intChan1 <- 1
intChan2 <- 2
select {
case <-intChan1:
fmt.Println("1")
case <-intChan2:
fmt.Println("2")
}
}

5. 几种常见的并发写法

5.1 限制线程最大并发量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const MaxGoroutine = 100

func LimitWorkV1(maxWorNum int, f func()) {
var limit = make(chan struct{}, MaxGoroutine)

for i := 0; i < maxWorNum; i++ {
limit <- struct{}{} // 往队列中插入运行权利,如果队列已满,则阻塞,起到限制最大线程的作用
go func() {
defer func() {
<-limit // 归还运行权利
}()
// do something
f()
}()
}
}

5.2 生成者消费者模型

5.3 超时判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (cfg DBConfig) Gorm() *gorm.DB {

c := make(chan bool)
var db *gorm.DB
var err error

go func() {
db, err = gorm.Open("postgres", cfg.ToString())
c <- true
}()
select {
case <-c:
if err != nil {
log.Fatalf("不能链接数据库%v\n\n", err.Error())
}
db.DB().SetMaxOpenConns(10)
case <-time.After(time.Second * 30):
log.Fatalf(" 链接数据库超时\n\n")
}
return db
}

5.4 原子性

所谓的原子性时并发编程中的最小单元。通常如果有多个线程对同一个共享资源进行的操作是原子性的话,那么在一个时刻最多只能有一个线程对该资源进行操作。如果保证了对一个资源的所有操作都是原子操作的情况下,那么可以认为多个线程对这个资源的操作不会有别于单个线程操作。

让我们来看一个反例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 并发编程 原子性反例
package main

import (
"fmt"
"sync"
)

var data = 0

func main() {
var wg sync.WaitGroup
wg.Add(1000)
for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
data++ // 这一行代码有问题
}()
}
wg.Wait()
fmt.Println(data)
}
// 运行结果
// 921 != 1000

这是一个经典的问题,因为从代码层度上看,我只运行了一条data++的语句,难道说这条data++会有问题嘛,会的,因为data++这条语句不保证原子性,在操作系统看来,这是三条语句get data;data add 1;set data,这当然不保证原子性,所以当有多个线程访问data这个变量的时候,可能获取的是同一个值,再给这个值加1,最后再设置回去,那么这时候数据就发生了错乱。

一般情况下,原子操作都是通过"互斥"访问来保证的,比如我们可以借助go官方实现的sync.Mutex来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"fmt"
"sync"
)

var data = 0

func main() {
var wg sync.WaitGroup
var mutex sync.Mutex
wg.Add(100000)
for i := 0; i < 100000; i++ {
go func() {
defer wg.Done()
// TODO
// 这里是保护一些操作的原子性,但是仔细思考一下,
// 我们的操作只是保护了一个数字类型的正确性,
// 那么有没有什么方式去修改使得这个操作更加简单通用呢
mutex.Lock() // 给这个操作加锁
defer mutex.Unlock() // 操作结束需要解锁
data++
}()
}
wg.Wait()
fmt.Println(data)
}

这个时候无论怎么运行都会得到正确的结果:100000。

思考:有没有更优的方案?使用一个重量级的互斥锁有没有必要?