分享
  1. 首页
  2. 文章

(golang学习)3. go线程、协程理解

沧浪水 · · 1371 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

1.进程、线程、协程区别

a.各自特点

参考《详细介绍 进程、线程和协程的区别

  • 进程:拥有自己独立的堆和栈,既不共享堆,也不共享栈,进程由操作系统调度;
  • 线程:拥有自己独立的栈和共享的堆,共享堆,不共享栈,标准线程由操作系统调度;
  • 协程:拥有自己独立的栈和共享的堆,共享堆,不共享栈,协程由程序员在协程的代码里显示调度。

协程与线程:
每个单位时间内,一个CPU只能处理一个线程(操作系统:thread),线程是CPU处理的单位或单元,底层资源占用中等(比进程少)。线程中程序的执行过程是:同步阻塞的(依次执行),非抢占式的(依代码编写顺序)。开发上比较清晰明了。
协程是"用户级"的线程,通过把线程的分段运行:主动暂停、主动运行,切换逻辑点,针对i/o请求可以节约连接、对方处理的中间环节等待时间,一个线程上可以跑多个协程。协程中的程序执行是触发、跳转的,异步非阻塞的(事件触发),抢占式的(线程挂起等待响应)。开发上很复杂。

channel信道,是go用于在线程间传递数据的,下面关于channel的例子观察线程与协程使用情况

b.上代码一:

使用一个无缓存channel时:

package main
import (
 "fmt"
 "time"
)
var waitc = make(chan int)
func routine(id int) {
 time.Sleep(time.Microsecond *200)
 fmt.Printf("this is routine %v before.\n", id)
 waitc <- id
 fmt.Printf("this is routine %v after.\n", id)
}
func main() {
 for i := 0; i < 5; i++ {
 go routine(i*i)
 }
 for i := 0; i < 5; i++ {
 fmt.Printf("--this is main routine %v before.\n", i)
 <-waitc
 fmt.Printf("--this is main routine %v after.\n", i)
 }
 time.Sleep(time.Microsecond *200)
}
/*
--this is main routine 0 before.
this is routine 1 before.
this is routine 1 after.
this is routine 9 before.
this is routine 16 before.
this is routine 4 before.
this is routine 0 before.
--this is main routine 0 after.
--this is main routine 1 before.
--this is main routine 1 after.
--this is main routine 2 before.
--this is main routine 2 after.
--this is main routine 3 before.
--this is main routine 3 after.
--this is main routine 4 before.
--this is main routine 4 after.
this is routine 4 after.
this is routine 9 after.
this is routine 16 after.
原文地址 https://blog.csdn.net/kjfcpua/article/details/18265441
-----------------------------------------------------------------------
解析:
遇到信道阻塞,循环取的、继续循环、跳过当前,阻塞的执行完毕、继续循环、直到完成
原理上与 yield中断相同
*/

百度查找关于go的多线程,写法也跟协程没有明显区别。参照上面特点的话,线程部分:go func(){}()另起一个线程,变量继承当前父进程/主线程、运行空间为{}、内部顺序执行,如果有数据流阻塞;协程部分:对线程添加异步代码,实现事件驱动的执行状态切换,运行空间为当前线程、数据流驱动(输出、输入)不阻塞。
参考 上官二狗《Go 缓冲 channel 和 非缓冲 channel 的区别

c、代码二:

使用一个缓存channel + 一个无缓存channel时:

package main
import (
 "fmt"
 "math/rand"
 "strconv"
 "time"
)
type array2j struct {
 a []string
 b string
}
func main() {
 ch := make(chan string, 3)
 c2 := make(chan string)
 var queue array2j
 for i:=1; i<=5; i++ {
 go func(i int) {
 time.Sleep(time.Duration(rand.Intn(100)) * time.Microsecond)
 fmt.Println("go func:" + strconv.Itoa(i))
 ch <- strconv.Itoa(i) + "_ch_" + strconv.Itoa(rand.Int())
 }(i)
 }
 for j:=1; j<=2; j++ {
 go func() {
 time.Sleep(1 * time.Second)
 ch <- "c2"
 }()
 }
 //等下,切到其他线程
 time.Sleep(1 * time.Second)
 for {
 select {
 case a,e := <-ch:
 fmt.Println(a,e)
 queue.a = append(queue.a, a)
 case b,e := <-c2:
 fmt.Println(b,e)
 queue.b = b
 }
 //下面这快代码是不报错的关键:当所有信道为空时退出循环
 if len(ch) + len(c2) == 0 {
 fmt.Println("queue", queue)
 break
 }
 //执行不到
 res, err := <- ch
 fmt.Println(res, err)
 }
 fmt.Println("hello go!")
}
/**
go func:1
go func:4
go func:2
go func:3
go func:5
1_ch_5577006791947779410 true
4_ch_8674665223082153551 true
2_ch_6129484611666145821 true
3_ch_4037200794235010051 true
5_ch_3916589616287113937 true
c2 true
c2 true
queue {[1_ch_5577006791947779410 4_ch_8674665223082153551 2_ch_6129484611666145821 3_ch_4037200794235010051 5_ch_3916589616287113937 c2 c2] }
hello go!
*/

这里信道ch宽度是3,有5个线程输入-对应将有5个输出,运行流出正常。说明:有缓存、超量时会自动阻塞,当读取完其中数值时,又可以继续写入。

2.应用测试

a、数据库的批量写入

实际操作只有看到线程和异步, 协程是线程的一个异步表现。
准备测试环境,使用php进行建表、生成10w测试数据sql.data的准备略。

package main
import (
 "database/sql"
 _ "github.com/go-sql-driver/mysql"
 "fmt"
 "io"
 "os"
 "path"
 "strconv"
 _ "strings"
 "bufio"
 _ "fmt"
 _ "io"
 _ "io/ioutil"
 "time"
)
func ReadFile(filePath string, handle func(string)) error {
 f, err := os.Open(filePath)
 defer f.Close()
 if err != nil {
 return err
 }
 //255*100 测试行大于4K时读取被截断 [:4K]
 buf := bufio.NewReaderSize(f, 25500)
 for {
 line, _, err := buf.ReadLine()
 statistic.readLine++
 handle(string(line))
 if err != nil {
 if err == io.EOF{
 //fmt.Println( "io.EOF:", err, string(line))
 return nil
 }
 return err
 }
 //return nil
 }
}
func buildQuery(line string){
 if len(line) == 0 {
 //结尾
 query := curSql[:len(curSql)-1]
 fmt.Println( "==> EOF队列:" + strconv.Itoa(statistic.sqlLineNum), line)
 go execQuery(query)
 }else{
 newSql := curSql + "(" + line +"),"
 if len(newSql) > maxSqlLen {
 query := curSql[:len(curSql)-1]
 fmt.Println( "任务队列:" + strconv.Itoa(statistic.sqlLineNum))
 go execQuery(query)
 //回归
 curSql = sqlBuild + "(" + line +"),"
 }else{
 curSql = newSql
 }
 }
}
func execQuery(query string) {
 statistic.sqlLineNum++
 res, err := myDb.Exec(query) //Result
 if err != nil {
 fmt.Println(err.Error()) //显示异常
 panic(err.Error()) //抛出异常
 }
 re, err := res.RowsAffected() //int64, error
 if err != nil {
 fmt.Println(err.Error()) //显示异常
 fmt.Println(err) //抛出异常
 }
 string := strconv.FormatInt(re, 10)
 rows, err := strconv.Atoi(string)
 if err != nil {
 fmt.Println(err) //抛出异常
 }
 channelResult <- rows
}
type statistics struct {
 execDoneNum int
 sqlLineNum int
 chanRecNum int
 readLine int
}
var sqlBuild string
var curSql string
var myDb *sql.DB
var maxSqlLen = 1024*1024*2
var statistic statistics = statistics{0,0,0,0}
//容器mysql的最大连接数是150 (200崩溃)
var channelResult = make(chan int, 20)
func main(){
 var err error
 myDb, err = sql.Open("mysql", "root:123456@tcp(172.1.11.11:3306)/testdb?charset=utf8")
 if err != nil {
 fmt.Println(err.Error()) //显示异常
 panic(err.Error()) //抛出异常
 }
 defer myDb.Close()
 var count int
 rows, err := myDb.Query("SELECT COUNT(id) as count FROM t10_5")
 if err != nil {
 fmt.Println(err.Error()) //显示异常
 panic(err.Error()) //抛出异常
 }
 for rows.Next() {
 rows.Scan(&count)
 }
 fmt.Println(count)
 fmt.Println()
 //初始化sql
 sqlBuild = "INSERT INTO `t10_5` ("
 for i:=1; i<100; i++ {
 sqlBuild += "`field_"+ strconv.Itoa(i) +"`,"
 }
 sqlBuild = sqlBuild[:len(sqlBuild)-1] + ") VALUES "
 pwd, _ := os.Getwd()
 dataPath := path.Join(pwd, "sql.data")
 fmt.Println(dataPath)
 curSql = sqlBuild;
 ReadFile(dataPath, buildQuery)
 time.Sleep(time.Second)
 for {
 x, ok := <- channelResult
 statistic.execDoneNum += x
 statistic.chanRecNum++
 fmt.Println(statistic.sqlLineNum, statistic.execDoneNum, ok, len(channelResult))
 if len(channelResult)==0 {
 print("---------- 完成 ----------")
 fmt.Println(statistic.execDoneNum, statistic.sqlLineNum)
 break
 }
 }
 fmt.Println("chanRecNum=", statistic.chanRecNum, "sqlLineNum=", statistic.sqlLineNum, "readLine=", statistic.readLine, statistic.execDoneNum, len(channelResult))
 var count2 int
 rows, err = myDb.Query("SELECT COUNT(id) as count FROM t10_5")
 if err != nil {
 fmt.Println(err.Error()) //显示异常
 panic(err.Error()) //抛出异常
 }
 for rows.Next() {
 rows.Scan(&count2)
 }
 fmt.Println(count2)
 fmt.Println(count2-count, statistic.execDoneNum, "缺失行:", count2-count-statistic.execDoneNum)
}
/**
...
523 100000 true 0
---------- 完成 ----------100000 523
chanRecNum= 523 sqlLineNum= 523 readLine= 100001 100000 0
100000
100000 100000 缺失行: 0
...
523 100000 true 0
---------- 完成 ----------100000 523
chanRecNum= 523 sqlLineNum= 523 readLine= 100001 100000 0
200000
100000 100000 缺失行: 0
real 1m4.293s
user 0m28.764s
sys 0m1.944s
 */

大量写入在主从库时,会占用大量内存,导致主机多次内存和磁盘空间不足,需要注意。


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:沧浪水

查看原文:(golang学习)3. go线程、协程理解

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1371 次点击
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏