go协程的通信

tech2022-12-19  102

go并发编程原理

Go 语言的协程实现被称之为 goroutine,由 Go 运行时管理,在 Go 语言中通过协程实现并发编程非常简单:我们可以在一个处理进程中通过关键字 go 启用多个协程,然后在不同的协程中完成不同的子任务,这些用户在代码中创建和维护的协程本质上是用户级线程,Go 语言运行时会在底层通过调度器将用户级线程交给操作系统的系统级线程去处理,如果在运行过程中遇到某个 IO 操作而暂停运行,调度器会将用户级线程和系统级线程分离,以便让系统级线程去处理其他用户级线程,而当 IO 操作完成,需要恢复运行,调度器又会调度空闲的系统级线程来处理这个用户级线程,从而达到并发处理多个协程的目的。此外,调度器还会在系统级线程不够用时向操作系统申请创建新的系统级线程,而在系统级线程过多的情况下销毁一些空闲的线程,这个过程和 PHP-FPM 的工作机制有点类似,实际上这也是很多进程/线程池管理器的工作机制,这样一来,可以保证对系统资源的高效利用,避免系统资源的浪费。

以上,就是 Go 语言并发编程的独特实现模型。

一个简单的示例:

package main import "fmt" func add(a, b int) int { var c = a + b fmt.Printf("%d + %d = %d", a, b, c) return c } func main() { go add(1, 2) }

main 函数运行在一个主协程中。从 go 关键字开始,从主协程中叉出一条新路。和之前不使用协程的方式相比,由此也引入了不确定性:我们不知道子协程什么时候执行完毕,运行到了什么状态。 另外,我们也不要试图从 add 函数返回处理结果,因为在主协程中,根本获取不到子协程的返回值,从子协程开始执行起就已经和主协程没有任何关系了,返回值会被丢弃。

如果要显示出子协程的打印结果,一种方式是在主协程中等待足够长的时间再退出,以便保证子协程中的所有代码执行完毕。

time.Sleep(1e9)

当然,我们肯能不确定到底需要等待多长事件,这时我们就需要一种更精准的方式在子协程执行完毕后,立即退出主协程,这就涉及到协程间的通信。

总结:从语法结构来说,Go 语言的协程是非常简单的,只需要通过 go 关键字声明即可,难点在于并发引起的不确定性,以及为了协调这种不确定性在不同协程间所要进行的通信,在并发开篇教程中,我们也介绍过在工程上,常见的并发通信模型有两种:共享内存和消息传递。

协程通信

共享内存

package main import ( "fmt" "runtime" "sync" "time" ) // 全局变量用作共享内存 var counter int = 0 var t int = 0 func add(lock *sync.Mutex) { lock.Lock() counter++ t++ lock.Unlock() //fmt.Println("NumGoroutine:", runtime.NumGoroutine()) //返回正在执行和排队的任务总数 } func main() { fmt.Println("cpus:", runtime.NumCPU()) //返回当前系统的 CPU 核数量 //runtime.GOMAXPROCS(1) //设置最大的可同时使用的 CPU 核数 start := time.Now() lock := &sync.Mutex{} for i := 0; i < 100000; i++ { go add(lock) } for { runtime.Gosched() //让当前 goroutine 让出 CPU,当一个 goroutine 发生阻塞,Go 会自动地把与该 goroutine 处于同一系统线程的其他 goroutine 转移到另一个系统线程上去,以使这些 goroutine 不阻塞 if counter >= 100000 { break } } fmt.Println(t) end := time.Now() consume := end.Sub(start).Seconds() fmt.Println("程序执行耗时(s):", consume) }

通过 channel 进行消息传递

通过共享内存实现协程通信太过繁琐,且维护成本高,Go 语言推荐使用消息传递实现并发通信,这种消息通信机制被称为 channel,中文译作「通道」,可理解为传递消息的通道。

chan是go在语言级别提供的协程通信方式,它是一种数据类型,本身是并发安全的,我们可以使用它在多个goroutine之间传递消息,而不必担心通道中的值被污染。

注意: 通道是进程内的通信方式,因此通过通道传递对象的过程和调用函数时的参数传递行为一直,也可以传递指针。如果需要跨进程通信,建议通过分布式系统的方法来解决,比如使用socket或者http等通信协议。

前面我们说到通道是一种数据类型,和数组/切片类型类似,一个通道只能传递一种类型的值,这个类型需要在声明 通道时指定。在使用通道时,需要通过make 进行声明,通道对应的类型关键字是 chan:

ch := make(chan int)

我们可以把通道看作是一个先进先出(FIFO)的队列,通道中的元素会严格按照发送顺序排列,继而按照排列顺序被接收,通道元素的发送和接收都可以通过 <- 操作符来实现,发送时元素值在右,通道变量在左:

ch <- 1 // 表示把元素 1 发送到通道 ch

接收时通道变量在右,可以通过指定变量接收元素值:

element := <-ch

也可以留空表示忽略:

<-ch

这样一来,通过箭头指向我们就可以清楚的判断是写入数据到通道还是从通道读取数据,非常简单形象。

package main import ( "fmt" "time" ) func add(a, b int, ch chan int) { c := a + b fmt.Printf("%d + %d = %d\n", a, b, c) ch <- 1 } func main() { start := time.Now() chs := make([]chan int, 10) for i := 0; i < 10; i++ { chs[i] = make(chan int) go add(1, i, chs[i]) } for _, ch := range chs { <- ch } end := time.Now() consume := end.Sub(start).Seconds() fmt.Println("程序执行耗时(s):", consume) }

之所以上述这段代码可以实现和「共享内存+锁」一样的效果,是因为往通道写入数据和从通道接收数据都是原子操作,或者说是同步阻塞的,当我们向某个通道写入数据时,就相当于该通道被加锁,直到写入操作完成才能执行从该通道读取数据的操作,反过来,当我们从某个通道读取数据时,其他协程也不能操作该通道,直到读取完成,如果通道中没有数据,则会阻塞在这里,直到通道被写入数据。因此,可以看到通道的发送和接收操作是互斥的,同一时间同一个进程内的所有协程对某个通道只能执行发送或接收操作,两者不可能同时进行,这样就保证了并发的安全性,数据不可能被污染。

最新回复(0)