分享
  1. 首页
  2. 文章

手撸golang GO与微服务 Saga模式之7

ioly · · 657 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

缘起

最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过补偿动作进行撤销的
  • 事务动作和补动作偿都是幂等的, 允许重复执行而不会有副作用
Saga由一系列的子事务"Ti"组成,
每个Ti都有对应的补偿"Ci",
当Ti出现问题时Ci用于处理Ti执行带来的问题。
可以通过下面的两个公式理解Saga模式。
T = T1 T2 ... Tn
T = TCT
Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,
而应该将事务切分为一组按序依次提交的短事务,
Saga模式满足ACD(原子性、一致性、持久性)特征。
摘自 <<Go微服务实战>> 刘金亮, 2021.1

目标

  • 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
  • 事务消息队列服务的功能性要求

    • 消息不会丢失: 消息的持久化
    • 消息的唯一性: 要求每个消息有全局ID和子事务ID
    • 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试

子目标(Day 7)

  • MQS已基本可用, 现在实现一个模拟的订单微服务, 并与MQ联动

    • 长事务: 订单创建后, 联动库存服务, 扣减库存
    • 补偿动作

      • 如果扣库成功, 更新订单状态为已出库(实际系统中, 可能还涉及物流发货等复杂流程)
      • 否则(库存不足), 更新订单状态为出库失败(实际系统中, 可能还涉及退款和通知客户等复杂流程)
  • 流程

    • 创建订单后, 向MQ发布[销售订单.创建]消息
    • 订阅MQ的[销售订单.出库.成功], [销售订单.出库.失败]消息
    • 接收到MQ的出库消息后, 更新订单状态

设计

  • ISaleOrderService: 订单服务接口
  • SaleOrder: 销售订单实体
  • tSaleOrderService: 模拟订单服务, 实现ISaleOrderService接口
  • NotifyStockOutbound: 接收库存服务的扣库结果消息

ISaleOrderService.go

订单服务接口

package order
// ISaleOrderService to manage sale order creation and modification
type ISaleOrderService interface {
 // get order info
 Get(orderID string) *SaleOrder
 // create new order
 Create(it *SaleOrder) error
 // update order status
 Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder)
}

SaleOrder.go

销售订单实体

package order
type SaleOrder struct {
 OrderID string
 CustomerID string
 ProductID string
 Quantity int
 Price float64
 Amount float64
 CreateTime int64
 StatusFlag int32
}
const StatusNotDelivered int32 = 0
const StatusStockOutboundDone int32 = 1
const StatusStockOutboundFailed int32 = 2
const StatusMQServiceFailed int32 = 3

tSaleOrderService.go

模拟订单服务, 实现ISaleOrderService接口

package order
import (
 "bytes"
 "encoding/json"
 "errors"
 "io/ioutil"
 "learning/gooop/saga/mqs/logger"
 "learning/gooop/saga/mqs/models"
 "net/http"
 "sync"
 "sync/atomic"
 "time"
)
type tSaleOrderService struct {
 rwmutex *sync.RWMutex
 orders map[string]*SaleOrder
 bMQReady bool
 publishQueue chan *SaleOrder
}
func newSaleOrderService() ISaleOrderService {
 it := new(tSaleOrderService)
 it.init()
 return it
}
func (me *tSaleOrderService) init() {
 me.rwmutex = new(sync.RWMutex)
 me.orders = make(map[string]*SaleOrder)
 me.bMQReady = false
 me.publishQueue = make(chan *SaleOrder, gMQMaxQueuedMsg)
 go me.beginSubscribeMQ()
 go me.beginPublishMQ()
}
func (me *tSaleOrderService) beginSubscribeMQ() {
 expireDuration := int64(1 * time.Hour)
 subscribeDuration := 20 * time.Minute
 pauseDuration := 3*time.Second
 lastSubscribeTime := int64(0)
 for {
 now := time.Now().UnixNano()
 if now - lastSubscribeTime >= int64(subscribeDuration) {
 expireTime := now + expireDuration
 err := fnSubscribeMQ(expireTime)
 if err != nil {
 me.bMQReady = false
 logger.Logf("tSaleOrderService.beginSubscribeMQ, failed, err=%v", err)
 } else {
 lastSubscribeTime = now
 me.bMQReady = true
 logger.Logf("tSaleOrderService.beginSubscribeMQ, done")
 }
 }
 time.Sleep(pauseDuration)
 }
}
func fnSubscribeMQ(expireTime int64) error {
 msg := &models.SubscribeMsg{
 ClientID: gMQClientID,
 Topic: gMQSubscribeTopic,
 NotifyUrl: gMQServerURL + PathOfNotifyStockOutbound,
 ExpireTime: expireTime,
 }
 url := gMQServerURL + "/subscribe"
 return fnPost(msg, url)
}
func fnPost(msg interface{}, url string) error {
 body,_ := json.Marshal(msg)
 rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))
 if err != nil {
 return err
 }
 defer rsp.Body.Close()
 j, err := ioutil.ReadAll(rsp.Body)
 if err != nil {
 return err
 }
 ok := &models.OkMsg{}
 err = json.Unmarshal(j, ok)
 if err != nil {
 return err
 }
 if !ok.OK {
 return gMQReplyFalse
 }
 return nil
}
func (me *tSaleOrderService) beginPublishMQ() {
 for {
 select {
 case msg := <- me.publishQueue :
 me.publishMQ(msg)
 break
 }
 }
}
func (me *tSaleOrderService) Get(orderID string) *SaleOrder {
 me.rwmutex.RLock()
 defer me.rwmutex.RUnlock()
 it,ok := me.orders[orderID]
 if ok {
 return it
 } else {
 return nil
 }
}
func (me *tSaleOrderService) Create(it *SaleOrder) error {
 me.rwmutex.Lock()
 defer me.rwmutex.Unlock()
 if len(me.publishQueue) >= gMQMaxQueuedMsg {
 return gMQNotAvailableError
 }
 me.orders[it.OrderID] = it
 me.publishQueue <- it
 return nil
}
func (me *tSaleOrderService) publishMQ(it *SaleOrder) {
 url := gMQServerURL + "/publish"
 j,_ := json.Marshal(it)
 msg := &models.TxMsg{
 GlobalID: it.OrderID,
 SubID: it.OrderID,
 SenderID: gMQClientID,
 Topic: gMQPublishTopic,
 CreateTime: it.CreateTime,
 Content: string(j),
 }
 for i := 0;i < gMQMaxPublishRetry;i++ {
 err := fnPost(msg, url)
 if err != nil {
 logger.Logf("tSaleOrderService.publishMQ, failed, err=%v, order=%v", err, it)
 time.Sleep(gMQPublishInterval)
 } else {
 logger.Logf("tSaleOrderService.publishMQ, done, order=%v", it)
 return
 }
 }
 // publish failed
 logger.Logf("tSaleOrderService.publishMQ, failed max retries, order=%v", it)
 _,_ = me.Update(it.OrderID, StatusNotDelivered, StatusMQServiceFailed)
}
func (me *tSaleOrderService) Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder) {
 me.rwmutex.RLock()
 defer me.rwmutex.RUnlock()
 it, ok := me.orders[orderID]
 if !ok {
 return gNotFoundError, nil
 }
 if !atomic.CompareAndSwapInt32(&it.StatusFlag, oldStatusFlag, newStatusFlag) {
 return gStatusChangedError, it
 }
 it.StatusFlag = newStatusFlag
 return nil, it
}
var gMQReplyFalse = errors.New("mq reply false")
var gMQNotAvailableError = errors.New("mq not ready")
var gNotFoundError = errors.New("order not found")
var gStatusChangedError = errors.New("status changed")
var gMQMaxPublishRetry = 3
var gMQPublishInterval = 1*time.Second
var gMQSubscribeTopic = "sale-order.stock.outbound"
var gMQPublishTopic = "sale-order.created"
var gMQClientID = "sale-order-service"
var gMQServerURL = "http://localhost:333"
var gMQMaxQueuedMsg = 1024
var SaleOrderService = newSaleOrderService()

NotifyStockOutbound.go

接收库存服务的扣库结果消息

package order
import (
 "encoding/json"
 "github.com/gin-gonic/gin"
 "io/ioutil"
 "learning/gooop/saga/mqs/logger"
 "learning/gooop/saga/mqs/models"
 "net/http"
)
func NotifyStockOutbound(c *gin.Context) {
 body := c.Request.Body
 defer body.Close()
 j, e := ioutil.ReadAll(body)
 if e != nil {
 logger.Logf("order.NotifyStockOutbound, failed ioutil.ReadAll")
 c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})
 return
 }
 msg := &models.TxMsg{}
 e = json.Unmarshal(j, msg)
 if e != nil {
 logger.Logf("order.NotifyStockOutbound, failed json.Unmarshal")
 c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})
 return
 }
 orderID := msg.GlobalID
 succeeded := msg.Content == "1"
 logger.Logf("order.NotifyStockOutbound, orderID=%v, succeeded=%s", orderID, succeeded)
 var newStatusFlag int32
 if succeeded {
 newStatusFlag = StatusStockOutboundDone
 } else {
 newStatusFlag = StatusStockOutboundFailed
 }
 err, order := SaleOrderService.Update(orderID, StatusNotDelivered, newStatusFlag)
 if err != nil {
 logger.Logf("order.NotifyStockOutbound, failed SaleOrderService.Update, err=%v, order=%v", err, order)
 }
 
 c.JSON(http.StatusOK, gin.H{ "ok": true })
}
var PathOfNotifyStockOutbound = "/notify/sale-order.stock.outbound"

(未完待续)


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:ioly

查看原文:手撸golang GO与微服务 Saga模式之7

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
657 次点击
1 回复 | 直到 2025年05月13日 22:27:59
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏