兜兜零元 发表于 2025-1-4 22:39:41

如何在Golang中实现协程池

在Golang中实现协程池(Goroutine Pool)可以进步并发任务的执行服从,特别是在必要限制并发数量或管理资源的环境下。协程池允许你控制同时运行的协程数量,从而制止创建过多的协程导致系统资源耗尽。
以下是一个简朴的协程池实现示例:

[*] 定义协程池结构体:
协程池结构体必要包罗任务队列、工作协程数量、期待组等。
[*] 实现任务提交和协程管理:
使用通道(channel)来管理任务队列,并使用期待组(sync.WaitGroup)来期待所有任务完成。
package main

import (
        "fmt"
        "sync"
        "time"
)

// Task represents a unit of work that the goroutine pool will execute.
type Task func()

// GoroutinePool represents a pool of goroutines that can execute tasks.
type GoroutinePool struct {
        tasks      chan Task
        workerPool chan struct{}
        wg         sync.WaitGroup
        maxWorkers int
}

// NewGoroutinePool creates a new goroutine pool with a specified maximum number of workers.
func NewGoroutinePool(maxWorkers int) *GoroutinePool {
        pool := &GoroutinePool{
                tasks:      make(chan Task),
                workerPool: make(chan struct{}, maxWorkers),
                maxWorkers: maxWorkers,
        }

        // Start worker goroutines
        for i := 0; i < maxWorkers; i++ {
                pool.workerPool <- struct{}{}
                go pool.worker()
        }

        return pool
}

// worker is the goroutine that executes tasks from the tasks channel.
func (p *GoroutinePool) worker() {
        for task := range p.tasks {
                task()
                <-p.workerPool // Signal that a worker is available again
        }
}

// Submit adds a task to the goroutine pool.
func (p *GoroutinePool) Submit(task Task) {
        p.wg.Add(1)
        go func() {
                defer p.wg.Done()
                <-p.workerPool // Wait for a worker to be available
                p.tasks <- task
        }()
}

// Wait waits for all submitted tasks to complete.
func (p *GoroutinePool) Wait() {
        p.wg.Wait()
        close(p.tasks) // Close the tasks channel to signal workers to exit
        for range p.workerPool { // Drain the workerPool to ensure all workers have exited
        }
}

func main() {
        // Create a goroutine pool with 3 workers
        pool := NewGoroutinePool(3)

        // Submit some tasks to the pool
        for i := 0; i < 10; i++ {
                taskNum := i
                pool.Submit(func() {
                        fmt.Printf("Executing task %d\n", taskNum)
                        time.Sleep(time.Second) // Simulate work
                })
        }

        // Wait for all tasks to complete
        pool.Wait()
        fmt.Println("All tasks completed")
} 解释


[*]结构体定义:

[*]Task:表示一个任务,是一个无参数的函数。
[*]GoroutinePool:包罗任务通道 tasks、工作协程控制通道 workerPool、期待组 wg 和最大工作协程数量 maxWorkers。

[*]创建协程池:

[*]NewGoroutinePool 函数初始化协程池,并启动指定数量的工作协程。

[*]工作协程:

[*]worker 方法从 tasks 通道中接收任务并执行,执行完成后将工作协程标志为可用(通过向 workerPool 发送一个空结构体)。

[*]提交任务:

[*]Submit 方法将任务添加到任务通道,并期待一个工作协程变为可用。

[*]期待任务完成:

[*]Wait 方法期待所有任务完成,并关闭任务通道,确保所有工作协程退出。

使用示例

在 main 函数中,我们创建了一个包罗3个工作协程的协程池,并提交了10个任务。每个任务打印一个消息并模拟1秒的工作。最后,我们期待所有任务完成并打印完成消息。
这个简朴的协程池实现可以根据必要举行扩展,比方添加错误处理惩罚、任务超时、动态调整工作协程数量等功能。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 如何在Golang中实现协程池