分享
  1. 首页
  2. 文章

Nsq从入门到实践

bysir · · 5429 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

当nsq跑起来之后, 我们可能会遇到以下问题

  • 分布式部署
  • 处理错误(何时requeue)
  • 如何使用golang lib

抱着不应该只停留在入门的态度, 笔者粗浅的研究了一下这几个问题, 希望也对有同样疑问的人有帮助.

部署

由于NSQ的分布式网络结构, NSQD必须广播(到lookup)自己的地址并让消费者连接, 那么多个NSQD无法做透明负载均衡.
就必须为每一个NSQD分配单独的IP(或host)以保证消费者(在lookup找到NSQ节点)能够正确的连接. 这样部署起来可能会麻烦一些 但问题不大, 注意一下就好.

ps: 如果有更好的方法请告诉我, 小弟感激不尽.

NSQ Requeue And Backoff

建议结合官方文档来看

requeue(重试)

用于当错误发生, 需要重试时.

backoff(避退)

backoff能降低消费者吞吐量以让消费者从错误中恢复.

当消费者在backoff状态时, 这个消费者将不再处理任何消息, 直到backoff超时

当触发backoff时控制台将打印:

// 进入backoff状态, RDY设置为0代表准备接收0条消息(不接收消息) (协议详情看 https://nsq.io/clients/tcp_protocol_spec.html)
WRN 1 [test/test] backing off for 1m4s (backoff level 6), setting all to RDY 0
// 时间到了将设置RDY为1接收1条消息以测试状态, 官方将这个状态称为`tests the waters`
WRN 1 [test/test] (DESKTOP-HELJ7V4:4150) backoff timeout expired, sending RDY 1

当有多个消费者竞争时, 出错的消费者应当主动backoff不再处理消息(以让出更多的机会给其他消费者).
如果只有一个消费者, 则消费者会等到backoff超时后才开始处理消息(空出时间让消费者恢复).

避退是存在于整个消费者上的, 所以消费者每当一个消息处理失败了之后都会增加这个消费者的backoff level. 这会影响这个消费者的处理能力.

到底需不需要用backoff, 就要看业务了:

  • 消息是用来更新数据库订单状态的, 这是一个不容易出错的逻辑, 如果需要requeue则需要backoff让出优先级, 让其他消费者来做, 尽量以挽救这个订单.
  • 消息是用来通知第三方(如支付宝支付成功的http回调)的, 一般requeue是发生在第三方端响应不满足预期的响应, 这不是我方消费者的错误, 应当不使用backoff, 避免阻塞消息消费.

参考:

golang lib

nsq提供golang的client lib. 支持全部特性.

本着不重复造轮子原则, 我也想尽大可能的使用nsq lib里的代码逻辑来实现需求, 但有些需求它实现不了, 我也只好自己写代码了.

先看看它原有的几个逻辑

消息自动重试

// Handler is the message processing interface for Consumer
//
// Implement this interface for handlers that return whether or not message
// processing completed successfully.
//
// When the return value is nil Consumer will automatically handle FINishing.
//
// When the returned value is non-nil Consumer will automatically handle REQueing.
type Handler interface {
 HandleMessage(message *Message) error
}

消息自动重试与判断失败

func (r *Consumer) handlerLoop(handler Handler) {
 r.log(LogLevelDebug, "starting Handler")
 for {
 message, ok := <-r.incomingMessages
 if !ok {
 goto exit
 }
 if r.shouldFailMessage(message, handler) {
 message.Finish()
 continue
 }
 err := handler.HandleMessage(message)
 if err != nil {
 r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
 if !message.IsAutoResponseDisabled() {
 message.Requeue(-1)
 }
 continue
 }
 if !message.IsAutoResponseDisabled() {
 message.Finish()
 }
 }
exit:
 r.log(LogLevelDebug, "stopping Handler")
 if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
 r.exit()
 }
}

判断失败

func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
 // message passed the max number of attempts
 if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
 r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
 message.ID, message.Attempts)
 logger, ok := handler.(FailedMessageLogger)
 if ok {
 logger.LogFailedMessage(message)
 }
 return true
 }
 return false
}

requeue

可以看到当handler返回的error不为空时, nsq将自动requeue, 这种重试是很方便但是

使用这个重试机制的坏处是:

  • 不能自定义requeue的等待时间(默认等待时间=config.DefaultRequeueDelay*Attempts)
  • 会在控制台打印一个ERR(不能自定义格式, 而且有一些err不应该打印到控制台), 这点可能有洁癖的开发者受不了.
  • 一些错误不应该重试, 如入参不合法, 再怎么重试也是徒劳. 这时候应该直接失败.

所以我建议不要使用这个err机制, 而应当手动使用msg.Requeue(-1)或者msg.RequeueWithoutBackoff(-1) 来显示指定requeue.

shouldFailMessage

我们可以使用 FailedMessageLogger interface自定义当消息失败时的处理方式.

但它的shouldFailMessage又有什么需求满足不了呢?

  • 在失败的时候拿到最后一次错误信息
  • shouldFailMessage只能判断处理重试次数过多的失败, 不能处理直接失败的消息.

所以又只有自己实现啦:

我们直接在Handler中判断Attempts来实现错误处理.

但为了保证我们的消息不被shouldFailMessage处理, 需要配置MaxAttempts为0或者一个比较大的数.


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:bysir

查看原文:Nsq从入门到实践

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
5429 次点击
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏