分享
  1. 首页
  2. 文章

go语言的chan

richmonkey · · 23205 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

chan是一个FIFO队列,chan分成两种类型同步和异步
同步的chan完成发送者和接受者之间手递手传递元素的过程,必须要求对方的存在才能完成一次发送或接受
异步的chan发送和接受都是基于chan的缓存,但当缓存队列填满后,发送者就会进入发送队列, 当缓存队列为空时,接受者就会接入等待队列。

chan的数据结构:

struct	Hchan
{
 uintgo	qcount;			// total data in the q
 uintgo	dataqsiz;		// size of the circular q
 uint16	elemsize;
 uint16	pad;			// ensures proper alignment of the buffer that follows Hchan in memory
 bool	closed;
 Alg*	elemalg;		// interface for element type
 uintgo	sendx;			// send index
 uintgo	recvx;			// receive index
 WaitQ	recvq;			// list of recv waiters
 WaitQ	sendq;			// list of send waiters
 Lock;
};

chan发送

void
runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)
{
 SudoG *sg;
 SudoG mysg;
 G* gp;
 int64 t0;
 
 if(c == nil) {
 	USED(t);
 	if(pres != nil) {
 		*pres = false;
 		return;
 	}
 	runtime·park(nil, nil, "chan send (nil chan)");
 	return; // not reached
 }
 
 if(debug) {
 	runtime·printf("chansend: chan=%p; elem=", c);
 	c->elemalg->print(c->elemsize, ep);
 	runtime·prints("\n");
 }
 
 t0 = 0;
 mysg.releasetime = 0;
 if(runtime·blockprofilerate > 0) {
 	t0 = runtime·cputicks();
 	mysg.releasetime = -1;
 }
 
 runtime·lock(c);
 if(raceenabled)
 	runtime·racereadpc(c, pc, runtime·chansend);
 if(c->closed)
 	goto closed;
 
 if(c->dataqsiz > 0)
 	goto asynch;
 
 sg = dequeue(&c->recvq);
 if(sg != nil) {
 	if(raceenabled)
 		racesync(c, sg);
 	runtime·unlock(c);
 
 	gp = sg->g;
 	gp->param = sg;
 	if(sg->elem != nil)
 		c->elemalg->copy(c->elemsize, sg->elem, ep);
 	if(sg->releasetime)
 		sg->releasetime = runtime·cputicks();
 	runtime·ready(gp);
 
 	if(pres != nil)
 		*pres = true;
 	return;
 }
 
 if(pres != nil) {
 	runtime·unlock(c);
 	*pres = false;
 	return;
 }
 
 mysg.elem = ep;
 mysg.g = g;
 mysg.selgen = NOSELGEN;
 g->param = nil;
 enqueue(&c->sendq, &mysg);
 runtime·park(runtime·unlock, c, "chan send");
 
 if(g->param == nil) {
 	runtime·lock(c);
 	if(!c->closed)
 		runtime·throw("chansend: spurious wakeup");
 	goto closed;
 }
 
 if(mysg.releasetime > 0)
 	runtime·blockevent(mysg.releasetime - t0, 2);
 
 return;
 
asynch:
 if(c->closed)
 	goto closed;
 
 if(c->qcount >= c->dataqsiz) {
 	if(pres != nil) {
 		runtime·unlock(c);
 		*pres = false;
 		return;
 	}
 	mysg.g = g;
 	mysg.elem = nil;
 	mysg.selgen = NOSELGEN;
 	enqueue(&c->sendq, &mysg);
 	runtime·park(runtime·unlock, c, "chan send");
 
 	runtime·lock(c);
 	goto asynch;
 }
 
 if(raceenabled)
 	runtime·racerelease(chanbuf(c, c->sendx));
 
 c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
 if(++c->sendx == c->dataqsiz)
 	c->sendx = 0;
 c->qcount++;
 
 sg = dequeue(&c->recvq);
 if(sg != nil) {
 	gp = sg->g;
 	runtime·unlock(c);
 	if(sg->releasetime)
 		sg->releasetime = runtime·cputicks();
 	runtime·ready(gp);
 } else
 	runtime·unlock(c);
 if(pres != nil)
 	*pres = true;
 if(mysg.releasetime > 0)
 	runtime·blockevent(mysg.releasetime - t0, 2);
 return;
 
closed:
 runtime·unlock(c);
 runtime·panicstring("send on closed channel");
}
  1. 判断队列类型,异步队列则转到5
  2. 从等待队列中获取等待队列中的接受者
  3. 如果取到接受者,则将对象直接传递给接受者,然后唤醒接受者,发送过程完成
  4. 如果未取到接受者,则将发送者enqueue到发送队列,发送者进入阻塞状态
  5. 异步队列首先判断队列缓存是否还有空间
  6. 如果缓存空间已满,则将发送者enqueue到发送队列,发送者进入阻塞状态
  7. 如果缓存空间未满,则将元素copy到缓存中,这时发送者就不会进入阻塞状态
  8. 尝试唤醒等待队列中的一个接受者

chan接受

void
runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received)
{
 SudoG *sg;
 SudoG mysg;
 G *gp;
 int64 t0;
 
 if(debug)
 	runtime·printf("chanrecv: chan=%p\n", c);
 
 if(c == nil) {
 	USED(t);
 	if(selected != nil) {
 		*selected = false;
 		return;
 	}
 	runtime·park(nil, nil, "chan receive (nil chan)");
 	return; // not reached
 }
 
 t0 = 0;
 mysg.releasetime = 0;
 if(runtime·blockprofilerate > 0) {
 	t0 = runtime·cputicks();
 	mysg.releasetime = -1;
 }
 
 runtime·lock(c);
 if(c->dataqsiz > 0)
 	goto asynch;
 
 if(c->closed)
 	goto closed;
 
 sg = dequeue(&c->sendq);
 if(sg != nil) {
 	if(raceenabled)
 		racesync(c, sg);
 	runtime·unlock(c);
 
 	if(ep != nil)
 		c->elemalg->copy(c->elemsize, ep, sg->elem);
 	gp = sg->g;
 	gp->param = sg;
 	if(sg->releasetime)
 		sg->releasetime = runtime·cputicks();
 	runtime·ready(gp);
 
 	if(selected != nil)
 		*selected = true;
 	if(received != nil)
 		*received = true;
 	return;
 }
 
 if(selected != nil) {
 	runtime·unlock(c);
 	*selected = false;
 	return;
 }
 
 mysg.elem = ep;
 mysg.g = g;
 mysg.selgen = NOSELGEN;
 g->param = nil;
 enqueue(&c->recvq, &mysg);
 runtime·park(runtime·unlock, c, "chan receive");
 
 if(g->param == nil) {
 	runtime·lock(c);
 	if(!c->closed)
 		runtime·throw("chanrecv: spurious wakeup");
 	goto closed;
 }
 
 if(received != nil)
 	*received = true;
 if(mysg.releasetime > 0)
 	runtime·blockevent(mysg.releasetime - t0, 2);
 return;
 
asynch:
 if(c->qcount <= 0) {
 	if(c->closed)
 		goto closed;
 
 	if(selected != nil) {
 		runtime·unlock(c);
 		*selected = false;
 		if(received != nil)
 			*received = false;
 		return;
 	}
 	mysg.g = g;
 	mysg.elem = nil;
 	mysg.selgen = NOSELGEN;
 	enqueue(&c->recvq, &mysg);
 	runtime·park(runtime·unlock, c, "chan receive");
 
 	runtime·lock(c);
 	goto asynch;
 }
 
 if(raceenabled)
 	runtime·raceacquire(chanbuf(c, c->recvx));
 
 if(ep != nil)
 	c->elemalg->copy(c->elemsize, ep, chanbuf(c, c->recvx));
 c->elemalg->copy(c->elemsize, chanbuf(c, c->recvx), nil);
 if(++c->recvx == c->dataqsiz)
 	c->recvx = 0;
 c->qcount--;
 
 sg = dequeue(&c->sendq);
 if(sg != nil) {
 	gp = sg->g;
 	runtime·unlock(c);
 	if(sg->releasetime)
 		sg->releasetime = runtime·cputicks();
 	runtime·ready(gp);
 } else
 	runtime·unlock(c);
 
 if(selected != nil)
 	*selected = true;
 if(received != nil)
 	*received = true;
 if(mysg.releasetime > 0)
 	runtime·blockevent(mysg.releasetime - t0, 2);
 return;
 
closed:
 if(ep != nil)
 	c->elemalg->copy(c->elemsize, ep, nil);
 if(selected != nil)
 	*selected = true;
 if(received != nil)
 	*received = false;
 if(raceenabled)
 	runtime·raceacquire(c);
 runtime·unlock(c);
 if(mysg.releasetime > 0)
 	runtime·blockevent(mysg.releasetime - t0, 2);
}
  1. 判断队列类型,如果是异步队列则转到5
  2. 从发送队列获取接受者
  3. 如果取到接受者,则直接从接受者获取元素,并唤醒发送者,接受过程完成
  4. 如果未取到接受者,则将自身enqueue到等待队列,阻塞goroutine等待发送者唤醒
  5. 异步队列首先判断队列缓存中是否有元素
  6. 缓存为空时,则将自身enqueue到等待队列,阻塞goroutine等待发送者唤醒
  7. 缓存非空时,取出缓存中的第一个元素
  8. 然后尝试唤醒发送队列中的一个发送者

有疑问加站长微信联系(非本文作者)

本文来自:博客园

感谢作者:richmonkey

查看原文:go语言的chan

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
23205 次点击
上一篇:go语言的GC
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏