分享
  1. 首页
  2. 文章

Nsq原理分析(二)

朱伟 · · 1249 次点击 · · 开始浏览
这是一篇较早发布的文章(原页面缺失创建日期),其中的信息可能已经有所发展或是发生改变。

上一篇文章对 nsq 做了简单的介绍,并从 nsq 的 Go 客户端代码分析了 nsq 的使用方式;这篇文章将分析 nsqd 的代码。

Nsqd代码分析

nsqd做了什么

  • nsqd接收对topic、channel的创建以及对消息的存储和分发
  • nsqd 向 nsqlookup 注册自己的服务信息(IP 和端口)以及元数据信息(topic、channel);nsqd 也会向 nsqlookup 查询 topic 和 channel 信息

nsq.png
nsqadmin 是一个简单的管理界面,通过它可以查询 topic、channel、消费者等基本信息。nsqadmin 的信息是从 nsqlookup 中获取的;通过 nsqadmin 也可以创建 topic、channel,它们会被创建到 nsqlookup 中,并在 nsqlookup 的内存中维护着。nsqd 会在某个合适的时刻将这些信息拉回本地然后创建(例如 nsqd 在本地创建某个 topic 时,会向 nsqlookup 查询该 topic 的 channel 并预先创建出来)。
nsqd 启动

// Main wires up nsqd's network listeners and background goroutines, then
// blocks until the first listener goroutine reports its result through
// exitFunc. That result (nil on a clean shutdown) is returned.
func (n *NSQD) Main() error {
	ctx := &context{n}

	// Only the first reported result is delivered. exitCh is unbuffered, so the
	// reporting goroutine waits inside once.Do until Main receives it below.
	var (
		exitCh   = make(chan error)
		exitOnce sync.Once
	)
	exitFunc := func(err error) {
		exitOnce.Do(func() {
			if err != nil {
				n.logf(LOG_FATAL, "%s", err)
			}
			exitCh <- err
		})
	}

	n.tcpServer.ctx = ctx

	// TCP listener.
	n.waitGroup.Wrap(func() {
		exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
	})

	// HTTP listener.
	httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
	n.waitGroup.Wrap(func() {
		exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
	})

	// HTTPS listener: only when a TLS config exists and an HTTPS address is set.
	if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
		httpsServer := newHTTPServer(ctx, true, true)
		n.waitGroup.Wrap(func() {
			exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
		})
	}

	// Background loops. The original author describes queueScanLoop as handling
	// message timeouts/deferrals, and lookupLoop as registering with nsqlookupd.
	// statsdLoop runs only when a statsd address is configured.
	n.waitGroup.Wrap(n.queueScanLoop)
	n.waitGroup.Wrap(n.lookupLoop)
	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(n.statsdLoop)
	}

	return <-exitCh
}
// TCPServer accepts connections on listener and serves each one on its own
// goroutine via handler.Handle.
//
// Temporary Accept errors are logged and retried after yielding the processor.
// When the listener has been closed, TCPServer stops accepting, waits for all
// in-flight handler goroutines to return, and returns nil. Any other Accept
// error is returned immediately, without waiting for the handlers.
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
	logf(lg.INFO, "TCP: listening on %s", listener.Addr())

	var handlers sync.WaitGroup
	for {
		conn, err := listener.Accept()
		if err == nil {
			// One goroutine per accepted connection.
			handlers.Add(1)
			go func(c net.Conn) {
				handler.Handle(c)
				handlers.Done()
			}(conn)
			continue
		}

		if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
			logf(lg.WARN, "temporary Accept() failure - %s", err)
			runtime.Gosched()
			continue
		}

		// A closed listener is recognised by its error text here, since this
		// code does not compare against a dedicated error value.
		if !strings.Contains(err.Error(), "use of closed network connection") {
			return fmt.Errorf("listener.Accept() error - %s", err)
		}
		break
	}

	// Do not return until every handler goroutine has completed.
	handlers.Wait()
	logf(lg.INFO, "TCP: closing %s", listener.Addr())
	return nil
}
unc (p *tcpServer) Handle(clientConn net.Conn) {
 p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())
 // The client should initialize itself by sending a 4 byte sequence indicating
 // the version of the protocol that it intends to communicate, this will allow us
 // to gracefully upgrade the protocol away from text/line oriented to whatever...
 buf := make([]byte, 4)
 _, err := io.ReadFull(clientConn, buf)
 if err != nil {
 p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
 clientConn.Close()
 return
 }
 //协商协议版本
 protocolMagic := string(buf)
 p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
 clientConn.RemoteAddr(), protocolMagic)
 var prot protocol.Protocol
 switch protocolMagic {
 case " V2":
 prot = &protocolV2{ctx: p.ctx}
 default:
 protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
 clientConn.Close()
 p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
 clientConn.RemoteAddr(), protocolMagic)
 return
 }
 p.conns.Store(clientConn.RemoteAddr(), clientConn)
 // 开始一个死循环
 err = prot.IOLoop(clientConn)
 if err != nil {
 p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
 }
 p.conns.Delete(clientConn.RemoteAddr())
}
// IOLoop runs the V2 protocol for a single client connection.
//
// Setup:
//   - Registers a new clientV2 with nsqd.
//   - Starts messagePump for it and waits until the pump signals that it started.
//
// Main loop:
//   - Reads a newline-terminated command line; an optional trailing '\r' is stripped.
//   - Splits the line on separatorBytes and executes it with p.Exec.
//   - Sends back either the response or an error frame.
//
// The loop ends when:
//   - the read hits EOF (IOLoop then returns nil);
//   - any other read error occurs;
//   - an error frame or a response cannot be sent;
//   - Exec returns a *protocol.FatalClientErr.
//
// On exit:
//   - the connection is closed;
//   - client.ExitChan is closed;
//   - the client is removed from its channel (if it has one) and from nsqd.
//
// The error that terminated the loop is returned.
func (p *protocolV2) IOLoop(conn net.Conn) error {
	var err error
	var line []byte
	var zeroTime time.Time

	clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
	client := newClientV2(clientID, conn, p.ctx)
	p.ctx.nsqd.AddClient(client.ID, client)

	// synchronize the startup of messagePump in order
	// to guarantee that it gets a chance to initialize
	// goroutine local state derived from client attributes
	// and avoid a potential race with IDENTIFY (where a client
	// could have changed or disabled said attributes)
	messagePumpStartedChan := make(chan bool)
	// messagePump delivers messages to this client (the consumer side).
	go p.messagePump(client, messagePumpStartedChan)
	<-messagePumpStartedChan

	for {
		// With a heartbeat interval set, a client that sends nothing for two
		// intervals hits the read deadline; the read then fails and the loop
		// exits, closing the connection.
		if client.HeartbeatInterval > 0 {
			client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
		} else {
			client.SetReadDeadline(zeroTime)
		}

		// ReadSlice does not allocate new space for the data each request
		// ie. the returned slice is only valid until the next call to it
		// Read the next command sent by the producer or consumer.
		line, err = client.Reader.ReadSlice('\n')
		if err != nil {
			if err == io.EOF {
				err = nil
			} else {
				err = fmt.Errorf("failed to read command - %s", err)
			}
			break
		}

		// trim the '\n'
		line = line[:len(line)-1]
		// optionally trim the '\r'
		if len(line) > 0 && line[len(line)-1] == '\r' {
			line = line[:len(line)-1]
		}
		params := bytes.Split(line, separatorBytes)

		p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

		// Execute the parsed command.
		var response []byte
		response, err = p.Exec(client, params)
		if err != nil {
			detail := ""
			// Exec is expected to return protocol.ChildErr values. A checked
			// assertion means an unexpected error type is still reported to
			// the client instead of panicking this goroutine.
			if childErr, ok := err.(protocol.ChildErr); ok {
				if parentErr := childErr.Parent(); parentErr != nil {
					detail = " - " + parentErr.Error()
				}
			}
			p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, detail)

			sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
			if sendErr != nil {
				p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, detail)
				break
			}

			// errors of type FatalClientErr should forceably close the connection
			if _, ok := err.(*protocol.FatalClientErr); ok {
				break
			}
			continue
		}

		if response != nil {
			err = p.Send(client, frameTypeResponse, response)
			if err != nil {
				err = fmt.Errorf("failed to send response - %s", err)
				break
			}
		}
	}

	p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
	conn.Close()
	close(client.ExitChan)
	if client.Channel != nil {
		client.Channel.RemoveClient(client.ID)
	}

	p.ctx.nsqd.RemoveClient(client.ID)
	return err
}

在继续向下看前,看一下生产者的 PUB 请求在nsqd中做了什么

// PUB handles a producer's PUB command: params[1] is the topic name. The body
// length (read with readLen) and the message body follow on client.Reader.
//
// Validation, in order:
//   - at least two params;
//   - a valid topic name;
//   - a readable body length with 0 < length <= MaxMsgSize;
//   - a fully readable body;
//   - authorization via CheckAuth.
//
// On success the message is put on the topic, which is created on demand by
// GetTopic. PUB then records the publish on the client and returns okBytes.
// Every failure except an authorization error is a *protocol.FatalClientErr;
// the CheckAuth error is returned unchanged.
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
	if len(params) < 2 {
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "PUB insufficient number of parameters")
	}

	topicName := string(params[1])
	if !protocol.IsValidTopicName(topicName) {
		return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC",
			fmt.Sprintf("PUB topic name %q is not valid", topicName))
	}

	bodyLen, err := readLen(client.Reader, client.lenSlice)
	if err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size")
	}
	switch {
	case bodyLen <= 0:
		return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
			fmt.Sprintf("PUB invalid message body size %d", bodyLen))
	case int64(bodyLen) > p.ctx.nsqd.getOpts().MaxMsgSize:
		return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
			fmt.Sprintf("PUB message too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxMsgSize))
	}

	messageBody := make([]byte, bodyLen)
	if _, err = io.ReadFull(client.Reader, messageBody); err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body")
	}

	// The body is consumed before the auth check. Presumably this keeps the
	// stream positioned at the next command in case CheckAuth returns a
	// non-fatal error (IOLoop keeps reading after non-fatal errors).
	if authErr := p.CheckAuth(client, "PUB", topicName, ""); authErr != nil {
		return nil, authErr
	}

	// Topics are created lazily: the first publish to a name creates it.
	topic := p.ctx.nsqd.GetTopic(topicName)
	msg := NewMessage(topic.GenerateID(), messageBody)
	if err = topic.PutMessage(msg); err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
	}

	client.PublishedMessage(topicName, 1)
	return okBytes, nil
}
/ GetTopic performs a thread safe operation
// to return a pointer to a Topic object (potentially new)
func (n *NSQD) GetTopic(topicName string) *Topic {
 // most likely, we already have this topic, so try read lock first.
 n.RLock()
 // 当topic在nsqd中创建过时就直接返回该topic
 t, ok := n.topicMap[topicName]
 n.RUnlock()
 if ok {
 return t
 }
 n.Lock()
 t, ok = n.topicMap[topicName]
 if ok {
 n.Unlock()
 return t
 }
 deleteCallback := func(t *Topic) {
 n.DeleteExistingTopic(t.name)
 }
 //稍后看一下这个函数
 t = NewTopic(topicName, &context{n}, deleteCallback)
 n.topicMap[topicName] = t
 n.Unlock()
 n.logf(LOG_INFO, "TOPIC(%s): created", t.name)
 // topic is created but messagePump not yet started
 // if loading metadata at startup, no lookupd connections yet, topic started after load
 if atomic.LoadInt32(&n.isLoading) == 1 {
 return t
 }
 // if using lookupd, make a blocking call to get the topics, and immediately create them.
 // this makes sure that any message received is buffered to the right channels
 //如果使用了nsqlookup,那么从nsqlookup中查询该topic的channel信息,如果没有在nsqd中创建就创建出来
 lookupdHTTPAddrs := n.lookupdHTTPAddrs()
 if len(lookupdHTTPAddrs) > 0 {
 channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
 if err != nil {
 n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err)
 }
 for _, channelName := range channelNames {
 if strings.HasSuffix(channelName, "#ephemeral") {
 continue // do not create ephemeral channel with no consumer client
 }
 t.GetChannel(channelName)
 }
 } else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 {
 n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name)
 }
 // now that all channels are added, start topic messagePump
 t.Start()
 return t
}
// Topic constructor
//
// NewTopic creates a Topic named topicName, bound to ctx, and stores
// deleteCallback on it.
//
// The topic gets:
//   - an in-memory message channel only when MemQueueSize > 0;
//   - a dummy backend when the name ends in "#ephemeral";
//   - otherwise, a diskqueue backend under DataPath.
//
// The topic's messagePump goroutine is launched on t.waitGroup. Per the
// comments in GetTopic, the pump does not do work until Start is called;
// confirm in messagePump. Finally nsqd is notified of the new topic via
// Notify. The original author's note says this triggers persistence; lookupLoop
// consumes n.notifyChan to register topics with nsqlookupd.
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
	opts := ctx.nsqd.getOpts()

	t := &Topic{
		name:              topicName,
		channelMap:        make(map[string]*Channel),
		memoryMsgChan:     nil,
		startChan:         make(chan int, 1),
		exitChan:          make(chan int),
		channelUpdateChan: make(chan int),
		ctx:               ctx,
		paused:            0,
		pauseChan:         make(chan int),
		deleteCallback:    deleteCallback,
		idFactory:         NewGUIDFactory(opts.ID),
		ephemeral:         strings.HasSuffix(topicName, "#ephemeral"),
	}

	// An unbuffered channel is never used as the memory queue: with a
	// non-positive size the topic simply has no memory queue.
	if opts.MemQueueSize > 0 {
		t.memoryMsgChan = make(chan *Message, opts.MemQueueSize)
	}

	if t.ephemeral {
		t.backend = newDummyBackendQueue()
	} else {
		// Persistent, disk-backed queue for the topic.
		t.backend = diskqueue.New(
			topicName,
			opts.DataPath,
			opts.MaxBytesPerFile,
			int32(minValidMsgLength),
			int32(opts.MaxMsgSize)+minValidMsgLength,
			opts.SyncEvery,
			opts.SyncTimeout,
			// The logger re-reads the options on every call rather than using
			// the snapshot above, so it reflects the options current at log time.
			func(level diskqueue.LogLevel, f string, args ...interface{}) {
				current := ctx.nsqd.getOpts()
				lg.Logf(current.Logger, current.LogLevel, lg.LogLevel(level), f, args...)
			},
		)
	}

	// This messagePump fans messages out to the topic's channels. It is
	// distinct from protocolV2's messagePump, which pushes messages to a consumer.
	t.waitGroup.Wrap(t.messagePump)

	t.ctx.nsqd.Notify(t)

	return t
}
// Start signals the topic's messagePump to begin processing. The send never
// blocks. startChan is created with a buffer of one, so if a start signal is
// already pending, this call does nothing.
func (t *Topic) Start() {
	select {
	case t.startChan <- 1:
		// Start signal queued for messagePump.
	default:
		// A signal is already pending in the buffer; nothing more to do.
	}
}

下面看一下 nsqd 是如何向 nsqlookup 注册自己的元数据信息的:nsqd 启动时会启动一个 lookupLoop goroutine。

// lookupLoop maintains nsqd's connections to the configured nsqlookupd peers.
//
// It does the following:
//   - Adds a peer for every configured nsqlookupd TCP address it is not
//     already connected to.
//   - Every 15 seconds, sends a PING heartbeat to every peer.
//   - Forwards REGISTER/UNREGISTER commands for topics and channels received
//     on n.notifyChan.
//   - When n.optsNotificationChan fires, closes peers whose addresses are no
//     longer configured and reconnects any new ones.
//
// It returns when n.exitChan yields. The process exits if the hostname cannot
// be determined.
func (n *NSQD) lookupLoop() {
	var lookupPeers []*lookupPeer
	var lookupAddrs []string
	connect := true

	hostname, err := os.Hostname()
	if err != nil {
		n.logf(LOG_FATAL, "failed to get hostname - %s", err)
		os.Exit(1)
	}

	// for announcements, lookupd determines the host automatically
	// A stoppable ticker: the one behind time.Tick can never be released, and
	// this loop does exit.
	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()

	for {
		if connect {
			for _, host := range n.getOpts().NSQLookupdTCPAddresses {
				if in(host, lookupAddrs) {
					continue
				}
				n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host)
				peer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
					connectCallback(n, hostname))
				peer.Command(nil) // start the connection
				lookupPeers = append(lookupPeers, peer)
				lookupAddrs = append(lookupAddrs, host)
			}
			n.lookupPeers.Store(lookupPeers)
			connect = false
		}

		select {
		case <-ticker.C:
			// send a heartbeat and read a response (read detects closed conns)
			for _, peer := range lookupPeers {
				n.logf(LOG_DEBUG, "LOOKUPD(%s): sending heartbeat", peer)
				cmd := nsq.Ping()
				_, err := peer.Command(cmd)
				if err != nil {
					n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", peer, cmd, err)
				}
			}
		case val := <-n.notifyChan:
			var cmd *nsq.Command
			var branch string

			switch v := val.(type) {
			case *Channel:
				// notify all nsqlookupds that a new channel exists, or that it's removed
				branch = "channel"
				if v.Exiting() {
					cmd = nsq.UnRegister(v.topicName, v.name)
				} else {
					cmd = nsq.Register(v.topicName, v.name)
				}
			case *Topic:
				// notify all nsqlookupds that a new topic exists, or that it's removed
				branch = "topic"
				if v.Exiting() {
					cmd = nsq.UnRegister(v.name, "")
				} else {
					cmd = nsq.Register(v.name, "")
				}
			default:
				// No command corresponds to this value. Skip it rather than
				// forwarding a nil command to every peer.
				n.logf(LOG_ERROR, "LOOKUPD: unexpected notification type %T", val)
				continue
			}

			for _, peer := range lookupPeers {
				n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", peer, branch, cmd)
				_, err := peer.Command(cmd)
				if err != nil {
					n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", peer, cmd, err)
				}
			}
		case <-n.optsNotificationChan:
			// Keep only peers that are still configured; close the rest and
			// let the connect pass above add any newly configured addresses.
			var tmpPeers []*lookupPeer
			var tmpAddrs []string
			for _, lp := range lookupPeers {
				if in(lp.addr, n.getOpts().NSQLookupdTCPAddresses) {
					tmpPeers = append(tmpPeers, lp)
					tmpAddrs = append(tmpAddrs, lp.addr)
					continue
				}
				n.logf(LOG_INFO, "LOOKUP(%s): removing peer", lp)
				lp.Close()
			}
			lookupPeers = tmpPeers
			lookupAddrs = tmpAddrs
			connect = true
		case <-n.exitChan:
			n.logf(LOG_INFO, "LOOKUP: closing")
			return
		}
	}
}

在 nsqd 启动 lookupLoop 这个 goroutine 时,还启动了另一个 queueScanLoop goroutine,主要用来监控和处理超时消息。
总结一下

  • nsqd启动时分别监听tcp、http端口
  • 启动 lookupLoop goroutine 向 nsqlookup 注册自己的相关信息
  • 启动 queueScanLoop goroutine 对超时消息进行处理
  • 启动 statsdLoop goroutine(仅在配置了 StatsdAddress 时),对性能以及 topic、channel 等参数进行统计
  • 当有 producer client 连接进来时,nsqd 会启动一个单独的 goroutine 进行处理;在处理 PUB 命令时会(惰性地)创建 topic,并根据 nsqlookup 中的信息预先创建 channel。topic 会启动一个 messagePump 的 goroutine,将消息发送给下面的各个 channel
  • 当有 consumer client 接入进来时,nsqd 同样启动单独的 goroutine 进行处理,并为该 client 启动一个 messagePump goroutine,将消息发送给这个 consumer

注意,consumer 消费消息是有超时配置的:消费者需要在超时时间内处理完每一条消息,否则会导致一些问题(例如消息被判定为超时并被重新投递)。


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:朱伟

查看原文:Nsq原理分析(二)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1249 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏