当前位置 博文首页 > Go语言同步与异步执行多个任务封装详解(Runner和RunnerAsync)

    Go语言同步与异步执行多个任务封装详解(Runner和RunnerAsync)

    作者:雪山飞猪 时间:2021-06-16 18:25

    前言

    同步适合多个连续执行的,每一步的执行依赖于上一步操作,异步执行则和任务执行顺序无关(如从10个站点抓取数据)

    同步执行类RunnerAsync

    支持返回超时检测,系统中断检测

    错误常量定义

    //超时错误
    var ErrTimeout = errors.New("received timeout")
    //操作系统系统中断错误
    var ErrInterrupt = errors.New("received interrupt")

    实现代码如下

    package task
    import (
     "os"
     "time"
     "os/signal"
     "sync"
    )
     
    //异步执行任务
    type Runner struct {
     //操作系统的信号检测
     interrupt chan os.Signal
     //记录执行完成的状态
     complete chan error
     //超时检测
     timeout <-chan time.Time
     //保存所有要执行的任务,顺序执行
     tasks []func(id int) error
     waitGroup sync.WaitGroup
     lock sync.Mutex
     errs []error
    }
     
    //new一个Runner对象
    func NewRunner(d time.Duration) *Runner {
     return &Runner{
     interrupt: make(chan os.Signal, 1),
     complete: make(chan error),
     timeout: time.After(d),
     waitGroup: sync.WaitGroup{},
     lock: sync.Mutex{},
     }
    }
     
    //添加一个任务
    func (this *Runner) Add(tasks ...func(id int) error) {
     this.tasks = append(this.tasks, tasks...)
    }
     
    //启动Runner,监听错误信息
    func (this *Runner) Start() error {
     //接收操作系统信号
     signal.Notify(this.interrupt, os.Interrupt)
     //并发执行任务
     go func() {
     this.complete <- this.Run()
     }()
     select {
     //返回执行结果
     case err := <-this.complete:
     return err
     //超时返回
     case <-this.timeout:
     return ErrTimeout
     }
    }
     
    //异步执行所有的任务
    func (this *Runner) Run() error {
     for id, task := range this.tasks {
     if this.gotInterrupt() {
      return ErrInterrupt
     }
     this.waitGroup.Add(1)
     go func(id int) {
      this.lock.Lock()
      //执行任务
      err := task(id)
      //加锁保存到结果集中
      this.errs = append(this.errs, err)
     
      this.lock.Unlock()
      this.waitGroup.Done()
     }(id)
     }
     this.waitGroup.Wait()
     
     return nil
    }
     
    //判断是否接收到操作系统中断信号
    func (this *Runner) gotInterrupt() bool {
     select {
     case <-this.interrupt:
     //停止接收别的信号
     signal.Stop(this.interrupt)
     return true
     //正常执行
     default:
     return false
     }
    }
     
    //获取执行完的error
    func (this *Runner) GetErrs() []error {
     return this.errs
    }

    使用方法    

    Add添加一个任务,任务为接收int类型的一个闭包

    Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作)

    测试示例代码

    package task
    import (
     "testing"
     "time"
     "fmt"
     "os"
     "runtime"
    )
     
    func TestRunnerAsync_Start(t *testing.T) {
     //开启多核
     runtime.GOMAXPROCS(runtime.NumCPU())
     //创建runner对象,设置超时时间
     runner := NewRunnerAsync(8 * time.Second)
     //添加运行的任务
     runner.Add(
     createTaskAsync(),
     createTaskAsync(),
     createTaskAsync(),
     createTaskAsync(),
     createTaskAsync(),
     createTaskAsync(),
     createTaskAsync(),
     createTaskAsync(),
     createTaskAsync(),
     createTaskAsync(),
     createTaskAsync(),
     createTaskAsync(),
     createTaskAsync(),
     )
     fmt.Println("同步执行任务")
     //开始执行任务
     if err := runner.Start(); err != nil {
     switch err {
     case ErrTimeout:
      fmt.Println("执行超时")
      os.Exit(1)
     case ErrInterrupt:
      fmt.Println("任务被中断")
      os.Exit(2)
     }
     }
     t.Log("执行结束")
    }
     
    //创建要执行的任务
    func createTaskAsync() func(id int) {
     return func(id int) {
     fmt.Printf("正在执行%v个任务\n", id)
     //模拟任务执行,sleep两秒
     //time.Sleep(1 * time.Second)
     }
    }

    执行结果  

    同步执行任务
    正在执行0个任务
    正在执行1个任务
    正在执行2个任务
    正在执行3个任务
    正在执行4个任务
    正在执行5个任务
    正在执行6个任务
    正在执行7个任务
    正在执行8个任务
    正在执行9个任务
    正在执行10个任务
    正在执行11个任务
    正在执行12个任务
     runnerAsync_test.go:49: 执行结束

    异步执行类Runner

    支持返回超时检测,系统中断检测

    实现代码如下

    package task
    import (
     "os"
     "time"
     "os/signal"
     "sync"
    )
     
    //异步执行任务
    type Runner struct {
     //操作系统的信号检测
     interrupt chan os.Signal
     //记录执行完成的状态
     complete chan error
     //超时检测
     timeout <-chan time.Time
     //保存所有要执行的任务,顺序执行
     tasks []func(id int) error
     waitGroup sync.WaitGroup
     lock sync.Mutex
     errs []error
    }
     
    //new一个Runner对象
    func NewRunner(d time.Duration) *Runner {
     return &Runner{
      interrupt: make(chan os.Signal, 1),
      complete: make(chan error),
      timeout: time.After(d),
      waitGroup: sync.WaitGroup{},
      lock:  sync.Mutex{},
     }
    }
     
    //添加一个任务
    func (this *Runner) Add(tasks ...func(id int) error) {
     this.tasks = append(this.tasks, tasks...)
    }
     
    //启动Runner,监听错误信息
    func (this *Runner) Start() error {
     //接收操作系统信号
     signal.Notify(this.interrupt, os.Interrupt)
     //并发执行任务
     go func() {
      this.complete <- this.Run()
     }()
     select {
     //返回执行结果
     case err := <-this.complete:
      return err
      //超时返回
     case <-this.timeout:
      return ErrTimeout
     }
    }
     
    //异步执行所有的任务
    func (this *Runner) Run() error {
     for id, task := range this.tasks {
      if this.gotInterrupt() {
       return ErrInterrupt
      }
      this.waitGroup.Add(1)
      go func(id int) {
       this.lock.Lock()
       //执行任务
       err := task(id)
       //加锁保存到结果集中
       this.errs = append(this.errs, err)
       this.lock.Unlock()
       this.waitGroup.Done()
      }(id)
     }
     this.waitGroup.Wait()
     return nil
    }
     
    //判断是否接收到操作系统中断信号
    func (this *Runner) gotInterrupt() bool {
     select {
     case <-this.interrupt:
      //停止接收别的信号
      signal.Stop(this.interrupt)
      return true
      //正常执行
     default:
      return false
     }
    }
     
    //获取执行完的error
    func (this *Runner) GetErrs() []error {
     return this.errs
    }

    使用方法    

    Add添加一个任务,任务为接收int类型,返回类型error的一个闭包

    Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作)

    getErrs获取所有的任务执行结果

    测试示例代码

    package task
    import (
     "testing"
     "time"
     "fmt"
     "os"
     "runtime"
    )
     
    func TestRunner_Start(t *testing.T) {
     //开启多核心
     runtime.GOMAXPROCS(runtime.NumCPU())
     //创建runner对象,设置超时时间
     runner := NewRunner(18 * time.Second)
     //添加运行的任务
     runner.Add(
      createTask(),
      createTask(),
      createTask(),
      createTask(),
      createTask(),
      createTask(),
      createTask(),
      createTask(),
      createTask(),
      createTask(),
      createTask(),
      createTask(),
      createTask(),
      createTask(),
     )
     fmt.Println("异步执行任务")
     //开始执行任务
     if err := runner.Start(); err != nil {
      switch err {
      case ErrTimeout:
       fmt.Println("执行超时")
       os.Exit(1)
      case ErrInterrupt:
       fmt.Println("任务被中断")
       os.Exit(2)
      }
     }
     t.Log("执行结束")
     t.Log(runner.GetErrs())
    }
     
    //创建要执行的任务
    func createTask() func(id int) error {
     return func(id int) error {
      fmt.Printf("正在执行%v个任务\n", id)
      //模拟任务执行,sleep
      //time.Sleep(1 * time.Second)
      return nil
     }
    }

    执行结果

    异步执行任务
    正在执行2个任务
    正在执行1个任务
    正在执行4个任务
    正在执行3个任务
    正在执行6个任务
    正在执行5个任务
    正在执行9个任务
    正在执行7个任务
    正在执行10个任务
    正在执行13个任务
    正在执行8个任务
    正在执行11个任务
    正在执行12个任务
    正在执行0个任务
     runner_test.go:49: 执行结束
     runner_test.go:51: [<nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil>]

    总结

    js
    下一篇:没有了