并发与同步

Catalogue
  1. 1. CSP
  2. 2. MPG
  3. 3. Goroutine
  4. 4. Goroutine之间通信
    1. 4.0.1. 互斥锁
    2. 4.0.2. 管道channel
    3. 4.0.3. 串联的channel
    4. 4.0.4. 单方向的channel
    5. 4.0.5. 带缓存的channel
    6. 4.0.6. 基于select的多路复用
  • 5.
  • 6. 入门示例
  • 7. 案例:并发的clock时钟去服务
  • 8. 案例:并发的echo服务
  • 9. 案例:带超时的echo服务
  • 10. 案例:并发的网络聊天室服务
  • 11. 参考资料
  • CSP

    CSP指顺序通信进程(communicating sequential processes)或被简称为CSP。

    CSP是 一种现代的并发编程模型,在这种编程模型中值会在不同的运行实例(goroutine)中传递,尽管大多数情况下仍然是被限制在单一实例中。而传统的并发模型:多线程共享内存。

    MPG

    • M: 操作系统上的主线程,物理线程
    • P: 协程执行需要的上下文
    • G:协程

    go1.8后默认让程序运行在多核上

    Goroutine

    goroutine不同于thread,threads是操作系统中的对于一个独立运行实例的描述,不 同操作系统,对于thread的实现也不尽相同;但是,操作系统并不知道goroutine的 存在,goroutine的调度是有Golang运行时进行管理的。启动thread虽然比process 所需的资源要少,但是多个thread之间的上下文切换仍然是需要大量的工作的(寄 存器/Program Count/Stack Pointer/…),Golang有自己的调度器,许多goroutine 的数据都是共享的,因此goroutine之间的切换会快很多,启动goroutine所耗费的资 源也很少,一个Golang程序同时存在几百个goroutine是很正常的。

    channel,即“管道”,是用来传递数据(叫消息更为合适)的一个数据结构,即可以 从channel里面塞数据,也可以从中获取数据。channel本身并没有什么神奇的地 方,但是channel加上了goroutine,就形成了一种既简单又强大的请求处理模型, 即N个工作goroutine将处理的中间结果或者最终结果放入一个channel,另外有M个工作goroutine从这个channel拿数据,再进行进一步加工,通过组合这种过程,从而胜任各种复杂的业务模型。

    当一个程序启动时,其主函数即在一个单独的goroutine中运行,我们叫它main goroutine。新的goroutine会用go语句来创建。在语法上,go语句是一个普通的函数或方法调用前加上关键字go。go语句会使其语句中的函数在一个新创建的 goroutine中运行。而go语句本身会迅速地完成。

    1
    2
    f() // 调用f()函数,并等待f()返回 
    go f() // 创建一个新的goroutine去执行f(), 不需要等待

    Goroutine之间通信

    有两种方法

    1. 全局变量的互斥锁
    2. 使用管道channel

    互斥锁

    • 如下示例计算1-100各个数的阶乘,然后放到map中,最后打印出来,map被多个协程并发写,出现资源争夺,需要加入互斥锁解决
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    package main

    import (
    "time"
    "fmt"
    "sync"
    )

    var (
    myMap = make(map[int]int)

    // 零值为解锁状态
    // Mutex类型的锁和线程无关,可以由不同的线程加锁和解锁。
    lock sync.Mutex
    )

    func f1(n int) {

    var r int = 1
    for i := 1; i <= n; i++ {
    r = r * i
    }

    lock.Lock()
    myMap[n] = r // concurrent map writes
    lock.Unlock()
    }

    func main() {

    for i := 1; i <= 10; i++ {
    go f1(i)
    }

    time.Sleep(10 * time.Second)

    lock.Lock()
    for k, v := range myMap {
    fmt.Printf("%v的阶乘是: %v\n",k, v)
    }
    lock.Unlock()

    }

    管道channel

    • channel是引用类型,必须用make初始化后才能使用,初始化时指定容量大小

    • 没有使用协程时,如果channel中数据全部被取出,再取会报死锁

    • 可以用close关闭channel,关闭后能再放数据,但还能取

    • 支持for-range遍历,在遍历时,如果channel没有关闭,会报死锁,关闭后才可正常遍历

    • 取数据时,可以接受一个ok返回值,判断是否获取成功

    • 编译器如果发现一个管道只有写,没有读,会阻塞出现死锁

    串联的channel

    Channels也可以用于将多个goroutine连接在一起,一个Channel的输出作为下一个 Channel的输入。这种串联的Channels就是所谓的管道(pipeline)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    package main

    import (
    "fmt"
    "time"
    )

    func main() {
    naturals := make(chan int)
    squares := make(chan int)

    go func() {
    for x := 0; x < 5; x++ {
    naturals <- x
    time.Sleep(1 * time.Second)
    }
    close(naturals)
    }()

    go func() {
    // for {
    // x, ok := <-naturals
    // if !ok {
    // break
    // }
    // squares <- x * x
    // }

    for x := range naturals {
    squares <- x * x
    }
    close(squares)
    }()

    // for {
    // y, ok := <-squares
    // if !ok {
    // break
    // }
    // fmt.Println(y)
    // }
    for x := range squares {
    fmt.Println(x)
    }

    fmt.Println("done")
    }

    单方向的channel

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    package main

    import (
    "fmt"
    "time"
    )

    func Counter(out chan<- int) {
    for i := 0; i < 5; i++ {
    out <- i
    time.Sleep(1 * time.Second)
    }
    //只能关闭输出channel,语法限定
    close(out)
    }

    func Squarer(out chan<- int, in <-chan int) {
    for x := range in {
    out <- x * x
    }
    //只能关闭输出类型channel,语法限定
    //因为只有调用输出channel的groutine才能决定数据是否传送完毕
    close(out)
    }

    func Printer(in <-chan int) {
    for val := range in {
    fmt.Println(val)
    }
    }

    func main() {

    naturals := make(chan int)
    squares := make(chan int)

    go Counter(naturals)
    go Squarer(squares, naturals)

    // Printer 执行printer不能用go
    // 因为那样main groutine会直接退 出,导致程序退出
    Printer(squares)
    fmt.Println("done")
    }

    带缓存的channel

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    package main

    import (
    "fmt"
    )

    func mirroredQuery() string {
    response := make(chan string, 3)
    go func() {
    response <- request("url2")
    }()
    go func() {
    response <- request("url3")
    }()
    go func() {
    response <- request("url4")
    }()
    return <-response
    }

    func request(url string) string {
    return url + ": result"
    }

    func main() {
    fmt.Println(mirroredQuery())

    }

    goroutines泄露:如果我们使用了无缓存的channel,那么两个慢的goroutines将会因为没有人接收而被永远卡住。这种情况,称为goroutines泄漏,这将是一个BUG。和垃圾变量不 同,泄漏的goroutines并不会被自动回收,因此确保每个不再需要的goroutine能正常退出是重要的。

    基于select的多路复用

    10秒倒计时发射,期间可按键停止。

    接收到输入时,abort通道被发送元素,然后就绪被select选择到,打印终止,然后return

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    package main

    import (
    "fmt"
    "os"
    "time"
    )

    func launch() {
    fmt.Println("发射")
    }
    func main() {

    abort := make(chan struct{})

    go func() {
    os.Stdin.Read(make([]byte, 1))
    abort <- struct{}{}
    }()

    fmt.Println("准备发射。。。")

    //time.Tick会返回一个channel,系统会定时向这个channel发信号
    tick := time.Tick(1 * time.Second)
    for countdown := 10; countdown > 0; countdown-- {
    fmt.Println(countdown)

    select {
    case <-tick: //从tick channel中取数据,每阻塞1s返回一次
    case <-abort:
    fmt.Println("发射终止。。。")
    return
    }
    }

    launch()
    }

    入门示例

    下面的例子,main goroutine将计算菲波那契数列的第45个元素值。由于计算函数 使用低效的递归,所以会运行相当长时间,在此期间我们想让用户看到一个可见的 标识来表明程序依然在正常运行,所以来做一个动画的小图标:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    package main

    import (
    "fmt"
    "time"
    )

    func main() {
    go spinner(100 * time.Millisecond)
    const n = 45
    fibN := fib(n)
    fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
    }

    func spinner(delay time.Duration) {
    for {
    for _, r := range `-\|/` {
    fmt.Printf("\r%c", r)
    time.Sleep(delay)
    }
    }
    }

    func fib(x int) int {
    if x < 2 {
    return x
    }
    return fib(x-1) + fib(x-2)
    }

    动画显示了几秒之后,fib(45)的调用成功地返回,并且打印结果:

    1
    Fibonacci(45) = 1134903170 

    然后主函数返回。主函数返回时,所有的goroutine都会被直接打断,程序退出。除了从主函数退出或者直接终止程序之外,没有其它的编程方法能够让一个 goroutine来打断另一个的执行,但是之后可以看到一种方式来实现这个目的,通过goroutine之间的通信来让一个goroutine请求其它的goroutine,并让被请求的 goroutine自行结束执行。

    留意一下这里的两个独立的单元是如何进行组合的,spinning和菲波那契的计算。 分别在独立的函数中,但两个函数会同时执行。

    案例:并发的clock时钟去服务

    该例子是顺序执行的时钟服务器,它会每隔一秒钟将当前时间写到客户端:

    • server.go
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    package main

    import (
    "io"
    "log"
    "net"
    "time"
    )

    func handleConn(c net.Conn) {
    defer c.Close()

    for {
    // 格式化字符串是1月2日下午3点4分5秒零六年UTC-0700
    _, err := io.WriteString(c, time.Now().Format("15:04:05\n"))

    if err != nil {
    log.Print(err)
    return
    }
    time.Sleep(time.Second * 1)
    }
    }

    func main() {
    listener, err := net.Listen("tcp", "localhost:9000")
    if err != nil {
    log.Fatal(err)
    }
    for {
    conn, err := listener.Accept()
    if err != nil {
    log.Fatal(err)
    continue
    }

    go handleConn(conn)
    }
    }

    • dial.go
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    package main

    import (
    "io"
    "log"
    "net"
    "os"
    )

    func dealRecvData(dst io.Writer, src io.Reader) {

    //将读缓冲 拷贝到写缓冲
    if _, err := io.Copy(dst, src); err != nil {
    log.Fatal(err)
    }

    }

    func main() {
    conn, err := net.Dial("tcp", "127.0.0.1:9000")
    if err != nil {
    log.Fatal(err)
    }
    defer conn.Close()

    //os.Stdout标准输出,作为写io设备, conn客户端套接字作为读io设备
    dealRecvData(os.Stdout, conn)
    }
    • 测试
    1
    2
    3
    4
    5
    // 启动服务器
    go run server.go

    // 启动客户端,会得到时间信息
    go run dial.go

    案例:并发的echo服务

    • echo_server.go
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    package main

    import (
    "bufio"
    "fmt"
    "log"
    "net"
    "strings"
    "time"
    )

    func main() {
    listener, err := net.Listen("tcp", "localhost:7000")
    if err != nil {
    log.Fatal(err)
    return
    }

    for {
    conn, err := listener.Accept()
    if err != nil {
    log.Print(err)
    return
    }

    handleConn(conn)
    }
    }

    func handleConn(c net.Conn) {
    //input是一个Scanner类型,该类型可以通过Scan方法
    //依次迭代从io设备中读数据,直到遇到eof为止
    input := bufio.NewScanner(c)

    //Scan方法 如果缓冲有数据会返回true,否则返回false
    for input.Scan() {

    //如果有数据 input.Text()可以取出
    go echo(c, input.Text(), 1*time.Second)
    }
    c.Close()
    }

    func echo(c net.Conn, outstr string, delay time.Duration) {
    fmt.Fprintln(c, strings.ToUpper(outstr))
    time.Sleep(delay)

    fmt.Fprintln(c, outstr)
    time.Sleep(delay)

    fmt.Fprintln(c, strings.ToLower(outstr))
    }
    • echo_client.go
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    package main

    import (
    "io"
    "log"
    "net"
    "os"
    )

    func main() {
    conn, err := net.Dial("tcp", "127.0.0.1:7000")
    if err != nil {
    log.Fatal(err)
    }
    defer conn.Close()

    go dealRecvData(os.Stdout, conn)

    dealRecvData(conn, os.Stdin)
    }

    func dealRecvData(dst io.Writer, src io.Reader) {
    if _, err := io.Copy(dst, src); err != nil {
    log.Fatal(err)
    }
    }
    • 测试
    1
    2
    3
    4
    5
    // 启动服务器
    go run echo_server.go

    // 启动客户端,输入单词后会依次收到三个响应
    go run echo_client.go

    案例:带超时的echo服务

    • server.go
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    package main

    import (
    "bufio"
    "fmt"
    "log"
    "net"
    "strings"
    "time"
    )

    func main() {
    listener, err := net.Listen("tcp", "localhost:7001")
    if err != nil {
    log.Fatal(err)
    return
    }

    for {
    conn, err := listener.Accept()
    if err != nil {
    log.Print(err)
    return
    }

    handleConn(conn)
    }
    }

    func handleConn(c net.Conn) {
    timeout := time.NewTicker(time.Second * 10)
    recvData := make(chan struct{})

    go func() {
    //input是一个Scanner类型,该类型可以通过Scan方法
    //依次迭代从io设备中读数据,直到遇到eof为止
    input := bufio.NewScanner(c)

    //Scan方法 如果缓冲有数据会返回true,否则返回false
    for input.Scan() {

    recvData <- struct{}{}

    //如果有数据 input.Text()可以取出
    go echo(c, input.Text(), 1*time.Second)
    }
    }()

    for {
    select {
    case <-recvData: //有数据发送过来
    fmt.Println("get Data reset timeout to 5s")
    timeout.Stop()
    timeout = time.NewTicker(time.Second * 5)
    case <-timeout.C: //客户端超时,需要断开连接
    fmt.Println("time out channel...")
    c.Close()
    timeout.Stop()
    return
    }

    }

    }

    func echo(c net.Conn, outstr string, delay time.Duration) {
    fmt.Fprintln(c, strings.ToUpper(outstr))
    time.Sleep(delay)

    fmt.Fprintln(c, outstr)
    time.Sleep(delay)

    fmt.Fprintln(c, strings.ToLower(outstr))
    }
    • client.go
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    package main

    import (
    "io"
    "log"
    "net"
    "os"
    )

    func main() {
    conn, err := net.Dial("tcp", "127.0.0.1:7001")
    if err != nil {
    log.Fatal(err)
    }
    defer conn.Close()

    go dealRecvData(os.Stdout, conn)

    dealRecvData(conn, os.Stdin)
    }

    func dealRecvData(dst io.Writer, src io.Reader) {
    if _, err := io.Copy(dst, src); err != nil {
    log.Fatal(err)
    }
    }

    案例:并发的网络聊天室服务

    • server.go
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    package main

    import (
    "bufio"
    "fmt"
    "log"
    "net"
    )

    //声明一个数据类型client 是一个只能写字符串 的channel
    type client chan<- string

    var (
    //entering 表示登陆 channel,
    //如果有新的client登陆,
    //应该向该channel发送数据,告知是哪个client登陆
    //用法比较特殊,entering是一个channel,里面存的元素也是一个client channel
    entering = make(chan client)

    //leaving 表示离开 channel,
    //如果有某个cilent退出链接,
    //应该向该channel发送数据,告知是哪个client离开
    leaving = make(chan client)

    //message 表示正常聊天消息,广播消息事件,
    //client发送的正常聊天信息都应该向该channel发送数据
    messages = make(chan string)
    )

    func broadcaseter() {
    //用来记录所有已经链接的client客户端
    var clients = make(map[client]bool)

    for {
    select {
    case msg := <-messages:
    //客户端正常消息事件
    //取出全部在线的客户端client,将该消息发送给每个客户端
    for cli := range clients {
    cli <- msg
    }

    case cli := <-entering:
    //client登陆事件
    //给clients map增添一对 key-value
    clients[cli] = true

    case cli := <-leaving:
    //client离开事件
    //给clients map删除一堆 key-value
    delete(clients, cli)
    //将该登出的client channel关闭
    close(cli)
    }
    }

    }

    func handleConn(conn net.Conn) {

    //ch就是当前客户端的client
    //给当前客户端创建一个channel 这个channel和conn套接字保持同步
    //只要向ch写数据, clientWrite 这个goroutine就会向对应的conn套接字写数据给客户端
    ch := make(chan string)
    go clientWrite(conn, ch)

    who := conn.RemoteAddr().String()

    //给当前客户端回应 client的id
    ch <- "You are " + who

    //向全部已经在线的client广播 当前新的client已经登录
    messages <- who + "已经登录"

    //发送当前ch登录事件 broadcaster会处理这个事件,将ch放在clients中
    entering <- ch

    //阻塞等待客户端输入消息
    input := bufio.NewScanner(conn)
    for input.Scan() {

    //将客户端输入消息进行广播 发给其他client
    messages <- who + ": " + input.Text()
    }

    //如果input.Scan退出,表示客户端断开连接 此处可以添加超时机制

    //发送当前ch退出事件
    leaving <- ch

    //想其他client广播 当前ch已经退出
    messages <- who + "已经退出"

    //关闭当前客户端套接字conn
    conn.Close()
    }

    func clientWrite(conn net.Conn, ch <-chan string) {
    for msg := range ch {
    fmt.Fprintln(conn, msg)
    }
    }

    func main() {
    listener, err := net.Listen("tcp", "localhost:7000")
    if err != nil {
    log.Fatal(err)
    return
    }

    //开一个goroutine去处理3个channel事件
    go broadcaseter()

    for {
    //等待新的客户端链接请求过来
    conn, err := listener.Accept()
    if err != nil {
    log.Print(err)
    continue
    }

    //开一个goroutine去处理client请求
    go handleConn(conn)
    }
    }
    • client.go
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    package main

    import (
    "io"
    "log"
    "net"
    "os"
    )

    func main() {
    conn, err := net.Dial("tcp", "127.0.0.1:7000")
    if err != nil {
    log.Fatal(err)
    }
    defer conn.Close()

    go dealRecvData(os.Stdout, conn)

    dealRecvData(conn, os.Stdin)
    }

    func dealRecvData(dst io.Writer, src io.Reader) {
    if _, err := io.Copy(dst, src); err != nil {
    log.Fatal(err)
    }
    }

    参考资料