代码仓库

并发概要

随着多核CPU的普及, 为了更快的处理任务, 出现了各种并发编程的模型,
主要有以下几种:

模型名称 优点 缺点
多进程 简单, 隔离性好, 进程间几乎无影响 开销最大
多线程 目前使用最多的方式, 开销比多进程小 高并发模式下, 效率会有影响
异步 相比多线程而言, 可以减少线程的数量 编码要求高, 需要对流程分割合理
协程 用户态线程, 不需要操作系统来调度, 所以轻量, 开销极小 需要语言支持

goroutine-pool

协程介绍

协程是个抽象的概念, 可以映射到到操作系统层面的进程, 线程等概念.
由于协程是用户态的线程, 不用操作系统来调度, 所以不受操作系统的限制,
可以轻松的创建百万个, 因此也被称为 “轻量级线程”.

在 golang 中, 协程不是由库实现的, 而是受语言级别支持的, 因此, 在 golang
中, 使用协程非常方便.
下面通过例子演示在 golang 中, 如何使用协程来完成并发操作.

golang的协程管理

golang 并发

golang协程机制很方便的解决了并发编程的问题,但是协程并不是没有开销的,所以也需要适当限制一下数量。

实现方式

golang 中, 通过 go 关键字可以非常简单的启动一个协程,
几乎没有什么学习成本.
当然并发编程中固有的业务上的困难依然存在(比如并发时的同步, 超时等), 但是
golang 在语言级别给我们提供了优雅简洁的解决这些问题的途径.

理解了 golang 中协程的使用, 会给我们写并发程序时带来极大的便利.
首先以一个简单的例子开始 golang 的并发编程.

package mainimport (     "fmt"     "time")func main() {     for i := 0; i < 10; i++ {             go sum     }     time.Sleep(time.Second * 5)}func sum(start, end int) int {     var sum int = 0     for i := start; i < end; i++ {             sum += i     }     fmt.Printf("Sum from %d to %d is %d\n", start, end, sum)     return sum}

执行结果如下: (同时启动10个协程做累加运算,
10个协程的执行顺序可能会不一样)

$ go run main.goSum from 0 to 10 is 45Sum from 6 to 16 is 105Sum from 7 to 17 is 115Sum from 2 to 12 is 65Sum from 8 to 18 is 125Sum from 1 to 11 is 55Sum from 9 to 19 is 135Sum from 3 to 13 is 75Sum from 4 to 14 is 85Sum from 5 to 15 is 95

通过 go 关键字启动协程之后, 主进程并不会等待协程的执行,
而是继续执行直至结束.
本例中, 如果没有 time.Sleep(time.Second * 5) 等待5秒的话,
那么主进程不会等待那10个协程的运行结果, 直接就结束了.
主进程结束也会导致那10个协程的执行中断, 所以, 如果去掉 time.Sleep
这行代码, 可能屏幕上什么显示也没有.

不使用协程池的代码(示例代码使用chan实现,代码略啰嗦)

简单示例

实际使用协程时, 我们一般会等待所有协程执行完成后, 才会结束主进程,
但是不会用 time.Sleep 这种方式,
因为主进程并不知道协程什么时候会结束, 没法设置等待时间.

这时, 就看出 golang 中的 channel 机制所带来的好处了. 下面用 channel
来改造上面的 time.Sleep

package mainimport "fmt"func main() {     var ch = make(chan string)     for i := 0; i < 10; i++ {             go sum(i, i+10, ch)     }     for i := 0; i < 10; i++ {             fmt.Print     }}func sum(start, end int, ch chan string) {     var sum int = 0     for i := start; i < end; i++ {             sum += i     }     ch <- fmt.Sprintf("Sum from %d to %d is %d\n", start, end, sum)}

程序执行结果和上面一样, 因为是并发的缘故, 可能输出的 sum
顺序可能会不一样.

$ go run main.goSum from 9 to 19 is 135Sum from 0 to 10 is 45Sum from 5 to 15 is 95Sum from 6 to 16 is 105Sum from 7 to 17 is 115Sum from 2 to 12 is 65Sum from 8 to 18 is 125Sum from 3 to 13 is 75Sum from 1 to 11 is 55Sum from 4 to 14 is 85

golang 的 chan 可以是任意类型的, 上面的例子中定义的是 string 型.
从上面的程序可以看出, 往 chan 中写入数据之后, 协程会阻塞在那里,
直到在某个地方将 chan 中的值读取出来, 协程才会继续运行下去.

上面的例子中, 我们启动了10个协程, 每个协程都往 chan 中写入了一个字符串,
然后在 main 函数中, 依次读取 chan 中的字符串, 并在屏幕上打印出来.
通过 golang 中的 chan, 不仅实现了主进程 和 协程之间的通信, 而且不用像
time.Sleep 那样不可控(因为你不知道要 Sleep 多长时间).

func (p *converter) upload(bytes [][]byte) ([]string, error) {
  ch := make(chan struct{}, 4)
  wg := &sync.WaitGroup{}
  wg.Add(len(bytes))
  ret := make([]string, len(bytes))
  // 上传
  for index, item := range bytes {
    ch <- struct{}{}
    go func(index int, imageData []byte) {
      defer func() {
        wg.Done()
        <-ch
      }()
      link, err := qiniu.UploadBinary(imageData, fmt.Sprintf("%d.png", time.Now().UnixNano()))
      if err != nil {
        log.Println("上传图片失败", err.Error())
        return
      }
      ret[index] = link
    }(index, item)
  }
  wg.Wait()
  return ret, nil
}

并发时的缓冲

上面的例子中, 所有协程使用的是同一个 chan, chan 的容量默认只有 1,
当某个协程向 chan 中写入数据时, 其他协程再次向 chan 中写入数据时,
其实是阻塞的.
等到 chan 中的数据被读出之后, 才会再次让某个其他协程写入,
因为每个协程都执行的非常快, 所以看不出来.

改造下上面的例子, 加入些 Sleep 代码, 延长每个协程的执行时间,
我们就可以看出问题, 代码如下:

package mainimport (     "fmt"     "time")func main() {     var ch = make(chan string)     for i := 0; i < 5; i++ {             go sum(i, i+10, ch)     }     for i := 0; i < 10; i++ {             time.Sleep(time.Second * 1)             fmt.Print     }}func sum(start, end int, ch chan string) int {     ch <- fmt.Sprintf("Sum from %d to %d is starting at %s\n", start, end, time.Now().String     var sum int = 0     for i := start; i < end; i++ {             sum += i     }     time.Sleep(time.Second * 10)     ch <- fmt.Sprintf("Sum from %d to %d is %d at %s\n", start, end, sum, time.Now().String     return sum}

执行结果如下:

$ go run main.goSum from 4 to 14 is starting at 2015-10-13 13:59:56.025633342 +0800 CSTSum from 3 to 13 is starting at 2015-10-13 13:59:56.025608644 +0800 CSTSum from 0 to 10 is starting at 2015-10-13 13:59:56.025508327 +0800 CSTSum from 2 to 12 is starting at 2015-10-13 13:59:56.025574486 +0800 CSTSum from 1 to 11 is starting at 2015-10-13 13:59:56.025593711 +0800 CSTSum from 4 to 14 is 85 at 2015-10-13 14:00:07.030611465 +0800 CSTSum from 3 to 13 is 75 at 2015-10-13 14:00:08.031926629 +0800 CSTSum from 0 to 10 is 45 at 2015-10-13 14:00:09.036724803 +0800 CSTSum from 2 to 12 is 65 at 2015-10-13 14:00:10.038125044 +0800 CSTSum from 1 to 11 is 55 at 2015-10-13 14:00:11.040366206 +0800 CST

为了演示 chan 的阻塞情况, 上面的代码中特意加了一些 time.Sleep 函数.

  • 每个执行 Sum 函数的协程都会运行 10 秒
  • main函数中每隔 1 秒读一次 chan 中的数据

从打印结果我们可以看出, 所有协程几乎是同一时间开始的,
说明了协程确实是并发的.
其中, 最快的协程(Sum from 4 to 14…)执行了 11 秒左右, 为什么是 11
秒左右呢?
说明它阻塞在了 Sum 函数中的第一行上, 等了 1 秒之后, main 函数开始读出
chan 中数据后才继续运行.
它自身运行需要 10 秒, 加上等待的 1 秒, 正好 11 秒左右.

最慢的协程执行了 15 秒左右, 这个也很好理解, 总共启动了 5 个协程, main
函数每隔 1 秒 读出一次 chan, 最慢的协程等待了 5 秒,
再加上自身执行了 10 秒, 所以一共 15 秒左右.

到这里, 我们很自然会想到能否增加 chan 的容量, 从而使得每个协程尽快执行,
完成自己的操作, 而不用等待, 消除由于 main 函数的处理所带来的瓶颈呢?
答案是当然可以, 而且在 golang 中实现还很简单, 只要在创建 chan 时, 指定
chan 的容量就行.

package mainimport (     "fmt"     "time")func main() {     var ch = make(chan string, 10)     for i := 0; i < 5; i++ {             go sum(i, i+10, ch)     }     for i := 0; i < 10; i++ {             time.Sleep(time.Second * 1)             fmt.Print     }}func sum(start, end int, ch chan string) int {     ch <- fmt.Sprintf("Sum from %d to %d is starting at %s\n", start, end, time.Now().String     var sum int = 0     for i := start; i < end; i++ {             sum += i     }     time.Sleep(time.Second * 10)     ch <- fmt.Sprintf("Sum from %d to %d is %d at %s\n", start, end, sum, time.Now().String     return sum}

执行结果如下:

$ go run main.goSum from 0 to 10 is starting at 2015-10-13 14:22:14.64534265 +0800 CSTSum from 2 to 12 is starting at 2015-10-13 14:22:14.645382961 +0800 CSTSum from 3 to 13 is starting at 2015-10-13 14:22:14.645408947 +0800 CSTSum from 4 to 14 is starting at 2015-10-13 14:22:14.645417257 +0800 CSTSum from 1 to 11 is starting at 2015-10-13 14:22:14.645427028 +0800 CSTSum from 1 to 11 is 55 at 2015-10-13 14:22:24.6461138 +0800 CSTSum from 3 to 13 is 75 at 2015-10-13 14:22:24.646330223 +0800 CSTSum from 2 to 12 is 65 at 2015-10-13 14:22:24.646325521 +0800 CSTSum from 4 to 14 is 85 at 2015-10-13 14:22:24.646343061 +0800 CSTSum from 0 to 10 is 45 at 2015-10-13 14:22:24.64634674 +0800 CST

从执行结果可以看出, 所有协程几乎都是 10秒完成的. 所以在使用协程时,
记住可以通过使用缓存来进一步提高并发性.

需要实现的需求有两个:

并发时的超时

并发编程, 由于不能确保每个协程都能及时响应, 有时候协程长时间没有响应,
主进程不可能一直等待, 这时候就需要超时机制.
在 golang 中, 实现超时机制也很简单.

package mainimport (     "fmt"     "time")func main() {     var ch = make(chan string, 1)     var timeout = make(chan bool, 1)     go sum(1, 10, ch)     go func() {             time.Sleep(time.Second * 5) // 5 秒超时             timeout <- true     }()     select {     case sum := <-ch:             fmt.Print     case <-timeout:             fmt.Println("Sorry, TIMEOUT!")     }}func sum(start, end int, ch chan string) int {     var sum int = 0     for i := start; i < end; i++ {             sum += i     }     time.Sleep(time.Second * 10)     ch <- fmt.Sprintf("Sum from %d to %d is %d\n", start, end, sum)     return sum}

通过一个匿名函数来控制超时, 然后同时启动 计算 sum 的协程和timeout协程,
在 select 中看谁先结束,
如果 timeout 结束后, 计算 sum 的协程还没有结束的话, 就会进入超时处理.

上例中, timeout 只有5秒, sum协程会执行10秒, 所以执行结果如下:

$ go run main.goSorry, TIMEOUT!

修改 time.Sleep(time.Second * 5) 为 time.Sleep(time.Second * 15) 的话,
就会看到 sum 协程的执行结果

限制最大协程数,本例为4

网站地图xml地图