分享
  1. 首页
  2. 文章

Nsq 原理分析(一)

朱伟 · · 1416 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

Nsq 是用 go 语言开发的轻量级的分布式消息队列,适合小型项目使用、用来学习消息队列实现原理、学习 golang channel知识以及如何用 go 来写分布式,为什么说适合小型小型项目使用因为,nsq 如果没有能力进行二次开发的情况存在的问题还是很多的。


Nsq 模块介绍

nsqd:是一个进程监听了 http、tcp 两种协议,用来创建 topic、channel,分发消息给消费者,向 nsqlooup 注册自己的元数据信息(topic、channel、consumer),自己的服务信息,最核心模块。

nsqlookup:存储了 nsqd 的元数据和服务信息(endpoind),向消费者提供服务发现功能,向 nsqadmin 提供数据查询功能。

nsqadmin:简单的管理界面,展示了 topic、channel以及channel上的消费者,也可以创建 topic、channel
nsq.gif
摘自官网
生产者向某个topic中发送消息,如果topic有一个或者多个channle,那么该消息会被复制多分发送到每一个channel中。类似 rabbitmq中的fanout类型,channle类似队列。
官方说 nsq 是分布式的消息队列服务,但是在我看来只有channel到消费者这部分提现出来分布式的感觉,nsqd 这个模块其实就是单点的,nsqd 将 topic、channel、以及消息都存储在了本地磁盘,官方还建议一个生产者使用一个 nsqd,这样不仅浪费资源还没有数据备份的保障。一旦 nsqd 所在的主机磁损坏,数据都将丢失。

Nsq 源码分析

先部署一个简单的环境,以 centos 操作系统为例

下载
wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
解压
tar xvf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
cd nsq-1.2.0.linux-amd64.go1.12.9/bin
cp * /bin

启动三个终端,一个用来启动 nsqadmin(管理界面)、nsqlookup(nsqd服务以及元数据管理)、nsqd(nsq核心模块,元数据、消息存储以及消息分发), ip 换成自己的真实ip

终端1
/bin/nsqd --lookupd-tcp-address 192.168.1.1:4160 -tcp-address 0.0.0.0:4152 -http-address 0.0.0.0:4153 --broadcast-address 192.168.1.1
终端2
/bin/nsqlookupd --broadcast-address 192.168.1.1:4160
终端3
/bin/nsqadmin --lookupd-http-address 192.168.1.1:4160

看一下 nsq 的简单使用

cat producer.go
package main
import "github.com/nsqio/go-nsq"
config := nsq.NewConfig()
p, _ := nsq.NewProducer(addr, config)
err := p.Publish("topic", []byte("message"))
if err != nil {
 fmt.Printf("dispatch task failed %s", err)
}
cat consumer.go
package main
import "github.com/nsqio/go-nsq"
type MyHandler struct {}
func (h *MyHandler) HandleMessage(message *nsq.Message) error {
 fmt.Printf("consume message %+v\n", message)
}
config := nsq.NewConfig()
c, _ := nsq.NewConsumer("topic", "channel", config)
c.SetLoggerLevel(nsq.LogLevelDebug)
handler := &MyHandler{}
c.AddHandler(handler)
// 这里端口是4161 是 nsqlookup 的 http 端口, nsqd 和 nsqlookup 都同时监听了 tcp和http两个协议
err := c.ConnectToNSQLookupd("192.168.1.1:4161")
if err != nil {
 fmt.Printf("Connect nsq lookup failed %+v\n", err)
}

1. 生产者代码分析

go-nsq/producer.go

// After Config is passed into NewProducer the values are no longer mutable (they are copied).
func NewProducer(addr string, config *Config) (*Producer, error) {
 err := config.Validate()
 if err != nil {
 return nil, err
 }
 p := &Producer{
 id: atomic.AddInt64(&instCount, 1),
 addr: addr,
 config: *config,
 logger: make([]logger, int(LogLevelMax+1)),
 logLvl: LogLevelInfo,
 transactionChan: make(chan *ProducerTransaction),
 exitChan: make(chan int),
 responseChan: make(chan []byte),
 errorChan: make(chan []byte),
 }
 // Set default logger for all log levels
 l := log.New(os.Stderr, "", log.Flags())
 for index, _ := range p.logger {
 p.logger[index] = l
 }
 return p, nil
}

初始化了 Producer 的结构体

// Publish synchronously publishes a message body to the specified topic, returning
// an error if publish failed
func (w *Producer) Publish(topic string, body []byte) error { 
 return w.sendCommand(Publish(topic, body))
}

指定要往哪个 topic 中发送消息以及要发送的消息

// Publish creates a new Command to write a message to a given topic
func Publish(topic string, body []byte) *Command {
 var params = [][]byte{[]byte(topic)}
 return &Command{[]byte("PUB"), params, body}
}

封装了命令

func (w *Producer) sendCommand(cmd *Command) error {
 doneChan := make(chan *ProducerTransaction)
 // 内部使用了异步发送的方式
 err := w.sendCommandAsync(cmd, doneChan, nil)
 if err != nil {
 close(doneChan)
 return err
 }
 // 等待异步发送完成
 t := <-doneChan
 return t.Error
}
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
 args []interface{}) error {
 // keep track of how many outstanding producers we're dealing with
 // in order to later ensure that we clean them all up...
 atomic.AddInt32(&w.concurrentProducers, 1)
 defer atomic.AddInt32(&w.concurrentProducers, -1)
 // 判断有没有和 nsqd 建立连接,已经建立跳过
 if atomic.LoadInt32(&w.state) != StateConnected {
 err := w.connect()
 if err != nil {
 return err
 }
 }
 t := &ProducerTransaction{
 cmd: cmd,
 doneChan: doneChan,
 Args: args,
 }
 select {
 case w.transactionChan <- t:
 case <-w.exitChan:
 return ErrStopped
 }
 return nil
}

在上面这段代码中依然没有看到将 PUB command 发送给 nsqd进程的代码, 我们看一下那个 connect 函数

func (w *Producer) connect() error {
 w.guard.Lock()
 defer w.guard.Unlock()
 if atomic.LoadInt32(&w.stopFlag) == 1 {
 return ErrStopped
 }
 switch state := atomic.LoadInt32(&w.state); state {
 case StateInit:
 case StateConnected:
 return nil
 default:
 return ErrNotConnected
 }
 w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr)
 w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
 w.conn.SetLoggerLevel(w.getLogLevel())
 format := fmt.Sprintf("%3d (%%s)", w.id)
 for index := range w.logger {
 w.conn.SetLoggerForLevel(w.logger[index], LogLevel(index), format)
 }
 // 这个主要是消费者在使用。在消费者部分会详细分析
 _, err := w.conn.Connect()
 if err != nil {
 w.conn.Close()
 w.log(LogLevelError, "(%s) error connecting to nsqd - %s", w.addr, err)
 return err
 }
 atomic.StoreInt32(&w.state, StateConnected)
 w.closeChan = make(chan int)
 w.wg.Add(1)
 // 生产者利用这个 goroutine 向 nsqd 发送命令和接收响应
 go w.router()
 return nil
}
func (w *Producer) router() {
 for {
 select {
 // 在上面的 sendCommandAsync 这个方法中只看到了将待发送的命令又包装了一下扔到了一个 channel 中,这里在监听,以及将命令发送给nsqd
 case t := <-w.transactionChan:
 w.transactions = append(w.transactions, t)
 err := w.conn.WriteCommand(t.cmd)
 if err != nil {
 w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
 w.close()
 }
 // 接收 nsqd 的响应
 case data := <-w.responseChan:
 w.popTransaction(FrameTypeResponse, data)
 case data := <-w.errorChan:
 w.popTransaction(FrameTypeError, data)
 case <-w.closeChan:
 goto exit
 case <-w.exitChan:
 goto exit
 }
 }
exit:
 w.transactionCleanup()
 w.wg.Done()
 w.log(LogLevelInfo, "exiting router")
}

2. 消费者代码分析

// NewConsumer creates a new instance of Consumer for the specified topic/channel
//
// The only valid way to create a Config is via NewConfig, using a struct literal will panic.
// After Config is passed into NewConsumer the values are no longer mutable (they are copied).
// 指定要监听的订阅的 topic 和 channel
func NewConsumer(topic string, channel string, config *Config) (*Consumer, error) {
 if err := config.Validate(); err != nil {
 return nil, err
 }
 if !IsValidTopicName(topic) {
 return nil, errors.New("invalid topic name")
 }
 if !IsValidChannelName(channel) {
 return nil, errors.New("invalid channel name")
 }
 r := &Consumer{
 id: atomic.AddInt64(&instCount, 1),
 topic: topic,
 channel: channel,
 config: *config,
 logger: make([]logger, LogLevelMax+1),
 logLvl: LogLevelInfo,
 maxInFlight: int32(config.MaxInFlight),
 incomingMessages: make(chan *Message),
 rdyRetryTimers: make(map[string]*time.Timer),
 pendingConnections: make(map[string]*Conn),
 connections: make(map[string]*Conn),
 lookupdRecheckChan: make(chan int, 1),
 rng: rand.New(rand.NewSource(time.Now().UnixNano())),
 StopChan: make(chan int),
 exitChan: make(chan int),
 }
 // Set default logger for all log levels
 l := log.New(os.Stderr, "", log.Flags())
 for index := range r.logger {
 r.logger[index] = l
 }
 r.wg.Add(1)
 // 因为nsq是推送push的方式消费消息,所以早消费者端会控制消费的速度,限流作用,可以配置可以自动更新
 go r.rdyLoop()
 return r, nil
}

初始化 Consumer结构体

初始化后需要添加消息处理函数 AddHandler

// AddHandler sets the Handler for messages received by this Consumer. This can be called
// multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) AddHandler(handler Handler) {
 r.AddConcurrentHandlers(handler, 1)
}
// AddConcurrentHandlers sets the Handler for messages received by this Consumer. It
// takes a second argument which indicates the number of goroutines to spawn for
// message handling.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
 if atomic.LoadInt32(&r.connectedFlag) == 1 {
 panic("already connected")
 }
 atomic.AddInt32(&r.runningHandlers, int32(concurrency))
 for i := 0; i < concurrency; i++ {
 // 可以设置并发
 go r.handlerLoop(handler)
 }
}
func (r *Consumer) handlerLoop(handler Handler) {
 r.log(LogLevelDebug, "starting Handler")
 for {
 // 不断的接收 nsqd 发送过来的请求, readloop这个死循环方法会向这个channel仍消息进来,后面我们会说到
 message, ok := <-r.incomingMessages
 if !ok {
 goto exit
 }
 if r.shouldFailMessage(message, handler) {
 message.Finish()
 continue
 }
 // 使用我们添加的消息处理函数来消费消息
 err := handler.HandleMessage(message)
 if err != nil {
 r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
 if !message.IsAutoResponseDisabled() {
 message.Requeue(-1)
 }
 continue
 }
 // 当一条消息处理完成是否从队列中移除,相当于提交,默认消费完一条消息自动提交,可以设置批量提交
 if !message.IsAutoResponseDisabled() {
 message.Finish()
 }
 }
exit:
 r.log(LogLevelDebug, "stopping Handler")
 if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
 r.exit()
 }
}
func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
 // message passed the max number of attempts
 if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
 r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
 message.ID, message.Attempts)
 logger, ok := handler.(FailedMessageLogger)
 if ok {
 logger.LogFailedMessage(message)
 }
 return true
 }
 return false
}
func (r *Consumer) exit() {
 r.exitHandler.Do(func() {
 close(r.exitChan)
 r.wg.Wait()
 close(r.StopChan)
 })
}
// ConnectToNSQLookupd adds an nsqlookupd address to the list for this Consumer instance.
//
// If it is the first to be added, it initiates an HTTP request to discover nsqd
// producers for the configured topic.
//
// A goroutine is spawned to handle continual polling.
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
 if atomic.LoadInt32(&r.stopFlag) == 1 {
 return errors.New("consumer stopped")
 }
 if atomic.LoadInt32(&r.runningHandlers) == 0 {
 return errors.New("no handlers")
 }
 if err := validatedLookupAddr(addr); err != nil {
 return err
 }
 atomic.StoreInt32(&r.connectedFlag, 1)
 r.mtx.Lock()
 for _, x := range r.lookupdHTTPAddrs {
 if x == addr {
 r.mtx.Unlock()
 return nil
 }
 }
 r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
 numLookupd := len(r.lookupdHTTPAddrs)
 r.mtx.Unlock()
 // if this is the first one, kick off the go loop
 if numLookupd == 1 {
 r.queryLookupd()
 r.wg.Add(1)
 go r.lookupdLoop()
 }
 return nil
}

消费者需要连接到nsqlookup,从nsqlookup中查询到nsqd的服务信息,然后进行连接

// make an HTTP req to one of the configured nsqlookupd instances to discover
// which nsqd's provide the topic we are consuming.
//
// initiate a connection to any new producers that are identified.
func (r *Consumer) queryLookupd() {
 retries := 0
retry:
 endpoint := r.nextLookupdEndpoint()
 r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)
 var data lookupResp
 err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
 if err != nil {
 r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
 retries++
 if retries < 3 {
 r.log(LogLevelInfo, "retrying with next nsqlookupd")
 goto retry
 }
 return
 }
 var nsqdAddrs []string
 for _, producer := range data.Producers {
 broadcastAddress := producer.BroadcastAddress
 port := producer.TCPPort
 joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
 nsqdAddrs = append(nsqdAddrs, joined)
 }
 // apply filter
 if discoveryFilter, ok := r.behaviorDelegate.(DiscoveryFilter); ok {
 nsqdAddrs = discoveryFilter.Filter(nsqdAddrs)
 }
 // 获取 nsqlookup中所以的nsqd信息,然后进行连接
 for _, addr := range nsqdAddrs {
 err = r.ConnectToNSQD(addr)
 if err != nil && err != ErrAlreadyConnected {
 r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
 continue
 }
 }
}

官方不建议消费者端直接连接nsqd,

// ConnectToNSQD takes a nsqd address to connect directly to.
//
// It is recommended to use ConnectToNSQLookupd so that topics are discovered
// automatically. This method is useful when you want to connect to a single, local,
// instance.
func (r *Consumer) ConnectToNSQD(addr string) error {
 if atomic.LoadInt32(&r.stopFlag) == 1 {
 return errors.New("consumer stopped")
 }
 if atomic.LoadInt32(&r.runningHandlers) == 0 {
 return errors.New("no handlers")
 }
 atomic.StoreInt32(&r.connectedFlag, 1)
 // 初始化
 conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
 conn.SetLoggerLevel(r.getLogLevel())
 format := fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel)
 for index := range r.logger {
 conn.SetLoggerForLevel(r.logger[index], LogLevel(index), format)
 }
 r.mtx.Lock()
 _, pendingOk := r.pendingConnections[addr]
 _, ok := r.connections[addr]
 if ok || pendingOk {
 r.mtx.Unlock()
 return ErrAlreadyConnected
 }
 r.pendingConnections[addr] = conn
 if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 {
 r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr)
 }
 r.mtx.Unlock()
 r.log(LogLevelInfo, "(%s) connecting to nsqd", addr)
 cleanupConnection := func() {
 r.mtx.Lock()
 delete(r.pendingConnections, addr)
 r.mtx.Unlock()
 conn.Close()
 }
 // 进行连接,在分析生产者时看到过,这里是consumer和nsqd建立了连接的地方
 resp, err := conn.Connect()
 if err != nil {
 cleanupConnection()
 return err
 }
 if resp != nil {
 if resp.MaxRdyCount < int64(r.getMaxInFlight()) {
 r.log(LogLevelWarning,
 "(%s) max RDY count %d < consumer max in flight %d, truncation possible",
 conn.String(), resp.MaxRdyCount, r.getMaxInFlight())
 }
 }
 // consumer向nsqd发送订阅命令,此时consumer会将自己注册到nsqd中,更准确的说法是consumer将自己注册到了topic下的channel的client列表中,有消息到来时channle会随机向自己的客户端列表发送消息
 cmd := Subscribe(r.topic, r.channel)
 err = conn.WriteCommand(cmd)
 if err != nil {
 cleanupConnection()
 return fmt.Errorf("[%s] failed to subscribe to %s:%s - %s",
 conn, r.topic, r.channel, err.Error())
 }
 r.mtx.Lock()
 delete(r.pendingConnections, addr)
 r.connections[addr] = conn
 r.mtx.Unlock()
 // pre-emptive signal to existing connections to lower their RDY count
 for _, c := range r.conns() {
 r.maybeUpdateRDY(c)
 }
 return nil

go-nsq/conn.go

// Connect dials and bootstraps the nsqd connection
// (including IDENTIFY) and returns the IdentifyResponse
func (c *Conn) Connect() (*IdentifyResponse, error) {
 dialer := &net.Dialer{
 LocalAddr: c.config.LocalAddr,
 Timeout: c.config.DialTimeout,
 }
 // 生产者或者消费者在这里与 nsqd 建立 tcp 连接
 conn, err := dialer.Dial("tcp", c.addr)
 if err != nil {
 return nil, err
 }
 c.conn = conn.(*net.TCPConn)
 c.r = conn
 c.w = conn
 // 建立连接后先发送 4 字节信息表示使用哪种协议,目前有 v1 和 v2两种协议
 _, err = c.Write(MagicV2)
 if err != nil {
 c.Close()
 return nil, fmt.Errorf("[%s] failed to write magic - %s", c.addr, err)
 }
 // 告诉 nsqd 关于自己的一些基本信息,比如心跳间隔、处理消息的超时、client id 等等
 resp, err := c.identify()
 if err != nil {
 return nil, err
 }
 if resp != nil && resp.AuthRequired {
 if c.config.AuthSecret == "" {
 c.log(LogLevelError, "Auth Required")
 return nil, errors.New("Auth Required")
 }
 err := c.auth(c.config.AuthSecret)
 if err != nil {
 c.log(LogLevelError, "Auth Failed %s", err)
 return nil, err
 }
 }
 c.wg.Add(2)
 atomic.StoreInt32(&c.readLoopRunning, 1)
 // 这两个 goroutine 很重要
 go c.readLoop()
 go c.writeLoop()
 return resp, nil
}
func (c *Conn) readLoop() {
 delegate := &connMessageDelegate{c}
 for {
 if atomic.LoadInt32(&c.closeFlag) == 1 {
 goto exit
 }
 // 从 nsqd获取消息
 frameType, data, err := ReadUnpackedResponse(c)
 if err != nil {
 if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 {
 goto exit
 }
 if !strings.Contains(err.Error(), "use of closed network connection") {
 c.log(LogLevelError, "IO error - %s", err)
 c.delegate.OnIOError(c, err)
 }
 goto exit
 }
 // 心跳检测默认30s检查一次,后面会细说一下这里
 if frameType == FrameTypeResponse && bytes.Equal(data, []byte("_heartbeat_")) {
 c.log(LogLevelDebug, "heartbeat received")
 c.delegate.OnHeartbeat(c)
 err := c.WriteCommand(Nop())
 if err != nil {
 c.log(LogLevelError, "IO error - %s", err)
 c.delegate.OnIOError(c, err)
 goto exit
 }
 continue
 }
 switch frameType {
 // 处理相应信息
 case FrameTypeResponse:
 c.delegate.OnResponse(c, data)
 // 接收消息进行消费
 case FrameTypeMessage:
 msg, err := DecodeMessage(data)
 if err != nil {
 c.log(LogLevelError, "IO error - %s", err)
 c.delegate.OnIOError(c, err)
 goto exit
 }
 msg.Delegate = delegate
 msg.NSQDAddress = c.String()
 atomic.AddInt64(&c.messagesInFlight, 1)
 atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())
 // 这里将从nsqd那边获取到的消息扔到了一个channel中,这个channel就是上面 handlerloop死循环中在等待消息的channel
 c.delegate.OnMessage(c, msg)
 case FrameTypeError:
 c.log(LogLevelError, "protocol error - %s", data)
 c.delegate.OnError(c, data)
 default:
 c.log(LogLevelError, "IO error - %s", err)
 c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
 }
 }
exit:
 atomic.StoreInt32(&c.readLoopRunning, 0)
 // start the connection close
 messagesInFlight := atomic.LoadInt64(&c.messagesInFlight)
 if messagesInFlight == 0 {
 // if we exited readLoop with no messages in flight
 // we need to explicitly trigger the close because
 // writeLoop won't
 c.close()
 } else {
 c.log(LogLevelWarning, "delaying close, %d outstanding messages", messagesInFlight)
 }
 c.wg.Done()
 c.log(LogLevelInfo, "readLoop exiting")
}
func (c *Conn) writeLoop() {
 for {
 select {
 case <-c.exitChan:
 c.log(LogLevelInfo, "breaking out of writeLoop")
 // Indicate drainReady because we will not pull any more off msgResponseChan
 close(c.drainReady)
 goto exit
 case cmd := <-c.cmdChan:
 err := c.WriteCommand(cmd)
 if err != nil {
 c.log(LogLevelError, "error sending command %s - %s", cmd, err)
 c.close()
 continue
 }
 case resp := <-c.msgResponseChan:
 // Decrement this here so it is correct even if we can't respond to nsqd
 msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)
 if resp.success {
 c.log(LogLevelDebug, "FIN %s", resp.msg.ID)
 c.delegate.OnMessageFinished(c, resp.msg)
 c.delegate.OnResume(c)
 } else {
 c.log(LogLevelDebug, "REQ %s", resp.msg.ID)
 c.delegate.OnMessageRequeued(c, resp.msg)
 if resp.backoff {
 c.delegate.OnBackoff(c)
 } else {
 c.delegate.OnContinue(c)
 }
 }
 err := c.WriteCommand(resp.cmd)
 if err != nil {
 c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err)
 c.close()
 continue
 }
 if msgsInFlight == 0 &&
 atomic.LoadInt32(&c.closeFlag) == 1 {
 c.close()
 continue
 }
 }
 }
exit:
 c.wg.Done()
 c.log(LogLevelInfo, "writeLoop exiting")
}

当消息处理完成consumer会通过writeloop向nsqd发送FIN 命令,告诉nsqd我有哪些消息消费完成可以从队列中移除了。
其实上面是go nsq这个客户端的代码,还没有看到 nsq本身的代码,先总结一下。然后继续看nsqd的代码
生产者

  1. 生产者先初始化Producerj结构体,然后设置一些配置
  2. 生产者和nsqd建立tcp连接
  3. 协商版本
  4. 生产者启动一个route协程,这个协程用来不断的向nsqd发送PUB指令,同时携带消息

消费者

  1. 消费者初始化Consumer结构体
  2. 消费者通过nsqlookup和 nsqd 建立tcp连接,nsqd可能是一个也可能是多个
  3. 协商版本
  4. 建立连接后发送自己的识别信息给nsqd,携带一些基本配置信息,比如心跳间隔、消息消费超时、客户端id等等
  5. 启动RDY限流机制
  6. 启动 readloop、writeloop

有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:朱伟

查看原文:Nsq 原理分析(一)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1416 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏