go并发设计模式runner模式

[复制链接]
发表于 2024-12-2 20:15:17 | 显示全部楼层 |阅读模式
go并发设计模式runner模式


真正运行的程序不可能是单线程运行的,go语言中最值得骄傲的就是CSP模型了,可以说go语言是CSP模型的实现。
假设现在有一个程序需要实现,这个程序有以下要求:



  • 程序可以在分配的时间内完成工作,正常终止;
  • 程序没有及时完成工作,“自杀”;
  • 接收到操作体系发送的中断事件,程序立刻试图清理状态并停止工作
数据范例设计

程序需要在规定时间内完成工作的最简单方法就是使用goroutine和channel,我们需要一个chan用来接收操作完成的信号,完成任务的函数可能有错误信息返回,因此我们这里定义一个错误范例的通道,用来关照什么时间完成任务以及完成任务的错误信息。
  1. complete chan error
复制代码
任务执行超时的最简单方法就是使用time包提供的After函数,当指定的时间内没有完成任务那么就出发一下超时通道,因为只需要接收超时的信号,因此只需要定义一个单向接收通道即可
  1. timeout <-chan time.Time
复制代码
当发生体系中断事件时,程序能立刻清理状态然后清理资源并停止工作,因此我们需要一个信号通道来接收操作体系发送的中断信号,这里我们使用signal包提供的Notify函数来注册信号,当操作体系发送信号时,会通过信号通道发送信号
  1. interrupt chan os.Signal
复制代码
程序最重要的是可以或许处理任务,用户需要处理多少任务提前是不能确定的,我们需要一个任务列表,这里我们使用一个切片来保存这些任务。
  1. tasks []func(int)
复制代码
颠末上述设计,我们定义一个Runner布局体,用来保存这些通道和任务列表。
  1. // 并且在操作体系发送中断信号时竣事这些任务type Runner struct {        // interrupt channel 用来接收操作体系发送的信号        interrupt chan os.Signal
  2.         // complete channel 用来关照任务已经完成        complete chan error
  3.         // timeout channel 用来关照任务已经超时的接收通道        timeout <-chan time.Time
  4.         // tasks 用来保存任务列表        tasks []func(int)
  5. }
复制代码
错误体系设计

错误体系设计,我们希望在任务执行完成或者超时或者操作体系发送的中断信号时返回错误,因此我们定义两个个错误变量,分别用来保存超时错误,中断错误和正常完成错误。
  1. // ErrTimeout 定义一个超时错误, 会在人物执行超时时被返回
  2. var ErrTimeout = errors.New("received timeout")
  3. // ErrInterrupt 定义一个中断错误, 会在收到操作系统事件的时候返回
  4. var ErrInterrupt = errors.New("received interrupt")
复制代码
数据范例说明

Signal

os.Signal 是一个接口范例,是对不同操作体系上捕捉的信号的一个抽象接口,用来从操作体系接收中断事件。
  1. // A Signal represents an operating system signal.
  2. // The usual underlying implementation is operating system-dependent:
  3. // on Unix it is syscall.Signal.
  4. type Signal interface {
  5.     String() string
  6.     Signal() // to distinguish from other Stringers
  7. }
复制代码
Error

error 是一个接口范例,用来表现错误,所有错误范例都实现了error接口,因此我们可以通过error接口来判断错误范例。
Time

time.Time 是一个布局体范例,用来表现一个时间,包含年月日时分秒纳秒等信息。
  1. type Time struct {
  2.         // wall and ext encode the wall time seconds, wall time nanoseconds,
  3.         // and optional monotonic clock reading in nanoseconds.
  4.         //
  5.         // From high to low bit position, wall encodes a 1-bit flag (hasMonotonic),
  6.         // a 33-bit seconds field, and a 30-bit wall time nanoseconds field.
  7.         // The nanoseconds field is in the range [0, 999999999].
  8.         // If the hasMonotonic bit is 0, then the 33-bit field must be zero
  9.         // and the full signed 64-bit wall seconds since Jan 1 year 1 is stored in ext.
  10.         // If the hasMonotonic bit is 1, then the 33-bit field holds a 33-bit
  11.         // unsigned wall seconds since Jan 1 year 1885, and ext holds a
  12.         // signed 64-bit monotonic clock reading, nanoseconds since process start.
  13.         wall uint64
  14.         ext  int64
  15.         // loc specifies the Location that should be used to
  16.         // determine the minute, hour, month, day, and year
  17.         // that correspond to this Time.
  18.         // The nil location means UTC.
  19.         // All UTC times are represented with loc==nil, never loc==&utcLoc.
  20.         loc *Location
  21. }
复制代码
方法设计

在go中方法需要示例进行调用,因此我们最后定义一个用来创建Runner实例的New方法,避免用户自行创建实例,导致示例的创建不同一。
名为 New 的工厂函数。这个函数接收一个 time.Duration 范例的值,并返回 Runner 范例的指针。这个函数会创建一个 Runner 范例的值,并初始化每个通道字段。因为 task 字段的零值是 nil,已经满意初始化的要求,所以没有被明确初始化。每个通道字段都有独立的初始化过程
通道 interrupt 被初始化为缓冲区容量为 1 的通道。这可以保证通道至少能接收一个来自语言运行时的 os.Signal 值,确保语言运行时发送这个事件的时间不会被壅闭。假如 goroutine没有预备好接收这个值,这个值就会被扬弃。例如,假如用户反复敲 Ctrl+C 组合键,程序只会在这个通道的缓冲区可用的时间接收事件,其余的所有事件都会被扬弃。
通道 complete 被初始化为无缓冲的通道。当执行任务的 goroutine 完成时,会向这个通道发送一个 error 范例的值或者 nil 值。之后就会等待 main 函数接收这个值。一旦 main 接收了这个 error 值, goroutine 就可以安全地终止了。
最后一个通道 timeout 是用 time 包的 After 函数初始化的。 After 函数返回一个time.Time 范例的通道。语言运行时会在指定的 duration 时间到期之后,向这个通道发送一个 time.Time 的值。
  1. // New 返回一个Runner实例
  2. func New(d time.Duration) *Runner {
  3.         return &Runner{
  4.                 // 1个缓冲的信号通道
  5.                 interrupt: make(chan os.Signal, 1),
  6.                 // 没有缓冲的信号通道,如果没有接受者那么会阻塞
  7.                 complete: make(chan error),
  8.                 timeout:  time.After(d),
  9.         }
  10. }
复制代码
Add 方法用来添加任务,因为需要执行的任务前期并不确定有多少,因此Add接收一个名为tasks的可变参数,可变参数可以接受任意数目的值作为传入参数。这个例子里,这些传入的值必须是一个接收一个整数且什么都
不返回的函数。函数执行时的参数 tasks 是一个存储所有这些传入函数值的切片。
  1. // Add 方法用来添加任务,这个任务是一个接收int类型的ID作为参数的函数
  2. func (r *Runner) Add(tasks ...func(int)) {
  3.         r.tasks = append(r.tasks, tasks...)
  4. }
复制代码
run 方法会迭代 tasks 切片,并按顺序执行每个函数
  1. func (r *Runner) run() error {
  2.         for id, task := range r.tasks {
  3.                 if r.gotInterrupt() {
  4.                         return ErrInterrupt
  5.                 }
  6.                 // 执行注册的任务
  7.                 task(id)
  8.         }
  9.         return nil
  10. }
复制代码
gotInterrupt 展示了带 default 分支的 select 语句的经典用法。代码试图从 interrupt 通道去接收信号。一样平常来说, select 语句在没有任何要接收的数据时会壅闭,不过有了 default 分支就不会壅闭了。 default 分支会将接收 interrupt 通道的壅闭调用转变为非壅闭的。假如 interrupt 通道有中断信号需要接收,就会接收并处理这个中断。假如没有需要接收的信号,就会执行 default 分支。当收到中断信号后,代码会通过调用 Stop 方法来停止接收之后的所有事件。之后函数返回 true。假如没有收到中断信号,在第 99 行该方法会返回 false。本质上,gotInterrupt 方法会让 goroutine 检查中断信号,假如没有发出中断信号,就继续处理工作。
  1. // gotInterrupt 检查是否接收到中断信号
  2. func (r *Runner) gotInterrupt() bool {
  3.         select {
  4.         // 如果有中断信号那么返回true
  5.         case <-r.interrupt:
  6.                 // 接收到中断信号,停止后续再接收到中断信号
  7.                 signal.Stop(r.interrupt)
  8.                 return true
  9.         // 没有终端信号返回false,继续执行
  10.         default:
  11.                 return false
  12.         }
  13. }
复制代码
统统步骤都执行完了,现在开始执行任务
  1. // Start 方法用来开始执行任务,并监视通道事件
  2. func (r *Runner) Start() error {
  3.         // 我们希望接收所有中断信号
  4.         signal.Notify(r.interrupt, os.Interrupt)
  5.         // 异步执行任务
  6.         go func() {
  7.                 r.complete <- r.run()
  8.         }()
  9.         select {
  10.         // 当任务处理完成时该通道会返回
  11.         case err := <-r.complete:
  12.                 return err
  13.         // 当任务处理程序运行超时时发出信号
  14.         case <-r.timeout:
  15.                 return ErrTimeout
  16.         }
  17. }
复制代码
将以上代码全部都整合到runner.go文件中
  1. // Package runner 处理任务的运行和声明周期管理package runnerimport (        "errors"        "os"        "os/signal"        "time")// Runner 在给定的超时时间内执行一组任务// 并且在操作体系发送中断信号时竣事这些任务type Runner struct {        // interrupt channel 用来接收操作体系发送的信号        interrupt chan os.Signal
  2.         // complete channel 用来关照任务已经完成        complete chan error
  3.         // timeout channel 用来关照任务已经超时        timeout <-chan time.Time
  4.         // tasks 用来保存任务列表        tasks []func(int)
  5. }// ErrTimeout 定义一个超时错误, 会在人物执行超时时被返回
  6. var ErrTimeout = errors.New("received timeout")
  7. // ErrInterrupt 定义一个中断错误, 会在收到操作系统事件的时候返回
  8. var ErrInterrupt = errors.New("received interrupt")
  9. // New 返回一个Runner实例
  10. func New(d time.Duration) *Runner {
  11.         return &Runner{
  12.                 // 1个缓冲的信号通道
  13.                 interrupt: make(chan os.Signal, 1),
  14.                 // 没有缓冲的信号通道,如果没有接受者那么会阻塞
  15.                 complete: make(chan error),
  16.                 timeout:  time.After(d),
  17.         }
  18. }
  19. // Add 方法用来添加任务,这个任务是一个接收int类型的ID作为参数的函数
  20. func (r *Runner) Add(tasks ...func(int)) {
  21.         r.tasks = append(r.tasks, tasks...)
  22. }
  23. // Start 方法用来开始执行任务,并监视通道事件
  24. func (r *Runner) Start() error {
  25.         // 我们希望接收所有中断信号
  26.         signal.Notify(r.interrupt, os.Interrupt)
  27.         // 异步执行任务
  28.         go func() {
  29.                 r.complete <- r.run()
  30.         }()
  31.         select {
  32.         // 当任务处理完成时该通道会返回
  33.         case err := <-r.complete:
  34.                 return err
  35.         // 当任务处理程序运行超时时发出信号
  36.         case <-r.timeout:
  37.                 return ErrTimeout
  38.         }
  39. }
  40. func (r *Runner) run() error {
  41.         for id, task := range r.tasks {
  42.                 if r.gotInterrupt() {
  43.                         return ErrInterrupt
  44.                 }
  45.                 // 执行注册的任务
  46.                 task(id)
  47.         }
  48.         return nil
  49. }
  50. // gotInterrupt 检查是否接收到中断信号
  51. func (r *Runner) gotInterrupt() bool {
  52.         select {
  53.         // 如果有中断信号那么返回true
  54.         case <-r.interrupt:
  55.                 // 接收到中断信号,停止后续再接收到中断信号
  56.                 signal.Stop(r.interrupt)
  57.                 return true
  58.         // 没有终端信号返回false,继续执行
  59.         default:
  60.                 return false
  61.         }
  62. }
复制代码
在main.go中进行调用
  1. package main
  2. import (
  3.         "log"
  4.         "os"
  5.         "time"
  6.         "code/runner"
  7. )
  8. // timeout 定义程序执行超时时间,如果超过这个时间还没执行完成会失败退出.
  9. const timeout = 3 * time.Second
  10. // 主函数入口
  11. func main() {
  12.         log.Println("Starting work.")
  13.         // 调用New创建 runner对象.
  14.         r := runner.New(timeout)
  15.         // 向任务队列中添加需要顺序执行的任务
  16.         r.Add(createTask(), createTask(), createTask())
  17.         // Run 执行人物,并按照返回错误处理
  18.         if err := r.Start(); err != nil {
  19.                 switch err {
  20.                 case runner.ErrTimeout:
  21.                         log.Println("Terminating due to timeout.")
  22.                         os.Exit(1)
  23.                 case runner.ErrInterrupt:
  24.                         log.Println("Terminating due to interrupt.")
  25.                         os.Exit(2)
  26.                 }
  27.         }
  28.     // 记录执行结果
  29.         log.Println("Process ended.")
  30. }
  31. // createTask 返回一个入参为int的函数
  32. func createTask() func(int) {
  33.         return func(id int) {
  34.                 log.Printf("Processor - Task #%d.", id)
  35.                 time.Sleep(time.Duration(id) * time.Second)
  36.         }
  37. }
复制代码
源码已经放到gitee需要的自行下载:
https://gitee.com/andrewgithub/note_lab/blob/main/example/go/concurrent_mode/runner/runner.go

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

© 2001-2025 Discuz! Team. Powered by Discuz! X3.5

GMT+8, 2025-7-12 23:31 , Processed in 0.079645 second(s), 30 queries 手机版|qidao123.com技术社区-IT企服评测▪应用市场 ( 浙ICP备20004199 )|网站地图

快速回复 返回顶部 返回列表