分享
  1. 首页
  2. 文章

微服务之间通过RabbitMQ通信

CoderMiner · · 3421 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

微服务之间通过RabbitMQ通信

微服务之间是相互独立的,不像单个工程一样各个模块之间可以直接通过方法调用实现通信,相互独立的服务直接一般的通信方式是使用 HTTP协议rpc协议或者使用消息中间件如RabbitMQ``Kafka

image

在这篇文章 使用Golang和MongoDB构建微服务 已经实现了一个微服务的应用,在文章中已经实现了各个服务直接的通信,是使用的 HTTP的形式 ,那各个服务之间如何通过 RabbitMQ进行消息通信呢,我们现在要实现一个功能,就是一个用户预订电影票的接口,需要服务 User Service(port 8000) 和 服务 Booking Service(port 8003)之间通信,用户预订之后,把预订信息写入到 booking的数据库中

安装 RabbitMQ

安装 RabbitMQ 之前需要先安装 Erlang 的环境 ,然后下载安装RabbitMQ ,请选择对应的版本,安装完成之后,RabbitMQ在Windows上是作为一个服务在后台运行,关于 RabbitMQ 的接口如何使用,请参考官网的 教程,有各个主流语言的实现我们使用的是Go版本,请下载对应的实现接口 go get github.com/streadway/amqp

RabbitMQ的接口做一下简单的封装

  • 定义一个接口

messaging/message.go

type IMessageClient interface {
 ConnectToBroker(connectionStr string) error
 PublishToQueue(data []byte, queueName string) error
 SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error
 Close()
}
type MessageClient struct {
 conn *amqp.Connection
}
  • 连接接口
func (m *MessageClient) ConnectToBroker(connectionStr string) error {
 if connectionStr == "" {
 panic("the connection str mustnt be null")
 }
 var err error
 m.conn, err = amqp.Dial(connectionStr)
 return err
}
  • 发布消息接口
func (m *MessageClient) PublishToQueue(body []byte, queueName string) error {
 if m.conn == nil {
 panic("before publish you must connect the RabbitMQ first")
 }
 ch, err := m.conn.Channel()
 defer ch.Close()
 failOnError(err, "Failed to open a channel")
 q, err := ch.QueueDeclare(
 queueName,
 false,
 false,
 false,
 false,
 nil,
 )
 failOnError(err, "Failed to declare a queue")
 err = ch.Publish(
 "",
 q.Name,
 false,
 false,
 amqp.Publishing{
 ContentType: "application/json",
 Body: body,
 },
 )
 failOnError(err, "Failed to publish a message")
 return nil
}
  • 订阅消息接口
func (m *MessageClient) SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error {
 ch, err := m.conn.Channel()
 //defer ch.Close()
 failOnError(err, "Failed to open a channel")
 q, err := ch.QueueDeclare(
 queueName,
 false,
 false,
 false,
 false,
 nil,
 )
 failOnError(err, "Failed to declare a queue")
 msgs, err := ch.Consume(
 q.Name,
 "",
 true,
 false,
 false,
 false,
 nil,
 )
 failOnError(err, "Failed to register a consumer")
 go consumeLoop(msgs, handlerFunc)
 return nil
}

实现通信

User Service中定义一个新的POST接口 /user/{name}/booking,实现用户的预订功能,预订之后,通过RabbitMQ发布一个消息给
Booking Service,Booking Service接收到消息之后,做相应的处理(写入数据库)

User Service

  • 初始化 MessageClient

users/controllers/user.go

var client messaging.IMessageClient
func init() {
 client = &messaging.MessageClient{}
 err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
 if err != nil {
 fmt.Println("connect to rabbitmq error", err)
 }
}
  • 添加新的路由和实现

routes.go

register("POST", "/user/{name}/booking", controllers.NewBooking, nil)

users/controllers/user.go

func NewBooking(w http.ResponseWriter, r *http.Request) {
 params := mux.Vars(r)
 user_name := params["name"]
 defer r.Body.Close()
 var bookings models.Booking
 body, _ := ioutil.ReadAll(r.Body)
 err := json.Unmarshal(body, &bookings)
 if err != nil {
 fmt.Println("the format body error ", err)
 }
 fmt.Println("user name:", user_name, bookings)
 go notifyMsg(body)
}
  • 用一个协程实现消息的发布
func notifyMsg(body []byte) {
 err := client.PublishToQueue(body, "new_booking")
 if err != nil {
 fmt.Println("Failed to publis message", err)
 }
}

Booking Service

  • 初始化MessageClient
var client messaging.IMessageClient
func initMessage() {
 client = &messaging.MessageClient{}
 err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
 if err != nil {
 fmt.Println("Failed to connect to RabbitMQ", err)
 }
 err = client.SubscribeToQueue("new_booking", getBooking)
 if err != nil {
 fmt.Println("Failed to comsuer the msg", err)
 }
}

在 web服务之前启动

func main() {
 initMessage()
 r := routes.NewRouter()
 http.ListenAndServe(":8003", r)
}
  • 接收后的消息处理
func getBooking(delivery amqp.Delivery) {
 var booking models.Booking
 json.Unmarshal(delivery.Body, &booking)
 booking.Id = bson.NewObjectId().Hex()
 dao.Insert("Booking", "BookModel", booking)
 fmt.Println("the booking msg", booking)
}

验证,需要启动 User ServiceBooking Service
使用 Postman 发送对应的数据

post 127.0.0.1:8000/user/kevin_woo/booking
{
 "name":"kevin_woo",
 "books":[
 {
 "date":"20180727",
 "movies":["5b4c45d49d5e3e33c4a5b97a"]
 },
 {
 "date":"20180810",
 "movies":["5b4c45ea9d5e3e33c4a5b97b"]
 }
 ]
}

可以看到数据库已经有了一条新的预订信息

说明,我这里POST的数据就是booking数据库中的结构,实际情况需要对数据进行封装处理,在POST数据时,没有对数据进行验证,
在实际开发过程中需要对各个数据做相应的验证,这里主要是看一下 RabbitMQ的消息传递处理的过程

源码 Github


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:CoderMiner

查看原文:微服务之间通过RabbitMQ通信

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
3421 次点击 ∙ 2 赞
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏