分享
  1. 首页
  2. 文章

源码分析nats request reply的设计实现

rfyiamcool · · 1820 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

前言:

Golang nats mq是一个基于golang实现的高性能消息队列,nats也被cncf纳入到云原生计算基金会。据我所知,nats mq貌似在gopher里很有名气,但别的语言系的朋友不关注这个。

我用nats也有段时间了,你问我这个nats好用否? 我只能说比我以前用的go nsq性能高且可用性强,加入stream后,可信赖数据可靠性。如果再详细的介绍,大家就看nats的官网文档吧。

该文章后续仍在不断的更新修改中, 请移步到原文地址 http://xiaorui.cc/?p=5642

我们的nats应用场景

先说下我们的应用场景,我们除了用传统意义上的mq功能外,就会借助nats来实现消息总线。这里的消息总线模式更像是微服务的另一种模型。常见的微服务模式是服务提供者把自己的信息注册到 zk/ etcd/consul等,然后服务调用者通过服务发现机制找到访问对象的地址,然后直接访问,或者是通过mesh机制代理访问。

而基于消息总线的微服务设计是怎样?服务提供者监听nats mq topic,服务调用者直接把请求扔到mq topic,然后接收返回的结果? 为什么是疑问? nats大体是可以分为三个模式,跟大多mq一样的pubsub模式,还有queue模式,跟其他mq不一样的request reply模式。 mq一般可以理解为异步消费,也就是说producer只需要把任务丢到mq就完事了,但是如果你要实现同步,那么producer和provider两端需要定义好 任务信息topic和结果topic。然后producer pub任务后,需要再sub订阅结果topic。

麻烦不? 如果整条微服务体现的所有服务都这么搞,太蛋疼了。 所以nats mq为微服务专门搞了一个request & reply。简单说,就是为了结果上面说的同步问题,对于producer来说只是一个方法。你会发现github社区里有不少微服务套件都有消息总线模式的实现。

看字看不明白,看图应该能看懂吧?


nats request & reply 实现

request & reply的实现基本是在nats client端实现的。nats server对request & reply没做啥特别的东西。我们这里拿golang的nats client来分析下源码。go nats client的协程控制的很好,没有随意的滥开协程。subscribe自身没有并发控制,subscribe绑定了一个事件和回调方法后,会new一个waitForMsgs协程来回调,也就是说单个subscribe是同步阻塞的。 这个需要注意下,在go里nsq和kafka的sarama是模式并发模式的。

nats client连接的过程

一个nats client的连接会开启2个协程,一个是用来读取数据,一个用来flush数据。

1
2
3
4
5
6
7
// Process a connected connection and initialize properly.
func(nc *Conn)processConnectInit()error{
nc.wg.Add(2)
gonc.readLoop()
gonc.flusher()
returnnil
}
nats request发送的过程
针对request模式,一个连接只有一个waitForMsgs协程,nats通过sync.Once限制唯一。这里waitForMsgs的作用只是把msg发给request等待的chan而已。当我们使用bus的request方法时,他内部会生成一个接收结果的chan,接着在绑定Subscribe里绑定事件, 然后阻塞等待接收结果的chan。当远端的订阅者把结果返回时, waitForMsg会把msg传给上面的chan。 request接收chan,结束阻塞状态,返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// xiaorui.cc
func(nc *Conn)Request(subj string,data[]byte,timeout time.Duration)(*Msg,error){
// Create literal Inbox and map to a chan msg.
mch:=make(chan*Msg,RequestChanLen)
respInbox:=nc.newRespInbox()
token:=respToken(respInbox)
nc.respMap[token]=mch
createSub:=nc.respMux==nil
ginbox:=nc.respSub
nc.mu.Unlock()
ifcreateSub{
varerr error
// 通过sync.Once保证request模式下,waitForMsg只有一个协程。作用是订阅返回结果的topic及回调mch
nc.respSetup.Do(func(){err=nc.createRespMux(ginbox)})
iferr!=nil{
returnnil,err
}
}
iferr:=nc.PublishRequest(subj,respInbox,data);err!=nil{
returnnil,err
}
t:=globalTimerPool.Get(timeout)
deferglobalTimerPool.Put(t)
varokbool
varmsg*Msg
select{
// 回调的channel
casemsg,ok=<-mch:
if!ok{
returnnil,ErrConnectionClosed
}
case<-t.C:
nc.mu.Lock()
delete(nc.respMap,token)
nc.mu.Unlock()
returnnil,ErrTimeout
}
returnmsg,nil
}

nats reply订阅接收的过程

订阅消费者返回结果的topic,把获取的结果扔到request等待的channel里。订阅者去直接调用subscribe事件的时候,每个subscribe的调用都会new一个waitForMsg协程来回调我们注册的方法。

简单说,同一个连接下,多个sub就多个并发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// xiaorui.cc
func(nc *Conn)createRespMux(respSub string)error{
s,err:=nc.Subscribe(respSub,nc.respHandler)
iferr!=nil{
returnerr
}
nc.mu.Lock()
nc.respMux=s
nc.mu.Unlock()
returnnil
}
func(nc *Conn)respHandler(m *Msg){
rt:=respToken(m.Subject)
nc.mu.Lock()
// Just return if closed.
ifnc.isClosed(){
nc.mu.Unlock()
return
}
// Grab mch
mch:=nc.respMap[rt]
...
select{
casemch<-m:
default:
return
}
}
func(nc *Conn)subscribe(subj,queue string,cb MsgHandler,ch chan*Msg)(*Subscription,error){
...
ifcb!=nil{
...
gonc.waitForMsgs(sub)
}
}
func(nc *Conn)waitForMsgs(s *Subscription){
varclosed bool
vardelivered,max uint64
// Used to account for adjustments to sub.pBytes when we wrap back around.
msgLen:=-1
for{
...
ifs.pHead==nil&&!s.closed{
s.pCond.Wait()
}
// Pop the msg off the list
m:=s.pHead
ifm!=nil{
...
}
mcb:=s.mcb
max=s.max
...
// 投递消息, 其实就是调用nc.respHandler
ifm!=nil&&(max==0||delivered<=max){
mcb(m)
}
...
}
// Check for barrier messages
s.mu.Lock()
form:=s.pHead;m!=nil;m=s.pHead{
ifm.barrier!=nil{
s.mu.Unlock()
ifatomic.AddInt64(&m.barrier.refs,-1)==0{
m.barrier.f()
}
s.mu.Lock()
}
s.pHead=m.next
}
s.mu.Unlock()
}

总结:

其实nats request& reply的原理很简单,另外golang nats的源码质量还可以,建议大家可以看下。


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址 xiaorui.cc

有疑问加站长微信联系(非本文作者)

本文来自:峰云就她了

感谢作者:rfyiamcool

查看原文:源码分析nats request reply的设计实现

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1820 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏