分享
  1. 首页
  2. 文章

使用 Go 每分钟处理百万请求

大宝dayday见 · · 1241 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

代码:

package main
import (
"fmt"
"log"
"net/http"
"time"
)
const (
 MaxWorker = 100 //随便设置值
 MaxQueue = 200 // 随便设置值
)
// 一个可以发送工作请求的缓冲 channel
var JobQueue chan Job
func init() {
 JobQueue = make(chan Job, MaxQueue)
}
type Payload struct{}
type Job struct {
 PayLoad Payload
}
type Worker struct {
 WorkerPool chan chan Job
 JobChannel chan Job
 quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
 return Worker{
 WorkerPool: workerPool,
 JobChannel: make(chan Job),
 quit: make(chan bool),
 }
}
// Start 方法开启一个 worker 循环,监听退出 channel,可按需停止这个循环
func (w Worker) Start() {
 go func() {
 for {
 // 将当前的 worker 注册到 worker 队列中
 w.WorkerPool <- w.JobChannel
 select {
 case job := <-w.JobChannel:
 // 真正业务的地方
 // 模拟操作耗时
 time.Sleep(500 * time.Millisecond)
 fmt.Printf("上传成功:%v\n", job)
 case <-w.quit:
 return
 }
 }
 }()
}
func (w Worker) stop() {
 go func() {
 w.quit <- true
 }()
}
// 初始化操作
type Dispatcher struct {
 // 注册到 dispatcher 的 worker channel 池
 WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
 pool := make(chan chan Job, maxWorkers)
 return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
 // 开始运行 n 个 worker
 for i := 0; i < MaxWorker; i++ {
 worker := NewWorker(d.WorkerPool)
 worker.Start()
 }
 go d.dispatch()
}
func (d *Dispatcher) dispatch() {
 for {
 select {
 case job := <-JobQueue:
 go func(job Job) {
 // 尝试获取一个可用的 worker job channel,阻塞直到有可用的 worker
 jobChannel := <-d.WorkerPool
 // 分发任务到 worker job channel 中
 jobChannel <- job
 }(job)
 }
 }
}
// 接收请求,把任务筛入JobQueue。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
 work := Job{PayLoad: Payload{}}
 JobQueue <- work
 _, _ = w.Write([]byte("操作成功"))
}
func main() {
 // 通过调度器创建worker,监听来自 JobQueue的任务
 d := NewDispatcher(MaxWorker)
 d.Run()
 http.HandleFunc("/payload", payloadHandler)
 log.Fatal(http.ListenAndServe(":8099", nil))
}

结语:
最终采用的是两级 channel,一级是将用户请求数据放入到 chan Job 中,这个 channel job 相当于待处理的任务队列。

另一级用来存放可以处理任务的 work 缓存队列,类型为 chan chan Job。调度器把待处理的任务放入一个空闲的缓存队列当中,work 会一直处理它的缓存队列。通过这种方式,实现了一个 worker 池。大致画了一个图帮助理解,
使用 Go 每分钟处理百万请求
首先我们在接收到一个请求后,创建 Job 任务,把它放入到任务队列中等待 work 池处理。


func payloadHandler(w http.ResponseWriter, r *http.Request) {
 job := Job{PayLoad: Payload{}}
 JobQueue <- work
 _, _ = w.Write([]byte("操作成功"))
}

调度器初始化work池后,在 dispatch 中,一旦我们接收到 JobQueue 的任务,就去尝试获取一个可用的 worker,分发任务给 worker 的 job channel 中。 注意这个过程不是同步的,而是每接收到一个 job,就开启一个 G 去处理。这样可以保证 JobQueue 不需要进行阻塞,对应的往 JobQueue 理论上也不需要阻塞地写入任务。


func (d *Dispatcher) Run() {
 // 开始运行 n 个 worker
 for i := 0; i < MaxWorker; i++ {
 worker := NewWorker(d.WorkerPool)
 worker.Start()
 }
 go d.dispatch()
}
func (d *Dispatcher) dispatch() {
 for {
 select {
 case job := <-JobQueue:
 go func(job Job) {
 // 尝试获取一个可用的 worker job channel,阻塞直到有可用的 worker
 jobChannel := <-d.WorkerPool
 // 分发任务到 worker job channel 中
 jobChannel <- job
 }(job)
 }
 }
}

附录:http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/


有疑问加站长微信联系(非本文作者)

本文来自:51CTO博客

感谢作者:大宝dayday见

查看原文:使用 Go 每分钟处理百万请求

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1241 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏