分享
  1. 首页
  2. 文章

用goroutine和channel实现master-worker模式

初级赛亚人 · · 3066 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

什么是master-worker模式?

如果你做过java服务端,那你对master-worker模式一定并不陌生,它是一种并行模式。

master-worker模式的主要作用是将master任务分别调度到多个worker上进行计算,计算之后将结果返回到master中进行合并整理。目的是减少一个进程对master中的任务处理的压力。

master-worker模式流程图:



图源于网络

master-worker详细流程图:


图源于网络

golang实现master-worker模式

worker

package worker

import (

"fmt"

)

//封装需要处理的数据结构

type Job struct {

num int

}

func NewJob(num int) Job {

return Job{num: num}

}

type Worker struct {

id int //workerID

WorkerPool chan chan Job //worker池

JobChannel chan Job //worker从JobChannel中获取Job进行处理

Result map[interface{}]int //worker将处理结果放入reuslt

quit chan bool //停止worker信号

}

func NewWorker(workerPool chan chan Job, result map[interface{}]int, id int) Worker {

return Worker{

id: id,

WorkerPool: workerPool,

JobChannel: make(chan Job),

Result: result,

quit: make(chan bool),

}

}

func (w Worker) Start() {

go func() {

for {

//将worker的JobChannel放入master的workerPool中

w.WorkerPool <- w.JobChannel

select {

//从JobChannel中获取Job进行处理,JobChannel是同步通道,会阻塞于此

case job := <-w.JobChannel:

//处理这个job

//并将处理得到的结果存入master中的结果集

x := job.num * job.num

fmt.Println(w.id, ":", x)

w.Result[x] = w.id

//停止信号

case <-w.quit:

return

}

}

}()

}

func (w Worker) Stop() {

go func() {

w.quit <- true

}()

}

master

package master

import (

"MasterWorkerPattern/worker"

)

type Master struct {

WorkerPool chan chan worker.Job //worker池

Result map[interface{}]int //存放worker处理后的结果集

jobQueue chan worker.Job //待处理的任务chan

workerList []worker.Worker //存放worker列表,用于停止worker

}

var maxworker int

//maxWorkers:开启线程数

//result :结果集

func NewMaster(maxWorkers int, result map[interface{}]int) *Master {

pool := make(chan chan worker.Job, maxWorkers)

maxworker = maxWorkers

return &Master{WorkerPool: pool, Result: result, jobQueue: make(chan worker.Job, 2*maxWorkers)}

}

func (m *Master) Run() {

//启动所有的Worker

for i := 0; i < maxworker; i++ {

work := worker.NewWorker(m.WorkerPool, m.Result, i)

m.workerList = append(m.workerList, work)

work.Start()

}

go m.dispatch()

}

func (m *Master) dispatch() {

for {

select {

case job := <-m.jobQueue:

go func(job worker.Job) {

//从workerPool中取出一个worker的JobChannel

jobChannel := <-m.WorkerPool

//向这个JobChannel中发送job,worker中的接收配对操作会被唤醒

jobChannel <- job

}(job)

}

}

}

//添加任务到任务通道

func (m *Master) AddJob(num int) {

job := worker.NewJob(num)

//向任务通道发送任务

m.jobQueue <- job

}

//停止所有任务

func (m *Master) Stop() {

for _, v := range m.workerList {

v.Stop()

}

}

test

// MasterWorkerPattern project main.go

package main

import (

"MasterWorkerPattern/master"

"fmt"

"time"

)

func main() {

result := map[interface{}]int{}

mas := master.NewMaster(4, result)

mas.Run()

for i := 0; i < 10; i++ {

mas.AddJob(i)

}

time.Sleep(time.Millisecond)

//mas.Stop()

fmt.Println("result=", result)

}

运行结果

0 : 81

3 : 0

0 : 4

0 : 36

0 : 25

0 : 49

0 : 64

1 : 1

3 : 9

2 : 16

result= map[81:0 36:0 64:0 1:1 0:3 4:0 25:0 49:0 9:3 16:2]

这个运行结果是不确定的,在worker.go中有一行这样的代码:

x:=job.num*job.num

fmt.Println(w.id,":",x),打印的是workerID和Job中num的平方,从最后的测试结果中可以看出id=0的这个worker计算了很多个Job,而其他的都很少,这并没什么影响,执行多次完全可能是不同的结果。另外,开启多少个worker来为master工作比较好?如果你是2核,建议2-4个;如果你是4核,建议4-8,即i-2*i个,尽量用压榨CPU。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:初级赛亚人

查看原文:用goroutine和channel实现master-worker模式

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
3066 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏