分享
  1. 首页
  2. 文章

基于 rabbitmq 实现的延时队列

只是一个id · · 1477 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

虽然 rabbitmq 没有延时队列的功能,但是稍微变动一下也是可以实现的

实现延时队列的基本要素

  1. 存在一个倒计时机制:Time To Live(TTL)
  2. 当到达时间点的时候会触发一个发送消息的事件:Dead Letter Exchanges(DLX)

$~~~~~~$基于第一点,我利用的是消息存在过期时间这一特性, 消息一旦过期就会变成dead letter,可以让单独的消息过期,也可以设置整个队列消息的过期时间
rabbitmq会有限取两个值的最小值

$~~~~~~$基于第二点,是用到了rabbitmq的过期消息处理机制:
. x-dead-letter-exchange 将过期的消息发送到指定的 exchange
. x-dead-letter-routing-key 将过期的消息发送到自定的 route当中

在这里例子当中,我使用的是 过期消息+转发指定exchange

在 golang 中的实现

首先是消费者comsumer.go

package main
import (
 "log"
 "github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
 if err != nil {
 log.Fatalf("%s: %s", msg, err)
 }
}
func main() {
 // 建立链接
 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
 failOnError(err, "Failed to connect to RabbitMQ")
 defer conn.Close()
 ch, err := conn.Channel()
 failOnError(err, "Failed to open a channel")
 defer ch.Close()
 // 声明一个主要使用的 exchange
 err = ch.ExchangeDeclare(
 "logs", // name
 "fanout", // type
 true, // durable
 false, // auto-deleted
 false, // internal
 false, // no-wait
 nil, // arguments
 )
 failOnError(err, "Failed to declare an exchange")
 // 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列
 q, err := ch.QueueDeclare(
 "test_logs", // name
 false, // durable
 false, // delete when unused
 true, // exclusive
 false, // no-wait
 nil, // arguments
 )
 failOnError(err, "Failed to declare a queue")
 /**
 * 注意,这里是重点!!!!!
 * 声明一个延时队列, ß我们的延时消息就是要发送到这里
 */
 _, errDelay := ch.QueueDeclare(
 "test_delay", // name
 false, // durable
 false, // delete when unused
 true, // exclusive
 false, // no-wait
 amqp.Table{
 // 当消息过期时把消息发送到 logs 这个 exchange
 "x-dead-letter-exchange":"logs",
 }, // arguments
 )
 failOnError(errDelay, "Failed to declare a delay_queue")
 err = ch.QueueBind(
 q.Name, // queue name, 这里指的是 test_logs
 "", // routing key
 "logs", // exchange
 false,
 nil)
 failOnError(err, "Failed to bind a queue")
 // 这里监听的是 test_logs
 msgs, err := ch.Consume(
 q.Name, // queue name, 这里指的是 test_logs
 "", // consumer
 true, // auto-ack
 false, // exclusive
 false, // no-local
 false, // no-wait
 nil, // args
 )
 failOnError(err, "Failed to register a consumer")
 forever := make(chan bool)
 go func() {
 for d := range msgs {
 log.Printf(" [x] %s", d.Body)
 }
 }()
 log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
 <-forever
}

然后是生产者productor.go

package main
import (
 "log"
 "os"
 "strings"
 "github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
 if err != nil {
 log.Fatalf("%s: %s", msg, err)
 }
}
func main() {
 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
 failOnError(err, "Failed to connect to RabbitMQ")
 defer conn.Close()
 ch, err := conn.Channel()
 failOnError(err, "Failed to open a channel")
 defer ch.Close()
 body := bodyFrom(os.Args)
 // 将消息发送到延时队列上
 err = ch.Publish(
 "", // exchange 这里为空则不选择 exchange
 "test_delay", // routing key
 false, // mandatory
 false, // immediate
 amqp.Publishing{
 ContentType: "text/plain",
 Body: []byte(body),
 Expiration: "5000", // 设置五秒的过期时间
 })
 failOnError(err, "Failed to publish a message")
 log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
 var s string
 if (len(args) < 2) || os.Args[1] == "" {
 s = "hello"
 } else {
 s = strings.Join(args[1:], " ")
 }
 return s
}

运行一下:

go run comsumer.go
go run productor.go

$~~~~~~$具体看代码和注释就行, 这里的关键点就是将要延时的消息发送到过期队列当中, 然后监听的是过期队列转发到的 exchange 下的队列
正常情况就是始终监听一个队列,然后把过期消息发送到延时队列中,当消息到达时间后就把消息发到正在监听的队列

一个自己写的mq工具
博客原文


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:只是一个id

查看原文:基于 rabbitmq 实现的延时队列

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1477 次点击
1 回复 | 直到 2018年05月15日 14:41:00
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏