并发与同步

CSP

CSP指顺序通信进程(communicating sequential processes)或被简称为CSP。

CSP是 一种现代的并发编程模型,在这种编程模型中值会在不同的运行实例(goroutine)中传递,尽管大多数情况下仍然是被限制在单一实例中。而传统的并发模型:多线程共享内存。

MPG

  • M: 操作系统上的主线程,物理线程
  • P: 协程执行需要的上下文
  • G:协程

go1.8后默认让程序运行在多核上

Goroutine

goroutine不同于thread,threads是操作系统中的对于一个独立运行实例的描述,不 同操作系统,对于thread的实现也不尽相同;但是,操作系统并不知道goroutine的 存在,goroutine的调度是有Golang运行时进行管理的。启动thread虽然比process 所需的资源要少,但是多个thread之间的上下文切换仍然是需要大量的工作的(寄 存器/Program Count/Stack Pointer/…),Golang有自己的调度器,许多goroutine 的数据都是共享的,因此goroutine之间的切换会快很多,启动goroutine所耗费的资 源也很少,一个Golang程序同时存在几百个goroutine是很正常的。

channel,即“管道”,是用来传递数据(叫消息更为合适)的一个数据结构,即可以 从channel里面塞数据,也可以从中获取数据。channel本身并没有什么神奇的地 方,但是channel加上了goroutine,就形成了一种既简单又强大的请求处理模型, 即N个工作goroutine将处理的中间结果或者最终结果放入一个channel,另外有M个工作goroutine从这个channel拿数据,再进行进一步加工,通过组合这种过程,从而胜任各种复杂的业务模型。

当一个程序启动时,其主函数即在一个单独的goroutine中运行,我们叫它main goroutine。新的goroutine会用go语句来创建。在语法上,go语句是一个普通的函数或方法调用前加上关键字go。go语句会使其语句中的函数在一个新创建的 goroutine中运行。而go语句本身会迅速地完成。

1
2
f() // 调用f()函数,并等待f()返回 
go f() // 创建一个新的goroutine去执行f(), 不需要等待

Goroutine之间通信

有两种方法

  1. 全局变量的互斥锁
  2. 使用管道channel

互斥锁

  • 如下示例计算1-100各个数的阶乘,然后放到map中,最后打印出来,map被多个协程并发写,出现资源争夺,需要加入互斥锁解决
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"time"
"fmt"
"sync"
)

var (
myMap = make(map[int]int)

// 零值为解锁状态
// Mutex类型的锁和线程无关,可以由不同的线程加锁和解锁。
lock sync.Mutex
)

func f1(n int) {

var r int = 1
for i := 1; i <= n; i++ {
r = r * i
}

lock.Lock()
myMap[n] = r // concurrent map writes
lock.Unlock()
}

func main() {

for i := 1; i <= 10; i++ {
go f1(i)
}

time.Sleep(10 * time.Second)

lock.Lock()
for k, v := range myMap {
fmt.Printf("%v的阶乘是: %v\n",k, v)
}
lock.Unlock()

}

管道channel

  • channel是引用类型,必须用make初始化后才能使用,初始化时指定容量大小

  • 没有使用协程时,如果channel中数据全部被取出,再取会报死锁

  • 可以用close关闭channel,关闭后能再放数据,但还能取

  • 支持for-range遍历,在遍历时,如果channel没有关闭,会报死锁,关闭后才可正常遍历

  • 取数据时,可以接受一个ok返回值,判断是否获取成功

  • 编译器如果发现一个管道只有写,没有读,会阻塞出现死锁

串联的channel

Channels也可以用于将多个goroutine连接在一起,一个Channel的输出作为下一个 Channel的输入。这种串联的Channels就是所谓的管道(pipeline)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"fmt"
"time"
)

func main() {
naturals := make(chan int)
squares := make(chan int)

go func() {
for x := 0; x < 5; x++ {
naturals <- x
time.Sleep(1 * time.Second)
}
close(naturals)
}()

go func() {
// for {
// x, ok := <-naturals
// if !ok {
// break
// }
// squares <- x * x
// }

for x := range naturals {
squares <- x * x
}
close(squares)
}()

// for {
// y, ok := <-squares
// if !ok {
// break
// }
// fmt.Println(y)
// }
for x := range squares {
fmt.Println(x)
}

fmt.Println("done")
}

单方向的channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"fmt"
"time"
)

func Counter(out chan<- int) {
for i := 0; i < 5; i++ {
out <- i
time.Sleep(1 * time.Second)
}
//只能关闭输出channel,语法限定
close(out)
}

func Squarer(out chan<- int, in <-chan int) {
for x := range in {
out <- x * x
}
//只能关闭输出类型channel,语法限定
//因为只有调用输出channel的groutine才能决定数据是否传送完毕
close(out)
}

func Printer(in <-chan int) {
for val := range in {
fmt.Println(val)
}
}

func main() {

naturals := make(chan int)
squares := make(chan int)

go Counter(naturals)
go Squarer(squares, naturals)

// Printer 执行printer不能用go
// 因为那样main groutine会直接退 出,导致程序退出
Printer(squares)
fmt.Println("done")
}

带缓存的channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"fmt"
)

func mirroredQuery() string {
response := make(chan string, 3)
go func() {
response <- request("url2")
}()
go func() {
response <- request("url3")
}()
go func() {
response <- request("url4")
}()
return <-response
}

func request(url string) string {
return url + ": result"
}

func main() {
fmt.Println(mirroredQuery())

}

goroutines泄露:如果我们使用了无缓存的channel,那么两个慢的goroutines将会因为没有人接收而被永远卡住。这种情况,称为goroutines泄漏,这将是一个BUG。和垃圾变量不 同,泄漏的goroutines并不会被自动回收,因此确保每个不再需要的goroutine能正常退出是重要的。

基于select的多路复用

10秒倒计时发射,期间可按键停止。

接收到输入时,abort通道被发送元素,然后就绪被select选择到,打印终止,然后return

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"fmt"
"os"
"time"
)

func launch() {
fmt.Println("发射")
}
func main() {

abort := make(chan struct{})

go func() {
os.Stdin.Read(make([]byte, 1))
abort <- struct{}{}
}()

fmt.Println("准备发射。。。")

//time.Tick会返回一个channel,系统会定时向这个channel发信号
tick := time.Tick(1 * time.Second)
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)

select {
case <-tick: //从tick channel中取数据,每阻塞1s返回一次
case <-abort:
fmt.Println("发射终止。。。")
return
}
}

launch()
}

入门示例

下面的例子,main goroutine将计算菲波那契数列的第45个元素值。由于计算函数 使用低效的递归,所以会运行相当长时间,在此期间我们想让用户看到一个可见的 标识来表明程序依然在正常运行,所以来做一个动画的小图标:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"time"
)

func main() {
go spinner(100 * time.Millisecond)
const n = 45
fibN := fib(n)
fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}

func spinner(delay time.Duration) {
for {
for _, r := range `-\|/` {
fmt.Printf("\r%c", r)
time.Sleep(delay)
}
}
}

func fib(x int) int {
if x < 2 {
return x
}
return fib(x-1) + fib(x-2)
}

动画显示了几秒之后,fib(45)的调用成功地返回,并且打印结果:

1
Fibonacci(45) = 1134903170 

然后主函数返回。主函数返回时,所有的goroutine都会被直接打断,程序退出。除了从主函数退出或者直接终止程序之外,没有其它的编程方法能够让一个 goroutine来打断另一个的执行,但是之后可以看到一种方式来实现这个目的,通过goroutine之间的通信来让一个goroutine请求其它的goroutine,并让被请求的 goroutine自行结束执行。

留意一下这里的两个独立的单元是如何进行组合的,spinning和菲波那契的计算。 分别在独立的函数中,但两个函数会同时执行。

案例:并发的clock时钟去服务

该例子是顺序执行的时钟服务器,它会每隔一秒钟将当前时间写到客户端:

  • server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"io"
"log"
"net"
"time"
)

func handleConn(c net.Conn) {
defer c.Close()

for {
// 格式化字符串是1月2日下午3点4分5秒零六年UTC-0700
_, err := io.WriteString(c, time.Now().Format("15:04:05\n"))

if err != nil {
log.Print(err)
return
}
time.Sleep(time.Second * 1)
}
}

func main() {
listener, err := net.Listen("tcp", "localhost:9000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal(err)
continue
}

go handleConn(conn)
}
}

  • dial.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"io"
"log"
"net"
"os"
)

func dealRecvData(dst io.Writer, src io.Reader) {

//将读缓冲 拷贝到写缓冲
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}

}

func main() {
conn, err := net.Dial("tcp", "127.0.0.1:9000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()

//os.Stdout标准输出,作为写io设备, conn客户端套接字作为读io设备
dealRecvData(os.Stdout, conn)
}
  • 测试
1
2
3
4
5
// 启动服务器
go run server.go

// 启动客户端,会得到时间信息
go run dial.go

案例:并发的echo服务

  • echo_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main

import (
"bufio"
"fmt"
"log"
"net"
"strings"
"time"
)

func main() {
listener, err := net.Listen("tcp", "localhost:7000")
if err != nil {
log.Fatal(err)
return
}

for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
return
}

handleConn(conn)
}
}

func handleConn(c net.Conn) {
//input是一个Scanner类型,该类型可以通过Scan方法
//依次迭代从io设备中读数据,直到遇到eof为止
input := bufio.NewScanner(c)

//Scan方法 如果缓冲有数据会返回true,否则返回false
for input.Scan() {

//如果有数据 input.Text()可以取出
go echo(c, input.Text(), 1*time.Second)
}
c.Close()
}

func echo(c net.Conn, outstr string, delay time.Duration) {
fmt.Fprintln(c, strings.ToUpper(outstr))
time.Sleep(delay)

fmt.Fprintln(c, outstr)
time.Sleep(delay)

fmt.Fprintln(c, strings.ToLower(outstr))
}
  • echo_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"io"
"log"
"net"
"os"
)

func main() {
conn, err := net.Dial("tcp", "127.0.0.1:7000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()

go dealRecvData(os.Stdout, conn)

dealRecvData(conn, os.Stdin)
}

func dealRecvData(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}
  • 测试
1
2
3
4
5
// 启动服务器
go run echo_server.go

// 启动客户端,输入单词后会依次收到三个响应
go run echo_client.go

案例:带超时的echo服务

  • server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package main

import (
"bufio"
"fmt"
"log"
"net"
"strings"
"time"
)

func main() {
listener, err := net.Listen("tcp", "localhost:7001")
if err != nil {
log.Fatal(err)
return
}

for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
return
}

handleConn(conn)
}
}

func handleConn(c net.Conn) {
timeout := time.NewTicker(time.Second * 10)
recvData := make(chan struct{})

go func() {
//input是一个Scanner类型,该类型可以通过Scan方法
//依次迭代从io设备中读数据,直到遇到eof为止
input := bufio.NewScanner(c)

//Scan方法 如果缓冲有数据会返回true,否则返回false
for input.Scan() {

recvData <- struct{}{}

//如果有数据 input.Text()可以取出
go echo(c, input.Text(), 1*time.Second)
}
}()

for {
select {
case <-recvData: //有数据发送过来
fmt.Println("get Data reset timeout to 5s")
timeout.Stop()
timeout = time.NewTicker(time.Second * 5)
case <-timeout.C: //客户端超时,需要断开连接
fmt.Println("time out channel...")
c.Close()
timeout.Stop()
return
}

}

}

func echo(c net.Conn, outstr string, delay time.Duration) {
fmt.Fprintln(c, strings.ToUpper(outstr))
time.Sleep(delay)

fmt.Fprintln(c, outstr)
time.Sleep(delay)

fmt.Fprintln(c, strings.ToLower(outstr))
}
  • client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"io"
"log"
"net"
"os"
)

func main() {
conn, err := net.Dial("tcp", "127.0.0.1:7001")
if err != nil {
log.Fatal(err)
}
defer conn.Close()

go dealRecvData(os.Stdout, conn)

dealRecvData(conn, os.Stdin)
}

func dealRecvData(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}

案例:并发的网络聊天室服务

  • server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main

import (
"bufio"
"fmt"
"log"
"net"
)

//声明一个数据类型client 是一个只能写字符串 的channel
type client chan<- string

var (
//entering 表示登陆 channel,
//如果有新的client登陆,
//应该向该channel发送数据,告知是哪个client登陆
//用法比较特殊,entering是一个channel,里面存的元素也是一个client channel
entering = make(chan client)

//leaving 表示离开 channel,
//如果有某个cilent退出链接,
//应该向该channel发送数据,告知是哪个client离开
leaving = make(chan client)

//message 表示正常聊天消息,广播消息事件,
//client发送的正常聊天信息都应该向该channel发送数据
messages = make(chan string)
)

func broadcaseter() {
//用来记录所有已经链接的client客户端
var clients = make(map[client]bool)

for {
select {
case msg := <-messages:
//客户端正常消息事件
//取出全部在线的客户端client,将该消息发送给每个客户端
for cli := range clients {
cli <- msg
}

case cli := <-entering:
//client登陆事件
//给clients map增添一对 key-value
clients[cli] = true

case cli := <-leaving:
//client离开事件
//给clients map删除一堆 key-value
delete(clients, cli)
//将该登出的client channel关闭
close(cli)
}
}

}

func handleConn(conn net.Conn) {

//ch就是当前客户端的client
//给当前客户端创建一个channel 这个channel和conn套接字保持同步
//只要向ch写数据, clientWrite 这个goroutine就会向对应的conn套接字写数据给客户端
ch := make(chan string)
go clientWrite(conn, ch)

who := conn.RemoteAddr().String()

//给当前客户端回应 client的id
ch <- "You are " + who

//向全部已经在线的client广播 当前新的client已经登录
messages <- who + "已经登录"

//发送当前ch登录事件 broadcaster会处理这个事件,将ch放在clients中
entering <- ch

//阻塞等待客户端输入消息
input := bufio.NewScanner(conn)
for input.Scan() {

//将客户端输入消息进行广播 发给其他client
messages <- who + ": " + input.Text()
}

//如果input.Scan退出,表示客户端断开连接 此处可以添加超时机制

//发送当前ch退出事件
leaving <- ch

//想其他client广播 当前ch已经退出
messages <- who + "已经退出"

//关闭当前客户端套接字conn
conn.Close()
}

func clientWrite(conn net.Conn, ch <-chan string) {
for msg := range ch {
fmt.Fprintln(conn, msg)
}
}

func main() {
listener, err := net.Listen("tcp", "localhost:7000")
if err != nil {
log.Fatal(err)
return
}

//开一个goroutine去处理3个channel事件
go broadcaseter()

for {
//等待新的客户端链接请求过来
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}

//开一个goroutine去处理client请求
go handleConn(conn)
}
}
  • client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"io"
"log"
"net"
"os"
)

func main() {
conn, err := net.Dial("tcp", "127.0.0.1:7000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()

go dealRecvData(os.Stdout, conn)

dealRecvData(conn, os.Stdin)
}

func dealRecvData(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}

参考资料