分享
  1. 首页
  2. 文章

Golang封装RabbitMQ

imsgy · · 2773 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

程序封装

package rabbitmq
import (
 "fmt"
 "github.com/streadway/amqp"
 "time"
)
// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel
// 定义生产者接口
type Producer interface {
 MsgContent() string
}
// 定义接收者接口
type Receiver interface {
 Consumer([]byte) error
}
// 定义RabbitMQ对象
type RabbitMQ struct {
 connection *amqp.Connection
 channel *amqp.Channel
 queueName string // 队列名称
 routingKey string // key名称
 exchangeName string // 交换机名称
 exchangeType string // 交换机类型
 producerList []Producer
 receiverList []Receiver
}
// 定义队列交换机对象
type QueueExchange struct {
 QuName string // 队列名称
 RtKey string // key值
 ExName string // 交换机名称
 ExType string // 交换机类型
}
// 链接rabbitMQ
func (r *RabbitMQ)mqConnect() {
 var err error
 RabbitUrl := fmt.Sprintf("amqp://%s:%s@%s:%d/", "guest", "guest", "******", 5673)
 mqConn, err = amqp.Dial(RabbitUrl)
 r.connection = mqConn // 赋值给RabbitMQ对象
 if err != nil {
 fmt.Printf("MQ打开链接失败:%s \n", err)
 }
 mqChan, err = mqConn.Channel()
 r.channel = mqChan // 赋值给RabbitMQ对象
 if err != nil {
 fmt.Printf("MQ打开管道失败:%s \n", err)
 }
}
// 关闭RabbitMQ连接
func (r *RabbitMQ)mqClose() {
 // 先关闭管道,再关闭链接
 err := r.channel.Close()
 if err != nil {
 fmt.Printf("MQ管道关闭失败:%s \n", err)
 }
 err = r.connection.Close()
 if err != nil {
 fmt.Printf("MQ链接关闭失败:%s \n", err)
 }
}
// 创建一个新的操作对象
func New(q *QueueExchange) *RabbitMQ {
 return &RabbitMQ{
 queueName:q.QuName,
 routingKey:q.RtKey,
 exchangeName: q.ExName,
 exchangeType: q.ExType,
 }
}
// 启动RabbitMQ客户端,并初始化
func (r *RabbitMQ) Start() {
 // 开启监听生产者发送任务
 for _, producer := range r.producerList {
 go r.listenProducer(producer)
 }
 // 开启监听接收者接收任务
 for _, receiver := range r.receiverList {
 go r.listenReceiver(receiver)
 }
 time.Sleep(1*time.Second)
}
// 注册发送指定队列指定路由的生产者
func (r *RabbitMQ) RegisterProducer(producer Producer) {
 r.producerList = append(r.producerList, producer)
}
// 发送任务
func (r *RabbitMQ) listenProducer(producer Producer) {
 // 验证链接是否正常,否则重新链接
 if r.channel == nil {
 r.mqConnect()
 }
 // 用于检查队列是否存在,已经存在不需要重复声明
 _, err := r.channel.QueueDeclarePassive(r.queueName, true,false,false,true,nil)
 if err != nil{
 // 队列不存在,声明队列
 // name:队列名称;durable:是否持久化,队列存盘,true服务重启后信息不会丢失,影响性能;autoDelete:是否自动删除;noWait:是否非阻塞,
 // true为是,不等待RMQ返回信息;args:参数,传nil即可;exclusive:是否设置排他
 _, err = r.channel.QueueDeclare(r.queueName, true, false, false, true, nil)
 if err != nil {
 fmt.Printf("MQ注册队列失败:%s \n", err)
 return
 }
 }
 // 队列绑定
 err = r.channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, true,nil)
 if err != nil {
 fmt.Printf("MQ绑定队列失败:%s \n", err)
 return
 }
 // 用于检查交换机是否存在,已经存在不需要重复声明
 err = r.channel.ExchangeDeclarePassive(r.exchangeName, r.exchangeType, true, false, false, true, nil)
 if err != nil{
 // 注册交换机
 // name:交换机名称,kind:交换机类型,durable:是否持久化,队列存盘,true服务重启后信息不会丢失,影响性能;autoDelete:是否自动删除;
 // noWait:是否非阻塞, true为是,不等待RMQ返回信息;args:参数,传nil即可; internal:是否为内部
 err = r.channel.ExchangeDeclare(r.exchangeName, r.exchangeType, true, false, false, true, nil)
 if err != nil {
 fmt.Printf("MQ注册交换机失败:%s \n", err)
 return
 }
 }
 // 发送任务消息
 err = r.channel.Publish(r.exchangeName, r.routingKey, false, false, amqp.Publishing{
 ContentType: "text/plain",
 Body: []byte(producer.MsgContent()),
 })
 if err != nil {
 fmt.Printf("MQ任务发送失败:%s \n", err)
 return
 }
}
// 注册接收指定队列指定路由的数据接收者
func (r *RabbitMQ) RegisterReceiver(receiver Receiver) {
 r.receiverList = append(r.receiverList, receiver)
}
// 监听接收者接收任务
func (r *RabbitMQ) listenReceiver(receiver Receiver) {
 // 处理结束关闭链接
 defer r.mqClose()
 // 验证链接是否正常
 if r.channel == nil {
 r.mqConnect()
 }
 // 用于检查队列是否存在,已经存在不需要重复声明
 _, err := r.channel.QueueDeclarePassive(r.queueName, true,false,false,true,nil)
 if err != nil{
 // 队列不存在,声明队列
 // name:队列名称;durable:是否持久化,队列存盘,true服务重启后信息不会丢失,影响性能;autoDelete:是否自动删除;noWait:是否非阻塞,
 // true为是,不等待RMQ返回信息;args:参数,传nil即可;exclusive:是否设置排他
 _, err = r.channel.QueueDeclare(r.queueName, true, false, false, true, nil)
 if err != nil {
 fmt.Printf("MQ注册队列失败:%s \n", err)
 return
 }
 }
 // 绑定任务
 err = r.channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, true, nil)
 if err != nil {
 fmt.Printf("绑定队列失败:%s \n", err)
 return
 }
 // 获取消费通道,确保rabbitMQ一个一个发送消息
 err = r.channel.Qos(1, 0, true)
 msgList, err := r.channel.Consume(r.queueName, "", false, false, false, false, nil)
 if err != nil {
 fmt.Printf("获取消费通道异常:%s \n", err)
 return
 }
 for msg := range msgList {
 // 处理数据
 err := receiver.Consumer(msg.Body)
 if err!=nil {
 err = msg.Ack(true)
 if err != nil {
 fmt.Printf("确认消息未完成异常:%s \n", err)
 return
 }
 }else {
 // 确认消息,必须为false
 err = msg.Ack(false)
 if err != nil {
 fmt.Printf("确认消息完成异常:%s \n", err)
 return
 }
 return
 }
 }
}

使用方法

package main
import (
 "fmt"
 "test/rabbitmq"
)
type TestPro struct {
 msgContent string
}
// 实现发送者
func (t *TestPro) MsgContent() string {
 return t.msgContent
}
// 实现接收者
func (t *TestPro) Consumer(dataByte []byte) error {
 fmt.Println(string(dataByte))
 return nil
}
func main() {
 msg := fmt.Sprintf("这是测试任务")
 t := &TestPro{
 msg,
 }
 queueExchange := &rabbitmq.QueueExchange{
 "test.rabbit",
 "rabbit.key",
 "test.rabbit.mq",
 "direct",
 }
 mq := rabbitmq.New(queueExchange)
 mq.RegisterProducer(t)
 mq.RegisterReceiver(t)
 mq.Start()
}

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:imsgy

查看原文:Golang封装RabbitMQ

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
2773 次点击 ∙ 1 赞
下一篇:golang的学习
3 回复 | 直到 2019年07月31日 10:00:58
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏