在前几章中, 学到了go的并发, 通道是如何工作的, 接下来会介绍三种go常见的并发模式, runner, pool, worker.

本篇会展示使用通道来监视程序的执行时间, 生命周期, 监听终止信号等等. 我们把能执行上述功能的entity抽象成对象Runner. Runner在后台处理任务程序会很有用, 也可以作为基于定时任务(cron task)的云环境任务.

下面们定义了一个Runner类型, 从设计角度上思考, Runner需要完成下列工作:

  • 程序在分配时间内完成工作, 正常终止.
  • 程序没有完成工作, 应该被强制终止, 并返回超时错误.
  • 接收到系统发送的终端信号, 应该完成当前任务, 清理状态并停止工作.
  • 可以按顺序运行一系列工作.

按照上面的Runner需求, 我们定义一个Runner 类型.

package runner

// 定义一个Runner类型
type Runner struct{
    interrupt chan os.Signal    // 监听系统终端信号
    timeout <-chan time.Time    // 监听超时信号
    complete chan error         // 处理任务完成信号
    tasks []func(int)           // tasks是一组按索引依次执行的函数.
}

我们把不同的任务抽象成不同的函数, 因此Runner包含了一个成员tasks, 是一个函数切片. 此外Runner还有三个channel类型的成员:

  • interrupt 信号: 负责监听系统事件.
  • timeout 信号: 负责监听超时信号.
  • complete 信号: 负责监听每个单独task返回值, 成功还是error

根据Runner的内部四个数据类型, 定义一个Runner的工厂函数:

func New(d time.Duration) *Runner {
    return &Runner {
        interrupt: make(chan os.Signal, 1),
        complete: make(chan error),
        timeout: time.After(d)
    }
}

关于这个工厂函数:

  • 允许调用者设置超时时间, 设定的超时时间会传给函数time.After, time.After返回值是一个time.Time的通道, 当设定时间到达后, 会往该通道写值.
  • complete 通道被初始为无缓冲通道, 因为一旦一个task完成或者error out, 它就会像main函数(管理者)发送信号, 一旦信号被接受, Runner应该退出.
  • interrupt 被初始化为缓冲区容量为1的通道, 这样可以保证通道至少能接收一个来自os.Signal的值, 确保runtime发送这个事件不会被堵塞. 因为如果对应的goroutine还没准备好.

既然定义了Runner结构, 一个比较好的代码规范是, 我们也需要定义在Runner里常见的错误类型:

var (
    ErrTimeout = errors.New("执行超时")
    ErrInterrupt = errors.New("收到系统中断信号")
)

Runner需要能够添加不同的tasks, 所以给Runner增加一个Add 方法:

func (r *Runner) Add(tasks ...func(int)) {
    r.tasks = append(r.tasks, tasks...)
}

这里的Add方法接收一个名为tasks的可变参数, 接收可以是任意数量的只要满足类型是函数(接收一个整型参数, 不反回任何值)传入.

我们还需要一个方法告诉Runner来执行内部的tasks.

func (r *Runner) run() error{
    for id, task := range r.tasks{
        select {
            // 当中断事件触发时, 
            case <- r.interrupt :
                signal.Stop(r.interrupt)
                return ErrInterrupt
            default: // do nothing just continue
        }
        task(id)
    }
    return nil
}

在go里, 用select关键词来检查接收到的信号, 一般而言, select在没有收到信号的时候是堵塞的, 但加了default分支, 就不会堵塞. 这里我们按照tasks 被添加的顺序依次执行, 每执行一个新任务时, 都会检查有没有收到interrupt信号, 如果有收到, 那么就直接返回ErrInerrupt, 没有就继续执行下面一个任务. 此外, 采用signal.Stop() 可以阻止接收之后的所有事件.

上述基本就是Runner所需要的所有方法了, 还需要一个外部启动Runner的方法:

func (r *Runner) Start() error {
    // 我们希望接收所有中断信号
    signal.Notify(r.interrupt, os.Interrupt)

    // 用不同的goroutine执行不同的任务
    go func() {
        r.complete <- r.run()
    }()

    select {
        // 当任务完成时发出的信号
        case err := <- r.complete:
            return err
        // 当任务处理程序超时时发出的信号
        case <- r.timeout:
            return ErrTimeout
    }
}

完整的Runner代码:

package main

import (
	"errors"
	"os"
	"os/signal"
	"time"
)

var (
	ErrTimeout   = errors.New("执行超时")
	ErrInterrupt = errors.New("收到系统中断信号")
)

// 定义一个Runner 开放类型
type Runner struct {
	interrupt chan os.Signal   // 监听系统终端信号
	timeout   <-chan time.Time // 监听超时信号
	complete  chan error       // 处理任务完成信号
	tasks     []func(int)      // tasks是一组按索引依次执行的函数.
}

func New(d time.Duration) *Runner {
	return &Runner{
		interrupt: make(chan os.Signal, 1),
		complete:  make(chan error),
		timeout:   time.After(d),
	}
}

func (r *Runner) Add(tasks ...func(int)) {
	r.tasks = append(r.tasks, tasks...)
}

func (r *Runner) run() error {
	for id, task := range r.tasks {
		select {
		// 当中断事件触发时,
		case <-r.interrupt:
			signal.Stop(r.interrupt)
			return ErrInterrupt
		default: // do nothing just continue
		}
		task(id)
	}
	return nil
}

func (r *Runner) Start() error {
	// 我们希望接收所有中断信号
	signal.Notify(r.interrupt, os.Interrupt)

	// 用不同的goroutine执行不同的任务
	go func() {
		r.complete <- r.run()
	}()

	select {
	// 当任务完成时发出的信号
	case err := <-r.complete:
		return err
	// 当任务处理程序超时时发出的信号
	case <-r.timeout:
		return ErrTimeout
	}
}

再写一个main函数用来测试:

package main

import (
	"log"
	"os"
	"time"

	"github.com/xlk3099/runner"
)

// 定义超时时间
const timeout = 3 * time.Second

func main() {
	log.Println("任务开始...")

	// 用timeout创建一个新的runner instance
	r := runner.New(timeout)

	// 添加三个Task
	r.Add(createTask(), createTask(), createTask())

	// Run the tasks and handle the result.
	if err := r.Start(); err != nil {
		switch err {
		case runner.ErrTimeout:
			log.Println("由于超时, Runner终止.")
			os.Exit(1)
		case runner.ErrInterrupt:
			log.Println("由于操作系统干扰, 程序终止.")
			os.Exit(2)
		}
	}

	log.Println("Runner执行结束.")
}

// createTask 返回一个函数
func createTask() func(int) {
	return func(id int) {
		log.Printf("执行任务 - #%d.", id)
		time.Sleep(time.Duration(id) * time.Second)
	}
}

运行上述代码, 并使其自然终止, 我们得到输出:

➜  xlk3099 go run main.go
2018/04/15 22:24:21 任务开始...
2018/04/15 22:24:21 执行任务 - #0.
2018/04/15 22:24:21 执行任务 - #1.
2018/04/15 22:24:22 执行任务 - #2.
2018/04/15 22:24:24 由于超时, Runner终止.
exit status 1

运行上述代码一段时间, 然后按ctrl+c, 得到输出:

➜  xlk3099 go run main.go
2018/04/15 22:20:08 任务开始...
2018/04/15 22:20:08 执行任务 - #0.
2018/04/15 22:20:08 执行任务 - #1.
^C2018/04/15 22:20:09 由于操作系统干扰, 程序终止.
exit status 2

当然, 把超时时间设置成>3的话, 我们会看到:

➜  xlk3099 go run main.go
2018/04/15 22:22:46 任务开始...
2018/04/15 22:22:46 执行任务 - #0.
2018/04/15 22:22:46 执行任务 - #1.
2018/04/15 22:22:47 执行任务 - #2.
2018/04/15 22:22:49 Runner执行结束.

这个只是一个最基本的Runner模型, 还可以有更进一步的扩展, 大家可尝试研究.