分享
  1. 首页
  2. 文章

golang-nsq系列(二)--nsqd源码解析

热爱coding的稻草 · · 1072 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

上一篇初识了 nsq 三个模块(nsqd, nsqlookupd, nsqadmin)的 demo演示,本篇则从源码开始,一步一步去解析 nsqd 的执行流程和逻辑处理,学习别人优秀的项目架构,以期学以致用。

1. nsqd 执行入口

nsq/apps/nsqd/main.go 可以找到执行入口文件,如下:

nsqd-path

2. nsqd 执行主逻辑源码

2.1 通过第三方 svc 包进行优雅的后台进程管理,svc.Run() -> svc.Init() -> svc.Start(),启动 nsqd 实例;

func main() {
 prg := &program{}
 if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
 logFatal("%s", err)
 }
}
func (p *program) Init(env svc.Environment) error {
 if env.IsWindowsService() {
 dir := filepath.Dir(os.Args[0])
 return os.Chdir(dir)
 }
 return nil
}
func (p *program) Start() error {
 opts := nsqd.NewOptions()
 flagSet := nsqdFlagSet(opts)
 flagSet.Parse(os.Args[1:])
 ...
}



2.2 初始化配置项(opts, cfg),加载历史数据(nsqd.LoadMetadata)、持久化最新数据(nsqd.PersistMetadata),然后开启协程,进入 nsqd.Main() 主函数;

options.Resolve(opts, flagSet, cfg)
 nsqd, err := nsqd.New(opts)
 if err != nil {
 logFatal("failed to instantiate nsqd - %s", err)
 }
 p.nsqd = nsqd
 err = p.nsqd.LoadMetadata()
 if err != nil {
 logFatal("failed to load metadata - %s", err)
 }
 err = p.nsqd.PersistMetadata()
 if err != nil {
 logFatal("failed to persist metadata - %s", err)
 }
 go func() {
 err := p.nsqd.Main()
 if err != nil {
 p.Stop()
 os.Exit(1)
 }
 }()



2.3 初始化 tcpServer, httpServer, httpsServer,然后循环监控队列信息(n.queueScanLoop)、节点信息管理(n.lookupLoop)、统计信息(n.statsdLoop)输出;

tcpServer := &tcpServer{ctx: ctx}
 n.waitGroup.Wrap(func() {
 exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
 })
 httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
 n.waitGroup.Wrap(func() {
 exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
 })
 if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
 httpsServer := newHTTPServer(ctx, true, true)
 n.waitGroup.Wrap(func() {
 exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
 })
 }
 n.waitGroup.Wrap(n.queueScanLoop)
 n.waitGroup.Wrap(n.lookupLoop)
 if n.getOpts().StatsdAddress != "" {
 n.waitGroup.Wrap(n.statsdLoop)
 }



2.4 分别处理 tcp/http 请求,开启 handler 协程进行并发处理,其中 newHTTPServer 注册路由采用了 Decorate 装饰器模式(后面会进一步解析);

http-Decorate路由分发:

router := httprouter.New()
 router.HandleMethodNotAllowed = true
 router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf)
 router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf)
 router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf)
 s := &httpServer{
 ctx: ctx,
 tlsEnabled: tlsEnabled,
 tlsRequired: tlsRequired,
 router: router,
 }
 router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
 router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))
 // v1 negotiate
 router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
 router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1))
 router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1))
 // only v1
 router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
 router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))

tcp-handler 处理:

for {
 clientConn, err := listener.Accept()
 if err != nil {
 if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
 logf(lg.WARN, "temporary Accept() failure - %s", err)
 runtime.Gosched()
 continue
 }
 // theres no direct way to detect this error because it is not exposed
 if !strings.Contains(err.Error(), "use of closed network connection") {
 return fmt.Errorf("listener.Accept() error - %s", err)
 }
 break
 }
 go handler.Handle(clientConn)
 }



2.5 tcp 解析 V2 协议,走内部协议封装的 prot.IOLoop(conn) 进行处理;

var prot protocol.Protocol
 switch protocolMagic {
 case " V2":
 prot = &protocolV2{ctx: p.ctx}
 default:
 protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
 clientConn.Close()
 p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
 clientConn.RemoteAddr(), protocolMagic)
 return
 }
 err = prot.IOLoop(clientConn)
 if err != nil {
 p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
 return
 }



2.6 通过内部协议进行 p.Exec(执行命令)、p.Send(发送结果),保证每个 nsqd 节点都能正确的进行消息生成与消费,一旦上述过程有 error 都会被捕获处理,确保分布式投递的可靠性。

params := bytes.Split(line, separatorBytes)
 p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)
 var response []byte
 response, err = p.Exec(client, params)
 if err != nil {
 ctx := ""
 if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
 ctx = " - " + parentErr.Error()
 }
 p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)
 sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
 if sendErr != nil {
 p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
 break
 }
 // errors of type FatalClientErr should forceably close the connection
 if _, ok := err.(*protocol.FatalClientErr); ok {
 break
 }
 continue
 }
 if response != nil {
 err = p.Send(client, frameTypeResponse, response)
 if err != nil {
 err = fmt.Errorf("failed to send response - %s", err)
 break
 }
 }

3. nsqd 流程图小结

上述流程小结示意图如下:

nsqd-logic

【小结】从源码可以看到,代码逻辑清晰明了,利用 Go 协程高效并发处理分布式多节点 nsqd 的消息生产与消费,里面有很多细节有待下一步仔细剖析,学以致用。


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:热爱coding的稻草

查看原文:golang-nsq系列(二)--nsqd源码解析

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1072 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏