go实战读书笔记(十六): 并发模式 - Work
Contents
work 包利用了无缓冲通道来创建一个goroutine池, (注意, 上一章我们创建了一个资源池, 这里我们创建的是goroutine池) 这些goroutine执行并控制一组工作, 让其并发执行. 这种case下, 使用无缓冲通道的效果比指定一个有缓冲区通道好, 因为这种case下不需要一个工作队列, 也不需要一组goroutine 配合执行.
无缓冲通道的一大好处, 就是保证了两个goroutine之间的数据交换. 使用无缓冲通道的方法允许使用者知道什么时候goroutine池正在执行工作, 如果里面所有的goroutine都忙, 无法接受新的工作, 也能通过通道来告诉调用者.
这么看来, 这章其实叫做goroutine pool更好…😁
// 定义一个Pool
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
type Worker interface {
Task()
}
上面我们定义了一个Pool类型, Pool 有一个内部类型work用来接收Worker类型数据, 还有一个WaitGroup参数, 用来等待所有work 被完成.
Pool还需要有其他方法, 比如工厂函数, 给定大小, 建一个新的goroutine池, 添加新的任务.
func New(size int) *Pool {
p := Pool {
work : make(chan Worker),
}
p.wg.Add(size)
for i:=0; i<size; i++{
go func(){
for w := range p.work{
w.Task()
}
p.wg.Done()
}()
}
return &p
}
Pool 需要一个可以添加任务的方法:
func (p *Pool) Run(w Worker){
p.work <- w
}
外部可以关闭Pool
func (p *Pool) Shutdown() {
close(p.work)
p.wg.Wait()
}
其实整个逻辑很简单, goroutine Pool里有个无缓冲通道, 当Pool 被创建的时候, 会启动给定数量的goroitines, 同时监听那个无缓冲通道. 外部可以往无缓冲通道里写入任务, 由于一个任务只能被一个接收者(goroutine)接收, 所以剩余的goroutines会继续等待任务, 一旦所有的goroutines都处在忙碌状态, 那么外部也不能再往无缓冲通道里写入任务.
完整的Pool代码:
package pool
import "sync"
// 定义一个Pool
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
type Worker interface {
Task()
}
func New(size int) *Pool {
p := Pool{
work: make(chan Worker),
}
p.wg.Add(size)
for i := 0; i < size; i++ {
go func() {
for w := range p.work {
w.Task()
}
p.wg.Done()
}()
}
return &p
}
func (p *Pool) Run(w Worker) {
p.work <- w
}
func (p *Pool) Shutdown() {
close(p.work)
p.wg.Wait()
}
测试程序(来自go实战):
package main
import (
"log"
"sync"
"time"
"github.com/xlk3099/work"
)
// names提供了一组用来显示的名字
var names = []string{
"steve",
"bob",
"mary",
"therese",
"jason",
}
// namePrinter使用特定方式打印名字
type namePrinter struct {
name string
}
// Task实现Worker接口
func (m *namePrinter) Task() {
log.Println(m.name)
time.Sleep(time.Second)
}
// main是所有Go程序的入口
func main() {
//使用两个goroutine来创建工作池
p := work.New(2)
var wg sync.WaitGroup
wg.Add(100 * len(names))
for i := 0; i < 100; i++ { // 迭代names切片
for _, name := range names {
// 创建一个namePrinter并提供 // 指定的名字
np := namePrinter{
name: name,
}
go func() {
p.Run(&np)
wg.Done()
}()
}
}
wg.Wait()
p.Shutdown()
}
work这一章跟Pool其实很像, 区别在于资源池内部的资源是其它resources, 比如DB connection. 但work 里面的Pool资源是goroutines, 负责管理多少个goroutines处理一个任务.
附上第7章小结:
- 可以使用通道来控制程序的生命周期.
- 可以使用default来防止select语句阻塞.
- 用缓冲通道可以用来管理一组可复用的资源.
- 使用无缓冲通道可以来创建完成工作的goroutine池.
- 任何时候都可以使用同步通道来让两个goroutine交换数据, 在通道操作完成时一定保证对方接收到了数据.
Author xlk3099
LastMod 2018-04-16