Go编程模式三—Fan-Out模式与协程池结合

有这样一个需求,有一批学生,现在只有Name字段,需要根据Name字段做参数远程请求获取Score字段的值。

常规写法

type  User  struct {
    Name  string
    Score int64
}
//模拟远程调用数据
func Dodata(user *User) {
     user.Score  =  int64(len(user.Name))
}

func main() {
    s := CreatData()
    var wg sync.WaitGroup
    wg.Add(len(s)) //每个user开启一个协程处理
    for _, item := range s {
        go func(i *User) {
            defer wg.Done()
            Dodata(i)
        }(item)
    }
    wg.Wait()
    //得到数据后下一步处理
    for _, val := range s {
        fmt.Println(val.Score)
    }
}

当user数据为n时,需要开启n个协程去处理。代码不可控。能不能指定m个协程并发处理,类似协程池的思路。

func main() {
    s := CreatData()
    var wg sync.WaitGroup
    wg.Add(10)
    Handler(10, &wg, s, Dodata)//只开启10协程去处理
    wg.Wait()
    //得到数据后下一步处理
    for _, val := range s {
        fmt.Println(val.Score)
    }
}

func Handler(number int, wg *sync.WaitGroup, s []*User, workerFun func(*User)) {
    inch := make(chan *User, 0)
    //协程1:把需要处理的参数写入inch
    go func() {
        for _, item := range s {
            inch <- item
        }
        close(inch)
    }()
    //协程2:开启number个协程,同时读取inch的参数
    for i := 0; i < number; i++ {
        go func() {
            defer wg.Done()
            for item := range inch {
                workerFun(item)
            }
        }()
    }
}

假如需要读取的结果是一个回执Receipt,则需要创建一个resCh通道,worker请求数据后把结果写入这个通道。

//开启指定个协程处理数组,使用扇出方式处理
type User struct {
    Name string
}
type Receipt struct {
    Name  string
    Score int64
}
//模拟远程调用数据
func Dodata(user *User) *Receipt {
    var res Receipt
    res.Name = user.Name
    res.Score = int64(len(user.Name))
    return &res
}

func main() {
    s := CreatData()
    var wg sync.WaitGroup
    var resWg sync.WaitGroup
    resCh := make(chan *Receipt)
    //协程3:开启一个协程读取结果
    resWg.Add(1)
    go func() {
        defer resWg.Done()
        for item := range resCh {
            fmt.Println(item.Score)
        }
    }()
    wg.Add(10)
    Handler(10, &wg, s, resCh, Dodata)
    wg.Wait()
    close(resCh) //worker结束后需要及时关闭resCh
    resWg.Wait() //保证读取结果完整
}

func Handler(number int, wg *sync.WaitGroup, s []*User, resCh chan<- *Receipt, workerFun func(*User) *Receipt) {
    inch := make(chan *User, 0)
    //协程1:把需要处理的参数写入inch
    go func() {
        for _, item := range s {
            inch <- item
        }
        close(inch)
    }()
    //协程2:开启number个协程,同时读取参数并把结果写入resCh
    for i := 0; i < number; i++ {
        go func() {
            defer wg.Done()
            for item := range inch {
                res := workerFun(item)
                resCh <- res
            }
        }()
    }
}

FAN-OUT模式:

多个goroutine从同一个通道读取数据,直到该通道关闭。OUT是一种张开的模式,所以又被称为扇出,可以用来分发任务。

FAN-IN模式:

1个goroutine从多个通道读取数据,直到这些通道关闭。IN是一种收敛的模式,所以又被称为扇入,用来收集处理的结果。

本作品采用《CC 协议》,转载必须注明作者和本文链接
用过哪些工具?为啥用这个工具(速度快,支持高并发...)?底层如何实现的?
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!