CSP CSP指顺序通信进程(communicating sequential processes)或被简称为CSP。
CSP是 一种现代的并发编程模型,在这种编程模型中值会在不同的运行实例(goroutine)中传递,尽管大多数情况下仍然是被限制在单一实例中。而传统的并发模型:多线程共享内存。
MPG
M: 操作系统上的主线程,物理线程
P: 协程执行需要的上下文
G:协程
go1.8后默认让程序运行在多核上
Goroutine goroutine不同于thread,threads是操作系统中的对于一个独立运行实例的描述,不 同操作系统,对于thread的实现也不尽相同;但是,操作系统并不知道goroutine的 存在,goroutine的调度是有Golang运行时进行管理的。启动thread虽然比process 所需的资源要少,但是多个thread之间的上下文切换仍然是需要大量的工作的(寄 存器/Program Count/Stack Pointer/…),Golang有自己的调度器,许多goroutine 的数据都是共享的,因此goroutine之间的切换会快很多,启动goroutine所耗费的资 源也很少,一个Golang程序同时存在几百个goroutine是很正常的。
channel,即“管道”,是用来传递数据(叫消息更为合适)的一个数据结构,即可以 从channel里面塞数据,也可以从中获取数据。channel本身并没有什么神奇的地 方,但是channel加上了goroutine,就形成了一种既简单又强大的请求处理模型, 即N个工作goroutine将处理的中间结果或者最终结果放入一个channel,另外有M个工作goroutine从这个channel拿数据,再进行进一步加工,通过组合这种过程,从而胜任各种复杂的业务模型。
当一个程序启动时,其主函数即在一个单独的goroutine中运行,我们叫它main goroutine。新的goroutine会用go语句来创建。在语法上,go语句是一个普通的函数或方法调用前加上关键字go。go语句会使其语句中的函数在一个新创建的 goroutine中运行。而go语句本身会迅速地完成。
1 2 f() // 调用f()函数,并等待f()返回 go f() // 创建一个新的goroutine去执行f(), 不需要等待
Goroutine之间通信 有两种方法
全局变量的互斥锁
使用管道channel
互斥锁
如下示例计算1-100各个数的阶乘,然后放到map中,最后打印出来,map被多个协程并发写,出现资源争夺,需要加入互斥锁解决
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package main import ( "time" "fmt" "sync" ) var ( myMap = make(map[int]int) // 零值为解锁状态 // Mutex类型的锁和线程无关,可以由不同的线程加锁和解锁。 lock sync.Mutex ) func f1(n int) { var r int = 1 for i := 1; i <= n; i++ { r = r * i } lock.Lock() myMap[n] = r // concurrent map writes lock.Unlock() } func main() { for i := 1; i <= 10; i++ { go f1(i) } time.Sleep(10 * time.Second) lock.Lock() for k, v := range myMap { fmt.Printf("%v的阶乘是: %v\n",k, v) } lock.Unlock() }
管道channel
channel是引用类型,必须用make初始化后才能使用,初始化时指定容量大小
没有使用协程时,如果channel中数据全部被取出,再取会报死锁
可以用close关闭channel,关闭后能再放数据,但还能取
支持for-range遍历,在遍历时,如果channel没有关闭,会报死锁,关闭后才可正常遍历
取数据时,可以接受一个ok返回值,判断是否获取成功
编译器如果发现一个管道只有写,没有读,会阻塞出现死锁
串联的channel Channels也可以用于将多个goroutine连接在一起,一个Channel的输出作为下一个 Channel的输入。这种串联的Channels就是所谓的管道(pipeline)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 package main import ( "fmt" "time" ) func main() { naturals := make(chan int) squares := make(chan int) go func() { for x := 0; x < 5; x++ { naturals <- x time.Sleep(1 * time.Second) } close(naturals) }() go func() { // for { // x, ok := <-naturals // if !ok { // break // } // squares <- x * x // } for x := range naturals { squares <- x * x } close(squares) }() // for { // y, ok := <-squares // if !ok { // break // } // fmt.Println(y) // } for x := range squares { fmt.Println(x) } fmt.Println("done") }
单方向的channel 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package main import ( "fmt" "time" ) func Counter(out chan<- int) { for i := 0; i < 5; i++ { out <- i time.Sleep(1 * time.Second) } //只能关闭输出channel,语法限定 close(out) } func Squarer(out chan<- int, in <-chan int) { for x := range in { out <- x * x } //只能关闭输出类型channel,语法限定 //因为只有调用输出channel的groutine才能决定数据是否传送完毕 close(out) } func Printer(in <-chan int) { for val := range in { fmt.Println(val) } } func main() { naturals := make(chan int) squares := make(chan int) go Counter(naturals) go Squarer(squares, naturals) // Printer 执行printer不能用go // 因为那样main groutine会直接退 出,导致程序退出 Printer(squares) fmt.Println("done") }
带缓存的channel 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package main import ( "fmt" ) func mirroredQuery() string { response := make(chan string, 3) go func() { response <- request("url2") }() go func() { response <- request("url3") }() go func() { response <- request("url4") }() return <-response } func request(url string) string { return url + ": result" } func main() { fmt.Println(mirroredQuery()) }
goroutines泄露:如果我们使用了无缓存的channel ,那么两个慢的goroutines 将会因为没有人接收而被永远卡住。这种情况,称为goroutines 泄漏,这将是一个BUG。和垃圾变量不 同,泄漏的goroutines 并不会被自动回收,因此确保每个不再需要的goroutine能正常退出是重要的。
基于select的多路复用 10秒倒计时发射,期间可按键停止。
接收到输入时,abort通道被发送元素,然后就绪被select选择到,打印终止,然后return
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package main import ( "fmt" "os" "time" ) func launch() { fmt.Println("发射") } func main() { abort := make(chan struct{}) go func() { os.Stdin.Read(make([]byte, 1)) abort <- struct{}{} }() fmt.Println("准备发射。。。") //time.Tick会返回一个channel,系统会定时向这个channel发信号 tick := time.Tick(1 * time.Second) for countdown := 10; countdown > 0; countdown-- { fmt.Println(countdown) select { case <-tick: //从tick channel中取数据,每阻塞1s返回一次 case <-abort: fmt.Println("发射终止。。。") return } } launch() }
入门示例 下面的例子,main goroutine将计算菲波那契数列的第45个元素值。由于计算函数 使用低效的递归,所以会运行相当长时间,在此期间我们想让用户看到一个可见的 标识来表明程序依然在正常运行,所以来做一个动画的小图标:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package main import ( "fmt" "time" ) func main() { go spinner(100 * time.Millisecond) const n = 45 fibN := fib(n) fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN) } func spinner(delay time.Duration) { for { for _, r := range `-\|/` { fmt.Printf("\r%c", r) time.Sleep(delay) } } } func fib(x int) int { if x < 2 { return x } return fib(x-1) + fib(x-2) }
动画显示了几秒之后,fib(45)的调用成功地返回,并且打印结果:
1 Fibonacci(45) = 1134903170
然后主函数返回。主函数返回时,所有的goroutine都会被直接打断,程序退出。除了从主函数退出或者直接终止程序之外,没有其它的编程方法能够让一个 goroutine来打断另一个的执行,但是之后可以看到一种方式来实现这个目的,通过goroutine 之间的通信来让一个goroutine 请求其它的goroutine ,并让被请求的 goroutine自行结束执行。
留意一下这里的两个独立的单元是如何进行组合的,spinning和菲波那契的计算。 分别在独立的函数中,但两个函数会同时执行。
案例:并发的clock时钟去服务 该例子是顺序执行的时钟服务器,它会每隔一秒钟将当前时间写到客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package main import ( "io" "log" "net" "time" ) func handleConn(c net.Conn) { defer c.Close() for { // 格式化字符串是1月2日下午3点4分5秒零六年UTC-0700 _, err := io.WriteString(c, time.Now().Format("15:04:05\n")) if err != nil { log.Print(err) return } time.Sleep(time.Second * 1) } } func main() { listener, err := net.Listen("tcp", "localhost:9000") if err != nil { log.Fatal(err) } for { conn, err := listener.Accept() if err != nil { log.Fatal(err) continue } go handleConn(conn) } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package main import ( "io" "log" "net" "os" ) func dealRecvData(dst io.Writer, src io.Reader) { //将读缓冲 拷贝到写缓冲 if _, err := io.Copy(dst, src); err != nil { log.Fatal(err) } } func main() { conn, err := net.Dial("tcp", "127.0.0.1:9000") if err != nil { log.Fatal(err) } defer conn.Close() //os.Stdout标准输出,作为写io设备, conn客户端套接字作为读io设备 dealRecvData(os.Stdout, conn) }
1 2 3 4 5 // 启动服务器 go run server.go // 启动客户端,会得到时间信息 go run dial.go
案例:并发的echo服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 package main import ( "bufio" "fmt" "log" "net" "strings" "time" ) func main() { listener, err := net.Listen("tcp", "localhost:7000") if err != nil { log.Fatal(err) return } for { conn, err := listener.Accept() if err != nil { log.Print(err) return } handleConn(conn) } } func handleConn(c net.Conn) { //input是一个Scanner类型,该类型可以通过Scan方法 //依次迭代从io设备中读数据,直到遇到eof为止 input := bufio.NewScanner(c) //Scan方法 如果缓冲有数据会返回true,否则返回false for input.Scan() { //如果有数据 input.Text()可以取出 go echo(c, input.Text(), 1*time.Second) } c.Close() } func echo(c net.Conn, outstr string, delay time.Duration) { fmt.Fprintln(c, strings.ToUpper(outstr)) time.Sleep(delay) fmt.Fprintln(c, outstr) time.Sleep(delay) fmt.Fprintln(c, strings.ToLower(outstr)) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package main import ( "io" "log" "net" "os" ) func main() { conn, err := net.Dial("tcp", "127.0.0.1:7000") if err != nil { log.Fatal(err) } defer conn.Close() go dealRecvData(os.Stdout, conn) dealRecvData(conn, os.Stdin) } func dealRecvData(dst io.Writer, src io.Reader) { if _, err := io.Copy(dst, src); err != nil { log.Fatal(err) } }
1 2 3 4 5 // 启动服务器 go run echo_server.go // 启动客户端,输入单词后会依次收到三个响应 go run echo_client.go
案例:带超时的echo服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 package main import ( "bufio" "fmt" "log" "net" "strings" "time" ) func main() { listener, err := net.Listen("tcp", "localhost:7001") if err != nil { log.Fatal(err) return } for { conn, err := listener.Accept() if err != nil { log.Print(err) return } handleConn(conn) } } func handleConn(c net.Conn) { timeout := time.NewTicker(time.Second * 10) recvData := make(chan struct{}) go func() { //input是一个Scanner类型,该类型可以通过Scan方法 //依次迭代从io设备中读数据,直到遇到eof为止 input := bufio.NewScanner(c) //Scan方法 如果缓冲有数据会返回true,否则返回false for input.Scan() { recvData <- struct{}{} //如果有数据 input.Text()可以取出 go echo(c, input.Text(), 1*time.Second) } }() for { select { case <-recvData: //有数据发送过来 fmt.Println("get Data reset timeout to 5s") timeout.Stop() timeout = time.NewTicker(time.Second * 5) case <-timeout.C: //客户端超时,需要断开连接 fmt.Println("time out channel...") c.Close() timeout.Stop() return } } } func echo(c net.Conn, outstr string, delay time.Duration) { fmt.Fprintln(c, strings.ToUpper(outstr)) time.Sleep(delay) fmt.Fprintln(c, outstr) time.Sleep(delay) fmt.Fprintln(c, strings.ToLower(outstr)) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package main import ( "io" "log" "net" "os" ) func main() { conn, err := net.Dial("tcp", "127.0.0.1:7001") if err != nil { log.Fatal(err) } defer conn.Close() go dealRecvData(os.Stdout, conn) dealRecvData(conn, os.Stdin) } func dealRecvData(dst io.Writer, src io.Reader) { if _, err := io.Copy(dst, src); err != nil { log.Fatal(err) } }
案例:并发的网络聊天室服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 package main import ( "bufio" "fmt" "log" "net" ) //声明一个数据类型client 是一个只能写字符串 的channel type client chan<- string var ( //entering 表示登陆 channel, //如果有新的client登陆, //应该向该channel发送数据,告知是哪个client登陆 //用法比较特殊,entering是一个channel,里面存的元素也是一个client channel entering = make(chan client) //leaving 表示离开 channel, //如果有某个cilent退出链接, //应该向该channel发送数据,告知是哪个client离开 leaving = make(chan client) //message 表示正常聊天消息,广播消息事件, //client发送的正常聊天信息都应该向该channel发送数据 messages = make(chan string) ) func broadcaseter() { //用来记录所有已经链接的client客户端 var clients = make(map[client]bool) for { select { case msg := <-messages: //客户端正常消息事件 //取出全部在线的客户端client,将该消息发送给每个客户端 for cli := range clients { cli <- msg } case cli := <-entering: //client登陆事件 //给clients map增添一对 key-value clients[cli] = true case cli := <-leaving: //client离开事件 //给clients map删除一堆 key-value delete(clients, cli) //将该登出的client channel关闭 close(cli) } } } func handleConn(conn net.Conn) { //ch就是当前客户端的client //给当前客户端创建一个channel 这个channel和conn套接字保持同步 //只要向ch写数据, clientWrite 这个goroutine就会向对应的conn套接字写数据给客户端 ch := make(chan string) go clientWrite(conn, ch) who := conn.RemoteAddr().String() //给当前客户端回应 client的id ch <- "You are " + who //向全部已经在线的client广播 当前新的client已经登录 messages <- who + "已经登录" //发送当前ch登录事件 broadcaster会处理这个事件,将ch放在clients中 entering <- ch //阻塞等待客户端输入消息 input := bufio.NewScanner(conn) for input.Scan() { //将客户端输入消息进行广播 发给其他client messages <- who + ": " + input.Text() } //如果input.Scan退出,表示客户端断开连接 此处可以添加超时机制 //发送当前ch退出事件 leaving <- ch //想其他client广播 当前ch已经退出 messages <- who + "已经退出" //关闭当前客户端套接字conn conn.Close() } func clientWrite(conn net.Conn, ch <-chan string) { for msg := range ch { fmt.Fprintln(conn, msg) } } func main() { listener, err := net.Listen("tcp", "localhost:7000") if err != nil { log.Fatal(err) return } //开一个goroutine去处理3个channel事件 go broadcaseter() for { //等待新的客户端链接请求过来 conn, err := listener.Accept() if err != nil { log.Print(err) continue } //开一个goroutine去处理client请求 go handleConn(conn) } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package main import ( "io" "log" "net" "os" ) func main() { conn, err := net.Dial("tcp", "127.0.0.1:7000") if err != nil { log.Fatal(err) } defer conn.Close() go dealRecvData(os.Stdout, conn) dealRecvData(conn, os.Stdin) } func dealRecvData(dst io.Writer, src io.Reader) { if _, err := io.Copy(dst, src); err != nil { log.Fatal(err) } }
参考资料