Go语言学习Part5-并发

tech2026-01-10  12

刨坑很深的我,脚踏实地可能会仰望天空,终于go学了基本的了

Go程

Go程:Go运行时管理的轻量级线程go f(x, y, z):f, x, y 和 z 的求值发生在当前的 Go 程中,而 f 的执行发生在新的 Go 程中 //case1 package main import ( "fmt" "time" ) func say(s string) { for i := 0; i < 5; i++ { fmt.Println(s) time.Sleep(100 * time.Millisecond) } } func main() { go say("world") time.Sleep(500 * time.Millisecond) //分别改成100、200试试 fmt.Println("nice") } say(“world”)不是没有运行,而是主协程已经停止运行,没等到子协程打印信息就退出了。 主协程运行完毕之后,整个程序就会退出。go关键字是用来在当前协程上开启一个新协程的。那go say(“world”)函数是被放置在了一个子协程上运行的。而在say(“hello”)后,主协程已经运行完毕并且退出,所以子协程被迫结束。

信道

信道在使用前必须创建 ch:= make(chan int)将值发送至信道ch从信道ch接收值并赋予v //case2 package main import "fmt" func sum(s []int, c chan int) { sum := 0 for _, v := range s { sum += v } c <- sum // 将和送入 c } func main() { s := []int{7, 2, 8, -9, 4, 0} c := make(chan int) go sum(s[:len(s)/2], c) go sum(s[len(s)/2:], c) x, y := <-c, <-c // 从 c 中接收 fmt.Println(x, y, x+y) }

带缓冲的信道

将缓冲长度作为第二个参数提供给 make 来初始化一个带缓冲的信道仅当信道的缓冲区填满后,向其发送数据时才会阻塞。当缓冲区为空时,接受方会阻塞。修改示例填满缓冲区:运行会报错,比如你试试下面代码 //case3 package main import "fmt" func main() { ch := make(chan int, 2) // change to 3 is ok ch <- 1 ch <- 2 ch <- 3 fmt.Println(<-ch) fmt.Println(<-ch) }

range和close

cap()函数返回的是数组切片分配的空间大小通过 close 关闭一个信道来表示没有需要发送的值了。接收者可以通过为接收表达式分配第二个参数来测试信道是否被关闭:若没有值可以接收且信道已被关闭,那么在执行完以下表达式1,ok 会被设置为 false。 v, ok := <-ch //表达式1 for i := range c //表达式2 循环(表达式2)会不断从信道接收值,直到它被关闭。向一个已经关闭的信道发送数据会引发程序恐慌(panic)。 //case4 package main import ( "fmt" ) func fibonacci(n int, c chan int) { x, y := 0, 1 for i := 0; i < n; i++ { c <- x x, y = y, x+y } close(c) } func main() { c := make(chan int, 10) go fibonacci(cap(c), c) for i := range c { fmt.Println(i) } z := 300 c <- z }

select语句

select 会阻塞到某个分支可以继续执行为止,这时就会执行该分支。当多个分支都准备好时会随机选择一个执行。一直往通道c里面塞东西,输出10次之后,往通道quit里面塞东西,select就进入了quit case,然后就退出了 //case5 package main import "fmt" func fibonacci(c, quit chan int) { x, y := 0, 1 for { select { case c <- x: x, y = y, x+y case <-quit: fmt.Println("quit") return } } } func main() { c := make(chan int) quit := make(chan int) go func() { for i := 0; i < 10; i++ { fmt.Println(<-c) } quit <- 0 }() fibonacci(c, quit) } 当 select 中的其它分支都没有准备好时,default 分支就会执行。 time.After()表示多长时间长的时候后返回一条time.Time类型的通道消息。但是在取出channel内容之前不阻塞,后续程序可以继续执行。 //case6 package main import ( "fmt" "time" ) func main() { tick := time.Tick(100 * time.Millisecond) boom := time.After(500 * time.Millisecond) for { select { case <-tick: fmt.Println("tick.") case <-boom: fmt.Println("BOOM!") return default: fmt.Println(" .") time.Sleep(50 * time.Millisecond) } } }

练习:等价二叉查找树

//case7 package main import "golang.org/x/tour/tree" import "fmt" type Tree struct { Left *Tree Value int Right *Tree } // Walk 步进 tree t 将所有的值从 tree 发送到 channel ch。 func Walk(t *tree.Tree, ch chan int){ if t.Left != nil { Walk(t.Left, ch) } ch <- t.Value if t.Right != nil { Walk(t.Right, ch) } } // Same 检测树 t1 和 t2 是否含有相同的值。 func Same(t1, t2 *tree.Tree) bool { num := 10 c1 := make(chan int, num) c2 := make(chan int, num) go Walk(t1, c1) go Walk(t2, c2) for i:=0; i<num; i++ { if <-c1 != <-c2 { return false } } return true } func main() { t1 := tree.New(10) t2 := tree.New(10) fmt.Println(Same(t1, t2)) }

互斥锁

在代码前调用Lock方法,在代码后调用Unlock方法,保证一段代码的互斥执行互斥数据结构:互斥锁Mutex锁写在struct里面defer return前被执行,多个defer是压栈的形式,return前被执行,先进去的后执行 //case8 package main import ( "fmt" "sync" "time" ) // SafeCounter 的并发使用是安全的。 type SafeCounter struct { v map[string]int mux sync.Mutex } // Inc 增加给定 key 的计数器的值。 func (c *SafeCounter) Inc(key string) { c.mux.Lock() // Lock 之后同一时刻只有一个 goroutine 能访问 c.v c.v[key]++ c.mux.Unlock() } // Value 返回给定 key 的计数器的当前值。 func (c *SafeCounter) Value(key string) int { c.mux.Lock() // Lock 之后同一时刻只有一个 goroutine 能访问 c.v defer c.mux.Unlock() return c.v[key] } func main() { c := SafeCounter{v: make(map[string]int)} for i := 0; i < 1000; i++ { go c.Inc("somekey") } time.Sleep(time.Second) fmt.Println(c.Value("somekey")) }

练习web爬虫

基本概念

https://zengweigang.gitbooks.io/core-go/content/eBook/14.2.html没有爬过的url,可以用两种方法运行这个协程 channel:数据通过通道,同一时间只有一个协程可以访问数据等待组实现:http://c.biancheng.net/view/108.html

使用channel实现code

代码
//case9 channel package main import ( "fmt" "sync" "time" ) type Fetcher interface { // Fetch 返回 URL 的 body 内容,并且将在这个页面上找到的 URL 放到一个 slice 中。 Fetch(url string) (body string, urls []string, err error) } //基本思路是建一个dictionary,然后每次要往抓取url的时候,检测是否已经抓取过了 type SafeCounter struct { v map[string]bool mux sync.Mutex } func (c *SafeCounter) Put(key string) { c.mux.Lock() // Lock 之后同一时刻只有一个 goroutine 能访问 c.v c.v[key] = true c.mux.Unlock() } func (c *SafeCounter) Contains(key string) bool { c.mux.Lock() // Lock 之后同一时刻只有一个 goroutine 能访问 c.v defer c.mux.Unlock() _, ok := c.v[key] return ok } // Crawl 使用 fetcher 从某个 URL 开始递归的爬取页面,直到达到最大深度。 var urlMap *SafeCounter = &SafeCounter{v: make(map[string]bool)} func Crawl(url string, depth int, fetcher Fetcher, ch chan string) { // TODO: 并行的抓取 URL。 // TODO: 不重复抓取页面。 // 下面并没有实现上面两种情况: if depth <= 0 { return } urlMap.Put(url) body, urls, err := fetcher.Fetch(url) if err != nil { fmt.Println(err) return } ch <- url fmt.Printf("found: %s %q\n", url, body) for _, u := range urls { if urlMap.Contains(u) { continue } go Crawl(u, depth-1, fetcher, ch) } return } func main() { ch := make(chan string) go Crawl("https://golang.org/", 4, fetcher, ch) boom := time.After(3 * time.Second) for { select { case r := <-ch: fmt.Printf("found: %s\n", r) boom = time.After(3 * time.Second) case <-boom: fmt.Printf("time out\n") return } } } // fakeFetcher 是返回若干结果的 Fetcher。 type fakeFetcher map[string]*fakeResult type fakeResult struct { body string urls []string } func (f fakeFetcher) Fetch(url string) (string, []string, error) { if res, ok := f[url]; ok { return res.body, res.urls, nil } return "", nil, fmt.Errorf("not found: %s", url) } // fetcher 是填充后的 fakeFetcher。 var fetcher = fakeFetcher{ "https://golang.org/": &fakeResult{ "The Go Programming Language", []string{ "https://golang.org/pkg/", "https://golang.org/cmd/", }, }, "https://golang.org/pkg/": &fakeResult{ "Packages", []string{ "https://golang.org/", "https://golang.org/cmd/", "https://golang.org/pkg/fmt/", "https://golang.org/pkg/os/", }, }, "https://golang.org/pkg/fmt/": &fakeResult{ "Package fmt", []string{ "https://golang.org/", "https://golang.org/pkg/", }, }, "https://golang.org/pkg/os/": &fakeResult{ "Package os", []string{ "https://golang.org/", "https://golang.org/pkg/", }, }, }
输出结果
found: https://golang.org/ found: https://golang.org/ "The Go Programming Language" not found: https://golang.org/cmd/ found: https://golang.org/pkg/ "Packages" found: https://golang.org/pkg/ found: https://golang.org/pkg/os/ found: https://golang.org/pkg/fmt/ found: https://golang.org/pkg/fmt/ "Package fmt" found: https://golang.org/pkg/os/ "Package os" time out

使用等待组实现code

http://c.biancheng.net/view/108.html

在 sync.WaitGroup(等待组)类型中,每个 sync.WaitGroup 值在内部维护着一个计数,此计数的初始默认值为零。对于一个可寻址的 sync.WaitGroup 值 wg: 我们可以使用方法调用 wg.Add(delta) 来改变值 wg 维护的计数。方法调用 wg.Done() 和 wg.Add(-1) 是完全等价的。如果一个 wg.Add(delta) 或者 wg.Done() 调用将 wg 维护的计数更改成一个负数,一个恐慌将产生。当一个协程调用了 wg.Wait() 时, 如果此时 wg 维护的计数为零,则此 wg.Wait() 此操作为一个空操作(noop); 否则(计数为一个正整数),此协程将进入阻塞状态。当以后其它某个协程将此计数更改至 0 时(一般通过调用 wg.Done()),此协程将重新进入运行状态(即 wg.Wait() 将返回)。 等待组内部拥有一个计数器,计数器的值可以通过方法调用实现计数器的增加和减少。当我们添加了 N 个并发任务进行工作时,就将等待组的计数器值增加 N。每个任务完成时,这个值减 1。同时,在另外一个 goroutine 中等待这个等待组的计数器值为 0 时,表示所有任务已经完成。
代码
//case10 WaitGroup package main import ( "fmt" "sync" ) type SafeMap struct { v map[string]bool mux sync.Mutex // 访问互斥锁 wg sync.WaitGroup // 等待组 } type Fetcher interface { // Fetch 返回 URL 的 body 内容,并且将在这个页面上找到的 URL 放到一个 slice 中。 Fetch(url string) (body string, urls []string, err error) } func (c *SafeMap) Contains(key string) bool { c.mux.Lock() // Lock 之后同一时刻只有一个 goroutine 能访问 c.v defer c.mux.Unlock() _, ok := c.v[key] if ok { return true } else { c.v[key] = true c.wg.Add(1) return false } return ok } var urlMap *SafeMap = &SafeMap{v: make(map[string]bool)} // Crawl 使用 fetcher 从某个 URL 开始递归的爬取页面,直到达到最大深度。 func Crawl(url string, depth int, fetcher Fetcher) { defer urlMap.wg.Done() //这个程序执行完了,则执行其他的协程 // TODO: 并行的抓取 URL。 // TODO: 不重复抓取页面。 // 下面并没有实现上面两种情况: if depth <= 0 { return } body, urls, err := fetcher.Fetch(url) if err != nil { fmt.Println(err) return } fmt.Printf("found: %s %q\n", url, body) for _, u := range urls { if urlMap.Contains(u) == false { go Crawl(u, depth-1, fetcher) } } return } func main() { urlMap.Contains("https://golang.org/") Crawl("https://golang.org/", 4, fetcher) urlMap.wg.Wait() } // fakeFetcher 是返回若干结果的 Fetcher。 type fakeFetcher map[string]*fakeResult type fakeResult struct { body string urls []string } func (f fakeFetcher) Fetch(url string) (string, []string, error) { if res, ok := f[url]; ok { return res.body, res.urls, nil } return "", nil, fmt.Errorf("not found: %s", url) } // fetcher 是填充后的 fakeFetcher。 var fetcher = fakeFetcher{ "https://golang.org/": &fakeResult{ "The Go Programming Language", []string{ "https://golang.org/pkg/", "https://golang.org/cmd/", }, }, "https://golang.org/pkg/": &fakeResult{ "Packages", []string{ "https://golang.org/", "https://golang.org/cmd/", "https://golang.org/pkg/fmt/", "https://golang.org/pkg/os/", }, }, "https://golang.org/pkg/fmt/": &fakeResult{ "Package fmt", []string{ "https://golang.org/", "https://golang.org/pkg/", }, }, "https://golang.org/pkg/os/": &fakeResult{ "Package os", []string{ "https://golang.org/", "https://golang.org/pkg/", }, }, }
输出结果
最新回复(0)