分享
  1. 首页
  2. 文章

Golang实现简单爬虫框架(4)——队列实现并发任务调度

盐的甜 · · 1499 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

前言

在上一篇文章《Golang实现简单爬虫框架(3)——简单并发版》中我们实现了一个最简单并发爬虫,调度器为每一个Request创建一个goroutine,每个goroutineWorker队列中分发任务,发完就结束。所有的Worker都在抢一个channel中的任务。但是这样做还是有些许不足之处,比如控制力弱:所有的Worker在抢同一个channel中的任务,我们没有办法控制给哪一个worker任务。

其实我们可以自己做一个任务分发的机制,我们来决定分发给哪一个Worker

注意:本次并发是在上一篇文章简单并发实现的基础上修改,所以没有贴出全部代码,只是贴出部分修改部分,要查看完整项目代码,可以查看上篇文章,或者从github下载项目源代码查看

1、项目架构

在上一篇文章实现简单并发的基础上,我们修改下Scheduler的任务分发机制

image
  • Scheduler接收到一个Request后,不能直接发给Worker,也不能为每个Request创建一个goroutine,所以这里使用一个Request队列
  • 同时我们想对Worker实现一个更多的控制,可以决定把任务分发给哪一个Worker,所以这里我们还需要一个Worker队列
  • 当有了RequestWorker,我们就可以把选择的Request发送给选择的Worker

2、队列实现任务调度器

在scheduler目录下创建queued.go文件

package scheduler
import "crawler/engine"
// 使用队列来调度任务
type QueuedScheduler struct {
 requestChan chan engine.Request // Request channel
 // Worker channel, 其中每一个Worker是一个 chan engine.Request 类型
 workerChan chan chan engine.Request 
}
// 提交请求任务到 requestChannel
func (s *QueuedScheduler) Submit(request engine.Request) {
 s.requestChan <- request
}
func (s *QueuedScheduler) ConfigMasterWorkerChan(chan engine.Request) {
 panic("implement me")
}
// 告诉外界有一个 worker 可以接收 request
func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
 s.workerChan <- w
}
func (s *QueuedScheduler) Run() {
 // 生成channel
 s.workerChan = make(chan chan engine.Request)
 s.requestChan = make(chan engine.Request)
 go func() {
 // 创建请求队列和工作队列
 var requestQ []engine.Request
 var workerQ []chan engine.Request
 for {
 var activeWorker chan engine.Request
 var activeRequest engine.Request
 
 // 当requestQ和workerQ同时有数据时
 if len(requestQ) > 0 && len(workerQ) > 0 {
 activeWorker = workerQ[0]
 activeRequest = requestQ[0]
 }
 
 select {
 case r := <-s.requestChan: // 当 requestChan 收到数据
 requestQ = append(requestQ, r)
 case w := <-s.workerChan: // 当 workerChan 收到数据
 workerQ = append(workerQ, w)
 case activeWorker <- activeRequest: // 当请求队列和认读队列都不为空时,给任务队列分配任务
 requestQ = requestQ[1:]
 workerQ = workerQ[1:]
 }
 }
 }()
}

3、爬虫引擎

修改后的concurrent.go文件如下

package engine
import (
 "log"
)
// 并发引擎
type ConcurrendEngine struct {
 Scheduler Scheduler
 WorkerCount int
}
// 任务调度器
type Scheduler interface {
 Submit(request Request) // 提交任务
 ConfigMasterWorkerChan(chan Request)
 WorkerReady(w chan Request)
 Run()
}
func (e *ConcurrendEngine) Run(seeds ...Request) {
 out := make(chan ParseResult)
 e.Scheduler.Run()
 // 创建 goruntine
 for i := 0; i < e.WorkerCount; i++ {
 createWorker(out, e.Scheduler)
 }
 // engine把请求任务提交给 Scheduler
 for _, request := range seeds {
 e.Scheduler.Submit(request)
 }
 itemCount := 0
 for {
 // 接受 Worker 的解析结果
 result := <-out
 for _, item := range result.Items {
 log.Printf("Got item: #%d: %v\n", itemCount, item)
 itemCount++
 }
 // 然后把 Worker 解析出的 Request 送给 Scheduler
 for _, request := range result.Requests {
 e.Scheduler.Submit(request)
 }
 }
}
func createWorker(out chan ParseResult, s Scheduler) {
 // 为每一个Worker创建一个channel
 in := make(chan Request)
 go func() {
 for {
 s.WorkerReady(in) // 告诉调度器任务空闲
 request := <-in
 result, err := worker(request)
 if err != nil {
 continue
 }
 out <- result
 }
 }()
}

4、main函数

package main
import (
 "crawler/engine"
 "crawler/scheduler"
 "crawler/zhenai/parser"
)
func main() {
 e := engine.ConcurrendEngine{
 Scheduler: &scheduler.QueuedScheduler{},// 这里调用并发调度器
 WorkerCount: 50,
 }
 e.Run(engine.Request{
 Url: "http://www.zhenai.com/zhenghun",
 ParseFunc: parser.ParseCityList,
 })
}

运行结果如下:

image

5、总结

在这篇文章中我们使用队列实现对并发任务的调度,从而实现了对Worker的控制。我们现在并发有两种实现方式,但是他们的调度方法是不同的,为了代码的统一,所以在下一篇文章中的内容有:

  • 对项目做一个同构
  • 添加数据的存储模块。

如果想获取Google工程师深度讲解go语言视频资源的,可以在评论区留下邮箱。

项目的源代码已经托管到Github上,对于各个版本都有记录,欢迎大家查看,记得给个star,在此先谢谢大家

如果觉得博客不错,劳烦大人给个赞,


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:盐的甜

查看原文:Golang实现简单爬虫框架(4)——队列实现并发任务调度

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1499 次点击
下一篇:MAY-Lesson2
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏