分享
  1. 首页
  2. 文章

golang的多协程实践

liughost · · 1996 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

go语言以优异的并发特性而闻名,刚好手上有个小项目比较适合。

项目背景:

公司播控平台的数据存储包括MySQL和ElasticSearch(ES)两个部分,编辑、运营的数据首先保存在MySQL中,为了实现模糊搜索和产品关联推荐,特别增加了ES,ES中保存的是节目集的基本信息。

本项目是为了防止实时同步数据出现问题或者系统重新初始化时的全量数据同步而做的。项目主要是从MySQL读取所有的节目集数据写入到ES中。

项目特点:

因为节目集数量较大,不能一次性的读入内存,因此每次读出一部分记录写入ES。ORM使用的是beego。为了提高性能使用了协程,其中读MySQL的部分最大开启20个协程,ES写入部分开启了15个协程(因为ES分片设置的问题,5个协程和15个协程性能映像不大)。

项目主要包括三个文件:

1、PrdES_v3.go,项目的入口,负责协调MySQL读取和ES写入。

package main
import (
 "./db"
 "./es"
 //"encoding/json"
 "fmt"
 "reflect"
 "time"
)
type PrdES struct {
 DB *prd.Mysql
 ES *es.Elastic
}
// func (this *PrdES) Handle(result []*prd.Series) {
// // for _, value := range result {
// // this.DB.FormatData(value)
// // //json, _ := json.Marshal(value)
// // //fmt.Println(string(json))
// // }
// //写入ES,以多线程的方式执行,最多保持5个线程
// this.ES.DoBulk(result)
// }
func (this *PrdES) Run() {
 count := 50
 offset := 0
 maxCount := 20
 //create channel
 chs := make([]chan []*prd.Series, maxCount)
 selectCase := make([]reflect.SelectCase, maxCount)
 for i := 0; i < maxCount; i++ {
 offset = count * i
 fmt.Println("offset:", offset)
 //init channel
 chs[i] = make(chan []*prd.Series)
 //set select case
 selectCase[i].Dir = reflect.SelectRecv
 selectCase[i].Chan = reflect.ValueOf(chs[i])
 //运行
 go this.DB.GetData(offset, count, chs[i])
 }
 var result []*prd.Series
 for {
 //wait data return
 chosen, recv, ok := reflect.Select(selectCase)
 if ok {
 fmt.Println("channel id:", chosen)
 result = recv.Interface().([]*prd.Series)
 //读取数据从mysql
 go this.DB.GetData(offset, count, chs[chosen])
 //写入ES,以多线程的方式执行,最多保持15个线程
 this.ES.DoBulk(result)
 //update offset
 offset = offset + len(result)
 //判断是否到达数据尾部,最后一次查询
 if len(result) < count {
 fmt.Println("read end of DB")
 //等所有的任务执行完毕
 this.ES.Over()
 fmt.Println("MySQL Total:", this.DB.GetTotal(), ",Elastic Total:", this.ES.GetTotal())
 return
 }
 }
 }
}
func main() {
 s := time.Now()
 fmt.Println("start")
 pe := new(PrdES)
 pe.DB = prd.NewDB()
 pe.ES = es.NewES()
 //fmt.Println("mysql info:")
 //fmt.Println("ES info:")
 pe.Run()
 fmt.Println("time out:", time.Since(s).Seconds(), "(s)")
 fmt.Println("Over!")
}

在run函数里可以看到使用了reflect.SelectCase,使用reflect.SelectCase的原因是读MySQL数据是多个协程,不可预计哪个会首先返回,selectCase是任何一个处理完毕reflect.Select函数就会返回,MySQL读取的数据放在channel中宕Select函数返回时chosen, recv, ok := reflect.Select(selectCase)判断ok是否未true chosen代表的是协程id通过result = recv.Interface().([]*prd.Series)获得返回的数据,因为MySQL读取的数据是对象的结果集,因次使用recv.Interface函数,如果是简单类型可以使用recv.recvInt(),recv.recvString()等函数直接获取channel返回数据。

这里通过counter控制协程的数量,也可以通过channel,用select的方式控制协程的数量,之所以用counter计数器的方式控制协程数量是我想知道同时有多少协程在运行。

注:此处channel可以不用创建数组形式,channel带回来的数据也没有顺序问题。

2、es.go,负责写入ES和es的写入协程调度

package es
import (
 "../db"
 //"encoding/json"
 "fmt"
 elastigo "github.com/mattbaird/elastigo/lib"
 //elastigo "github.com/Uncodin/elastigo/lib"
 //"github.com/Uncodin/elastigo/core"
 "time"
 //"bytes"
 "flag"
 "sync"
 //"github.com/fatih/structs"
)
var (
 //开发测试库
 //host = flag.String("host", "192.168.1.236", "Elasticsearch Host")
 //C平台线上
 host = flag.String("host", "192.168.100.23", "Elasticsearch Host")
 port = flag.String("port", "9200", "Elasticsearch port")
)
//indexor := core.NewBulkIndexorErrors(10, 60)
// func init() {
// //connect to elasticsearch
// fmt.Println("connecting es")
// //api.Domain = *host //"192.168.1.236"
// //api.Port = "9300"
// }
//save thread count
var counter int
type Elastic struct {
 //Seq int64
 c *elastigo.Conn
 lock *sync.Mutex
 lockTotal *sync.Mutex
 wg *sync.WaitGroup
 total int64
}
func (this *Elastic) Conn() {
 this.c = elastigo.NewConn()
 this.c.Domain = *host
 this.c.Port = *port
 //NewClient(fmt.Sprintf("%s:%d", *host, *port))
}
func (this *Elastic) CreateLock() {
 this.lock = &sync.Mutex{}
 this.lockTotal = &sync.Mutex{}
 this.wg = &sync.WaitGroup{}
 counter = 0
 this.total = 0
}
func NewES() (es *Elastic) {
 //connect elastic
 es = new(Elastic)
 es.Conn()
 //create lock
 es.CreateLock()
 return es
}
func (this *Elastic) DoBulk(series []*prd.Series) {
 for true {
 this.lock.Lock()
 if counter < 25 {
 //跳出,执行任务
 break
 } else {
 this.lock.Unlock()
 //等待100毫秒
 //fmt.Println("wait counter less than 25, counter:", counter)
 time.Sleep(1e8)
 }
 }
 this.lock.Unlock()
 //执行任务
 go this.bulk(series, this.lock)
}
func (this *Elastic) Over() {
 this.wg.Wait()
 /*for {
 this.lock.Lock()
 if counter <= 0 {
 this.lock.Unlock()
 break
 }
 this.lock.Unlock()
 }
 */
}
func (this *Elastic) GetTotal() (t int64) {
 this.lockTotal.Lock()
 t = this.total
 this.lockTotal.Unlock()
 return t
}
func (this *Elastic) bulk(series []*prd.Series, lock *sync.Mutex) (succCount int64) {
 //增加计数器
 this.wg.Add(1)
 //减少计数器
 defer this.wg.Done()
 //加计数器
 lock.Lock()
 counter++
 fmt.Println("add task, coutner:", counter)
 lock.Unlock()
 //设置初始成功写入的数量
 succCount = 0
 for _, value := range series {
 //json, _ := json.Marshal(value)
 //fmt.Println(string(json))
 if value.ServiceGroup != nil {
 fmt.Println("series code:", value.Code, ",ServiceGroup:", value.ServiceGroup)
 resp, err := this.c.Index("guttv", "series", value.Code, nil, *value)
 if err != nil {
 panic(err)
 } else {
 //fmt.Println(value.Code + " write to ES succsessful!")
 fmt.Println(resp)
 succCount++
 }
 } else {
 fmt.Println("series code:", value.Code, "service group is null")
 }
 }
 //计数器减一
 lock.Lock()
 counter--
 fmt.Println("reduce task, coutner:", counter, ",success count:", succCount)
 lock.Unlock()
 this.lockTotal.Lock()
 this.total = this.total + succCount
 this.lockTotal.Unlock()
 return succCount
}

在es.go里有两把锁lock和lockTotal,前者是针对counter变量,记录es正在写入的协程数量的;后者为记录总共写入多少条记录到es而增加的。

这里必须要提到的是Over函数,初步使用协程的容易忽略。golang的原则是当main函数运行结束后,所有正在运行的协程都会终止,因袭在MySQL读取数据完毕必须调用Over函数,等待所有的协程结束。这里使用sync.waiGrooup。每次协程启动执行下面两个语句:

//增加计数器
this.wg.Add(1)
//减少计数器,函数结束时自动执行
defer this.wg.Done()
Over函数中调用
wg.Wait()等待计数器为0时返回,否则就一直阻塞。当然读者也可以看到通过检查counter是否小于等于0也可以判断协程是否都结束(Over函数被注释的部分),显然使用waitGroup更优雅和高效。

3、db.go,负责MySQL数据的读取

package prd
import (
 "fmt"
 "github.com/astaxie/beego/orm"
 _ "github.com/go-sql-driver/mysql" // import your used driver
 "strings"
 "sync"
 "time"
)
func init() {
 
 orm.RegisterDataBase("default", "mysql", "@tcp(192.168.100.3306)/guttv_vod?charset=utf8", 30)
 orm.RegisterModelWithPrefix("t_", new(Series), new(Product), new(ServiceGroup))
 orm.RunSyncdb("default", false, false)
}
type Mysql struct {
 sql string
 total int64
 lock *sync.Mutex
}
func (this *Mysql) New() {
 //this.sql = "SELECT s.*, p.code ProductCode, p.name pName FROM guttv_vod.t_series s inner join guttv_vod.t_product p on p.itemcode=s.code and p.isdelete=0 limit ?,?"
 this.sql = "SELECT s.*, p.code ProductCode, p.name pName FROM guttv_vod.t_series s , guttv_vod.t_product p where p.itemcode=s.code and p.isdelete=0 limit ?,?"
 this.total = 0
 this.lock = &sync.Mutex{}
}
func NewDB() (db *Mysql) {
 db = new(Mysql)
 db.New()
 return db
}
func (this *Mysql) GetTotal() (t int64) {
 t = 0
 this.lock.Lock()
 t = this.total
 this.lock.Unlock()
 return t
}
func (this *Mysql) toTime(toBeCharge string) int64 {
 timeLayout := "2006年01月02日 15:04:05"
 loc, _ := time.LoadLocation("Local")
 theTime, _ := time.ParseInLocation(timeLayout, toBeCharge, loc)
 sr := theTime.Unix()
 if sr < 0 {
 sr = 0
 }
 return sr
}
func (this *Mysql) getSGCode(seriesCode string) (result []string, num int64) {
 sql := "select distinct ref.servicegroupcode code from t_servicegroup_reference_category ref "
 sql = sql + "left join t_category_product cp on cp.categorycode=ref.categorycode "
 sql = sql + "left join t_package pkg on pkg.code = cp.assetcode "
 sql = sql + "left join t_package_product pp on pp.parentcode=pkg.code "
 sql = sql + "left join t_product prd on prd.code = pp.assetcode "
 sql = sql + "where prd.itemcode=?"
 o := orm.NewOrm()
 var sg []*ServiceGroup
 num, err := o.Raw(sql, seriesCode).QueryRows(&sg)
 if err == nil {
 //fmt.Println(num)
 for _, value := range sg {
 //fmt.Println(value.Code)
 result = append(result, value.Code)
 }
 } else {
 fmt.Println(err)
 }
 //fmt.Println(result)
 return result, num
}
func (this *Mysql) formatData(value *Series) {
 //设置业务分组数据
 sg, _ := this.getSGCode(value.Code)
 //fmt.Println(sg)
 value.ServiceGroup = []string{}
 value.ServiceGroup = sg[0:]
 //更改OnlineTime为整数
 value.OnlineTimeInt = this.toTime(value.OnlineTime)
 //分解地区
 value.OriginalCountryArr = strings.Split(value.OriginalCountry, "|")
 //分解二级分类
 value.ProgramType2Arr = strings.Split(value.ProgramType2, "|")
 //写入记录内容
 value.Description = strings.Replace(value.Description, "\n", "", -1)
}
func (this *Mysql) GetData(offset int, size int, ch chan []*Series) {
 var result []*Series
 o := orm.NewOrm()
 num, err := o.Raw(this.sql, offset, size).
 QueryRows(&result)
 if err != nil {
 fmt.Println("read DB err")
 panic(err)
 //return //err, nil
 }
 for _, value := range result {
 this.formatData(value)
 //json, _ := json.Marshal(value)
 //fmt.Println(string(json))
 //fmt.Println(value.ServiceGroup)
 }
 this.lock.Lock()
 this.total += num
 this.lock.Unlock()
 fmt.Println("read count :", num) //, "Total:", Total)
 //return nil, result
 ch <- result
}

从项目上看。go语言开发还是比较简洁的,多协程实现也相对容易,但要求开发者必须对概念非常清晰,像select和selectCase理解和defer的理解要很到位,个人层经做过多年的C/C++程序员,从经验上看,C/C++的经验(多线程的理解)对运用go语言还是很有帮助的。


有疑问加站长微信联系(非本文作者)

本文来自:博客园

感谢作者:liughost

查看原文:golang的多协程实践

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1996 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏