分享
  1. 首页
  2. 文章

手撸golang GO与微服务 Saga模式之8 集成测试

老罗话编程 · · 514 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

手撸golang GO与微服务 Saga模式之8 集成测试

缘起

最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过补偿动作进行撤销的
  • 事务动作和补动作偿都是幂等的, 允许重复执行而不会有副作用
Saga由一系列的子事务"Ti"组成,
每个Ti都有对应的补偿"Ci",
当Ti出现问题时Ci用于处理Ti执行带来的问题。
可以通过下面的两个公式理解Saga模式。
T = T1 T2 ... Tn
T = TCT
Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,
而应该将事务切分为一组按序依次提交的短事务,
Saga模式满足ACD(原子性、一致性、持久性)特征。
摘自 <<Go微服务实战>> 刘金亮, 2021.1

目标

  • 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
  • 事务消息队列服务的功能性要求
    • 消息不会丢失: 消息的持久化
    • 消息的唯一性: 要求每个消息有全局ID和子事务ID
    • 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试

子目标(Day 8)

  • 创建虚拟的库存服务
    • 启动时, 注册到MQ
    • 接收到订单创建的消息时, 扣减库存
    • 扣库成功时, 经MQ通知订单服务扣库成功
    • 扣库失败时, 经MQ通知订单服务扣库失败

设计

  • IStockService: 模拟的库存服务接口
  • tStockService: 虚拟库存服务, 实现IStockService接口
  • NotifySaleOrderCreated: 用于监听订单创建消息的http回调处理器

单元测试

order_test.go

  1. 初始化10个产品库存
  2. 订单服务, 创建订单1, 尝试扣减1个库存, 预期成功
  3. 订单服务, 创建订单2, 尝试扣减10个库存, 预期失败
  4. 校验订单1的最终状态为出库成功
  5. 校验订单2的最终状态为出库失败
package saga
import (
 "github.com/jmoiron/sqlx"
 "learning/gooop/saga/mqs/cmd"
 "learning/gooop/saga/mqs/database"
 "learning/gooop/saga/mqs/logger"
 "learning/gooop/saga/order"
 "learning/gooop/saga/stock"
 "sync"
 "testing"
 "time"
)
var gRunOnce sync.Once
func fnBootMQS() {
 gRunOnce.Do(func() {
 // boot mqs
 go cmd.BootMQS()
 // wait for mqs up
 time.Sleep(1 * time.Second)
 })
}
func fnAssertTrue (t *testing.T, b bool, msg string) {
 if !b {
 t.Fatal(msg)
 }
}
func Test_SagaSaleOrder(t *testing.T) {
 // prepare mqs
 fnClearDB(t)
 fnBootMQS()
 // 1 create prod stock
 prodID := "test-prod-1"
 err := stock.MockStockService.AddStock(prodID, 10)
 if err != nil {
 t.Fatal(err)
 }
 // create order 1
 o1 := &order.SaleOrder{
 OrderID: "test-order-1",
 ProductID: prodID,
 CustomerID: "test-customer-1",
 Quantity: 1,
 Price: 100,
 Amount: 100,
 CreateTime: time.Now().UnixNano(),
 StatusFlag: order.StatusNotDelivered,
 }
 err = order.MockSaleOrderService.Create(o1)
 if err != nil {
 t.Fatal(err)
 }
 // create order 2
 time.Sleep(10*time.Millisecond)
 o2 := &order.SaleOrder{
 OrderID: "test-order-2",
 ProductID: prodID,
 CustomerID: "test-customer-2",
 Quantity: 10,
 Price: 100,
 Amount: 1000,
 CreateTime: time.Now().UnixNano(),
 StatusFlag: order.StatusNotDelivered,
 }
 err = order.MockSaleOrderService.Create(o2)
 if err != nil {
 t.Fatal(err)
 }
 time.Sleep(1 * time.Second)
 logger.Logf("============================================")
 log := "tSaleOrderService.beginSubscribeMQ, done"
 fnAssertTrue(t, logger.Count(log)==1, "expecting log: " + log)
 log = "tSaleOrderService.publishMQ, done, order=test-order-1"
 fnAssertTrue(t, logger.Count(log)==1, "expecting log: " + log)
 log = "tSaleOrderService.publishMQ, done, order=test-order-2"
 fnAssertTrue(t, logger.Count(log)==1, "expecting log: " + log)
 log = "stock.NotifySaleOrderCreated, order=test-order-1"
 fnAssertTrue(t, logger.Count(log)==1, "expecting log: " + log)
 log = "stock.NotifySaleOrderCreated, order=test-order-2"
 fnAssertTrue(t, logger.Count(log)==1, "expecting log: " + log)
 o1 = order.MockSaleOrderService.Get(o1.OrderID)
 fnAssertTrue(t, o1.StatusFlag == order.StatusStockOutboundDone, "expecting o1 done")
 o2 = order.MockSaleOrderService.Get(o2.OrderID)
 fnAssertTrue(t, o2.StatusFlag == order.StatusStockOutboundFailed, "expecting o2 failed")
 logger.Logf("test passed")
}
func fnClearDB(t *testing.T) {
 fnDBExec(t, "delete from subscriber")
 fnDBExec(t, "delete from tx_msg")
 fnDBExec(t, "delete from delivery_queue")
 fnDBExec(t, "delete from success_queue")
}
func fnDBExec(t *testing.T, sql string, args... interface{}) int {
 rows := []int64{ 0 }
 err := database.DB(func(db *sqlx.DB) error {
 r,e := db.Exec(sql, args...)
 if e != nil {
 return e
 }
 rows[0], e = r.RowsAffected()
 if e != nil {
 return e
 }
 return nil
 })
 if err != nil {
 t.Fatal(err)
 }
 return int(rows[0])
}

测试输出

$ go test -v order_test.go 
=== RUN Test_SagaSaleOrder
23:55:54.292132442 eventbus.Pub, event=system.boot, handler=gDeliveryService.handleBootEvent
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.
[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env: export GIN_MODE=release
 - using code: gin.SetMode(gin.ReleaseMode)
[GIN-debug] GET /ping --> learning/gooop/saga/mqs/handlers.Ping (4 handlers)
[GIN-debug] POST /subscribe --> learning/gooop/saga/mqs/handlers.Subscribe (4 handlers)
[GIN-debug] POST /publish --> learning/gooop/saga/mqs/handlers.Publish (4 handlers)
[GIN-debug] POST /notify --> learning/gooop/saga/mqs/handlers.Notify (4 handlers)
[GIN-debug] POST /notify/sale-order.stock.outbound --> learning/gooop/saga/order.NotifyStockOutbound (4 handlers)
[GIN-debug] POST /notify/sale-order.created --> learning/gooop/saga/stock.NotifySaleOrderCreated (4 handlers)
[GIN-debug] Listening and serving HTTP on :3333
23:55:54.292287032 tDeliveryService.beginCleanExpiredWorkers
23:55:54.292345845 tDeliveryService.beginCreatingWorkers
23:55:54.356542981 handlers.Subscribe, msg=&{sale-order-service sale-order.stock.outbound http://localhost:3333/notify/sale-order.stock.outbound 1616086554355593476}
23:55:54.356524325 handlers.Subscribe, msg=&{stock-service sale-order.created http://localhost:3333/notify/sale-order.created 1616086554355598830}
23:55:54.365256441 handlers.Subscribe, event=subscriber.registered, msg=&{sale-order-service sale-order.stock.outbound http://localhost:3333/notify/sale-order.stock.outbound 1616086554355593476}
23:55:54.365271105 eventbus.Pub, event=subscriber.registered, handler=gDeliveryService.handleSubscriberRegistered
[GIN] 2021年03月18日 - 23:55:54 | 200 | 8.865173ms | ::1 | POST "/subscribe"
[GIN] 2021年03月18日 - 23:55:54 | 200 | 8.882138ms | ::1 | POST "/subscribe"
23:55:54.365488163 tSaleOrderService.beginSubscribeMQ, done
23:55:54.365861542 database.DB, err=empty rows
23:55:54.366239244 tDeliveryWorker.afterInitialLoad, clientID=sale-order-service, rows=0
23:55:54.373588493 handlers.Subscribe, event=subscriber.registered, msg=&{stock-service sale-order.created http://localhost:3333/notify/sale-order.created 1616086554355598830}
23:55:54.373605972 eventbus.Pub, event=subscriber.registered, handler=gDeliveryService.handleSubscriberRegistered
[GIN] 2021年03月18日 - 23:55:54 | 200 | 17.189632ms | ::1 | POST "/subscribe"
[GIN] 2021年03月18日 - 23:55:54 | 200 | 17.205549ms | ::1 | POST "/subscribe"
23:55:54.373843032 tStockService.beginSubscribeMQ, done
23:55:54.3743926 database.DB, err=empty rows
23:55:54.374499757 tDeliveryWorker.afterInitialLoad, clientID=stock-service, rows=0
23:55:55.292336699 tStockService.AddStock, done, prodId=test-prod-1, stock=0, delta=0, after=10
23:55:55.323746568 handlers.Publish, msg=test-order-1/test-order-1/sale-order.created, msgId=112
[GIN] 2021年03月18日 - 23:55:55 | 200 | 31.112478ms | ::1 | POST "/publish"
[GIN] 2021年03月18日 - 23:55:55 | 200 | 31.125855ms | ::1 | POST "/publish"
23:55:55.323811205 handlers.Publish, pubLiveMsg 112
23:55:55.323910377 tSaleOrderService.publishMQ, done, order=test-order-1/&{test-order-1 test-customer-1 test-prod-1 1 100 100 1616082955292352151 0}
23:55:55.324227736 handlers.Publish, pubLiveMsg, msgId=112, rows=1
23:55:55.324273573 handlers.Publish, event=msg.published, clientID=stock-service, msg=test-order-1/test-order-1/http://localhost:3333/notify/sale-order.created
23:55:55.32428051 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service
23:55:55.324285512 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service
23:55:55.324292286 tLiveMsgSource.handleMsgPublished, clientID=stock-service, msg=test-order-1/test-order-1/sale-order.created
23:55:55.324346678 tDeliveryWorker.beginPollAndDeliver, msg from live=&{98 stock-service http://localhost:3333/notify/sale-order.created 112 test-order-1 test-order-1 sale-order-service 1616082955292352151 sale-order.created {"OrderID":"test-order-1","CustomerID":"test-customer-1","ProductID":"test-prod-1","Quantity":1,"Price":100,"Amount":100,"CreateTime":1616082955292352151,"StatusFlag":0} 0 0}
23:55:55.33925766 handlers.Publish, msg=test-order-2/test-order-2/sale-order.created, msgId=113
[GIN] 2021年03月18日 - 23:55:55 | 200 | 15.264561ms | ::1 | POST "/publish"
[GIN] 2021年03月18日 - 23:55:55 | 200 | 15.280884ms | ::1 | POST "/publish"
23:55:55.339353768 handlers.Publish, pubLiveMsg 113
23:55:55.339446893 tSaleOrderService.publishMQ, done, order=test-order-2/&{test-order-2 test-customer-2 test-prod-1 10 100 1000 1616082955302734821 0}
23:55:55.339909493 handlers.Publish, pubLiveMsg, msgId=113, rows=1
23:55:55.339919874 handlers.Publish, event=msg.published, clientID=stock-service, msg=test-order-2/test-order-2/http://localhost:3333/notify/sale-order.created
23:55:55.339925049 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service
23:55:55.339929964 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service
23:55:55.339935935 tLiveMsgSource.handleMsgPublished, clientID=stock-service, msg=test-order-2/test-order-2/sale-order.created
23:55:55.350117186 tDeliveryWorker.deliver, begin, id=stock-service, msg=test-order-1/test-order-1
23:55:55.35041833 stock.NotifySaleOrderCreated, order=test-order-1/&{test-order-1 test-customer-1 test-prod-1 1 100 100 1616082955292352151 0}
23:55:55.350429178 tStockService.AddStock, done, prodId=test-prod-1, stock=10, delta=-1, after=9
[GIN] 2021年03月18日 - 23:55:55 | 200 | 88.872μs | ::1 | POST "/notify/sale-order.created"
[GIN] 2021年03月18日 - 23:55:55 | 200 | 133.617μs | ::1 | POST "/notify/sale-order.created"
23:55:55.350592351 tDeliveryWorker.deliver, OK, id=stock-service, msg=test-order-1/test-order-1
23:55:55.367336707 tDeliveryWorker.afterDeliverySuccess, done, id=stock-service, msg=test-order-1/test-order-1
23:55:55.36738322 tDeliveryWorker.beginPollAndDeliver, msg from live=&{99 stock-service http://localhost:3333/notify/sale-order.created 113 test-order-2 test-order-2 sale-order-service 1616082955302734821 sale-order.created {"OrderID":"test-order-2","CustomerID":"test-customer-2","ProductID":"test-prod-1","Quantity":10,"Price":100,"Amount":1000,"CreateTime":1616082955302734821,"StatusFlag":0} 0 0}
23:55:55.367530495 database.DB, err=empty rows
23:55:55.374978535 tDeliveryWorker.deliver, begin, id=stock-service, msg=test-order-2/test-order-2
23:55:55.375201115 stock.NotifySaleOrderCreated, order=test-order-2/&{test-order-2 test-customer-2 test-prod-1 10 100 1000 1616082955302734821 0}
23:55:55.375211216 tStockService.AddStock, failed, prodId=test-prod-1, stock=9, delta=-10
23:55:55.375219558 tStockService.HandleSaleOrderCreated, err=insufficient stock, order=&{test-order-2 test-customer-2 test-prod-1 10 100 1000 1616082955302734821 0}
[GIN] 2021年03月18日 - 23:55:55 | 200 | 102.52μs | ::1 | POST "/notify/sale-order.created"
[GIN] 2021年03月18日 - 23:55:55 | 200 | 116.933μs | ::1 | POST "/notify/sale-order.created"
23:55:55.375354895 tDeliveryWorker.deliver, OK, id=stock-service, msg=test-order-2/test-order-2
23:55:55.389901711 tDeliveryWorker.afterDeliverySuccess, done, id=stock-service, msg=test-order-2/test-order-2
23:55:55.38993077 tDeliveryWorker.beginPollAndDeliver, msg from db=&{99 stock-service http://localhost:3333/notify/sale-order.created 113 test-order-2 test-order-2 sale-order-service 1616082955302734821 sale-order.created {"OrderID":"test-order-2","CustomerID":"test-customer-2","ProductID":"test-prod-1","Quantity":10,"Price":100,"Amount":1000,"CreateTime":1616082955302734821,"StatusFlag":0} 1 1616082955367401386}
23:55:55.420121681 handlers.Publish, msg=test-order-1/test-order-1.outbound/sale-order.stock.outbound, msgId=114
[GIN] 2021年03月18日 - 23:55:55 | 200 | 69.507171ms | ::1 | POST "/publish"
[GIN] 2021年03月18日 - 23:55:55 | 200 | 69.520805ms | ::1 | POST "/publish"
23:55:55.420220719 handlers.Publish, pubLiveMsg 114
23:55:55.420321792 tStockService.publishMQ, done, msg=&{test-order-1 test-order-1.outbound stock-service 1616082955350432496 sale-order.stock.outbound 1}
23:55:55.42071623 handlers.Publish, pubLiveMsg, msgId=114, rows=1
23:55:55.420731889 handlers.Publish, event=msg.published, clientID=sale-order-service, msg=test-order-1/test-order-1.outbound/http://localhost:3333/notify/sale-order.stock.outbound
23:55:55.420741935 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service
23:55:55.420746401 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service
23:55:55.420755367 tLiveMsgSource.handleMsgPublished, clientID=sale-order-service, msg=test-order-1/test-order-1.outbound/sale-order.stock.outbound
23:55:55.42079505 tDeliveryWorker.beginPollAndDeliver, msg from live=&{100 sale-order-service http://localhost:3333/notify/sale-order.stock.outbound 114 test-order-1 test-order-1.outbound stock-service 1616082955350432496 sale-order.stock.outbound 1 0 0}
23:55:55.435844021 handlers.Publish, msg=test-order-2/test-order-2.outbound/sale-order.stock.outbound, msgId=115
[GIN] 2021年03月18日 - 23:55:55 | 200 | 15.407267ms | ::1 | POST "/publish"
[GIN] 2021年03月18日 - 23:55:55 | 200 | 15.420327ms | ::1 | POST "/publish"
23:55:55.4359058 handlers.Publish, pubLiveMsg 115
23:55:55.436026025 tStockService.publishMQ, done, msg=&{test-order-2 test-order-2.outbound stock-service 1616082955375214295 sale-order.stock.outbound 0}
23:55:55.436398324 handlers.Publish, pubLiveMsg, msgId=115, rows=1
23:55:55.436409937 handlers.Publish, event=msg.published, clientID=sale-order-service, msg=test-order-2/test-order-2.outbound/http://localhost:3333/notify/sale-order.stock.outbound
23:55:55.43642793 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service
23:55:55.436433697 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service
23:55:55.43644379 tLiveMsgSource.handleMsgPublished, clientID=sale-order-service, msg=test-order-2/test-order-2.outbound/sale-order.stock.outbound
23:55:55.446599314 tDeliveryWorker.deliver, begin, id=sale-order-service, msg=test-order-1/test-order-1.outbound
23:55:55.446809726 order.NotifyStockOutbound, orderID=test-order-1, succeeded=true
[GIN] 2021年03月18日 - 23:55:55 | 200 | 61.898μs | ::1 | POST "/notify/sale-order.stock.outbound"
[GIN] 2021年03月18日 - 23:55:55 | 200 | 81.911μs | ::1 | POST "/notify/sale-order.stock.outbound"
23:55:55.446951354 tDeliveryWorker.deliver, OK, id=sale-order-service, msg=test-order-1/test-order-1.outbound
23:55:55.462584405 tDeliveryWorker.afterDeliverySuccess, done, id=sale-order-service, msg=test-order-1/test-order-1.outbound
23:55:55.462615131 tDeliveryWorker.beginPollAndDeliver, msg from live=&{101 sale-order-service http://localhost:3333/notify/sale-order.stock.outbound 115 test-order-2 test-order-2.outbound stock-service 1616082955375214295 sale-order.stock.outbound 0 0 0}
23:55:55.469999185 tDeliveryWorker.deliver, begin, id=sale-order-service, msg=test-order-2/test-order-2.outbound
23:55:55.470163043 order.NotifyStockOutbound, orderID=test-order-2, succeeded=false
[GIN] 2021年03月18日 - 23:55:55 | 200 | 85.14μs | ::1 | POST "/notify/sale-order.stock.outbound"
[GIN] 2021年03月18日 - 23:55:55 | 200 | 105.638μs | ::1 | POST "/notify/sale-order.stock.outbound"
23:55:55.470369408 tDeliveryWorker.deliver, OK, id=sale-order-service, msg=test-order-2/test-order-2.outbound
23:55:55.486229145 tDeliveryWorker.afterDeliverySuccess, done, id=sale-order-service, msg=test-order-2/test-order-2.outbound
23:55:56.302885199 ============================================
23:55:56.303470422 test passed
--- PASS: Test_SagaSaleOrder (2.05s)
PASS
ok command-line-arguments 2.057s

IStockService.go

模拟的库存服务接口

package stock;
import "learning/gooop/saga/order"
type IStockService interface {
 GetStock(prodId string) int
 AddStock(prodId string, delta int) error
 HandleSaleOrderCreated(it *order.SaleOrder) error
}

tStockService.go

虚拟库存服务, 实现IStockService接口

package stock
import (
 "bytes"
 "encoding/json"
 "errors"
 "io/ioutil"
 "learning/gooop/saga/mqs/logger"
 "learning/gooop/saga/mqs/models"
 "learning/gooop/saga/order"
 "net/http"
 "sync"
 "time"
)
type tStockService struct {
 rwmutex *sync.RWMutex
 stock map[string]int
 bMQReady bool
 publishQueue chan *models.TxMsg
}
func newStockService() IStockService {
 it := new(tStockService)
 it.init()
 return it
}
func (me *tStockService) init() {
 me.rwmutex = new(sync.RWMutex)
 me.stock = make(map[string]int)
 me.bMQReady = false
 me.publishQueue = make(chan *models.TxMsg, gMQMaxQueuedMsg)
 go func() {
 time.Sleep(100*time.Millisecond)
 go me.beginSubscribeMQ()
 go me.beginPublishMQ()
 }()
}
func (me *tStockService) GetStock(prodId string) int {
 me.rwmutex.RLock()
 defer me.rwmutex.RUnlock()
 it,ok := me.stock[prodId]
 if ok {
 return it
 } else {
 return 0
 }
}
func (me *tStockService) AddStock(prodId string, delta int) error {
 me.rwmutex.RLock()
 defer me.rwmutex.RUnlock()
 it,ok := me.stock[prodId]
 if ok {
 n := it + delta
 if n < 0 {
 logger.Logf("tStockService.AddStock, failed, prodId=%s, stock=%d, delta=%d", prodId, it, delta)
 return gInsufficientStockError
 } else {
 logger.Logf("tStockService.AddStock, done, prodId=%s, stock=%d, delta=%d, after=%d", prodId, it, delta, n)
 me.stock[prodId] = n
 }
 } else {
 if delta < 0 {
 logger.Logf("tStockService.AddStock, failed, prodId=%s, stock=0, delta=%d", prodId, delta)
 return gInsufficientStockError
 } else {
 logger.Logf("tStockService.AddStock, done, prodId=%s, stock=0, delta=%d, after=%d", prodId, it, delta)
 me.stock[prodId] = delta
 }
 }
 return nil
}
func (me *tStockService) beginSubscribeMQ() {
 expireDuration := int64(1 * time.Hour)
 subscribeDuration := 20 * time.Minute
 pauseDuration := 3*time.Second
 lastSubscribeTime := int64(0)
 for {
 now := time.Now().UnixNano()
 if now - lastSubscribeTime >= int64(subscribeDuration) {
 expireTime := now + expireDuration
 err := fnSubscribeMQ(expireTime)
 if err != nil {
 me.bMQReady = false
 logger.Logf("tStockService.beginSubscribeMQ, failed, err=%v", err)
 } else {
 lastSubscribeTime = now
 me.bMQReady = true
 logger.Logf("tStockService.beginSubscribeMQ, done")
 }
 }
 time.Sleep(pauseDuration)
 }
}
func fnSubscribeMQ(expireTime int64) error {
 msg := &models.SubscribeMsg{
 ClientID: gMQClientID,
 Topic: gMQSubscribeTopic,
 NotifyUrl: gMQServerURL + PathOfNotifySaleOrderCreated,
 ExpireTime: expireTime,
 }
 url := gMQServerURL + "/subscribe"
 return fnPost(msg, url)
}
func fnPost(msg interface{}, url string) error {
 body,_ := json.Marshal(msg)
 rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))
 if err != nil {
 return err
 }
 defer rsp.Body.Close()
 j, err := ioutil.ReadAll(rsp.Body)
 if err != nil {
 return err
 }
 ok := &models.OkMsg{}
 err = json.Unmarshal(j, ok)
 if err != nil {
 return err
 }
 if !ok.OK {
 return gMQReplyFalse
 }
 return nil
}
func (me *tStockService) beginPublishMQ() {
 for {
 select {
 case msg := <- me.publishQueue :
 me.publishMQ(msg)
 break
 }
 }
}
func (me *tStockService) publishMQ(msg *models.TxMsg) {
 url := gMQServerURL + "/publish"
 for i := 0;i < gMQMaxPublishRetry;i++ {
 err := fnPost(msg, url)
 if err != nil {
 logger.Logf("tStockService.publishMQ, failed, err=%v, msg=%v", err, msg)
 time.Sleep(gMQPublishInterval)
 } else {
 logger.Logf("tStockService.publishMQ, done, msg=%v", msg)
 return
 }
 }
 // publish failed
 logger.Logf("tStockService.publishMQ, failed max retries, msg=%v", msg)
}
func (me *tStockService) HandleSaleOrderCreated(it *order.SaleOrder) error {
 msg := &models.TxMsg{}
 msg.GlobalID = it.OrderID
 msg.SubID = it.OrderID + ".outbound"
 msg.SenderID = gMQClientID
 msg.Topic = gMQPublishTopic
 err := me.AddStock(it.ProductID, -it.Quantity)
 msg.CreateTime = time.Now().UnixNano()
 if err != nil {
 logger.Logf("tStockService.HandleSaleOrderCreated, err=%s, order=%v", err.Error(), it)
 msg.Content = "0"
 } else {
 msg.Content = "1"
 }
 if len(me.publishQueue) >= gMQMaxQueuedMsg {
 logger.Logf("tStockService.HandleSaleOrderCreated, err=%s, order=%v", gMQBlocked.Error(), it)
 return gMQBlocked
 } else {
 me.publishQueue <- msg
 return err
 }
}
var gInsufficientStockError = errors.New("insufficient stock")
var gMQReplyFalse = errors.New("mq reply false")
var gMQBlocked = errors.New("mq blocked")
var gMQMaxPublishRetry = 10
var gMQPublishInterval = 1*time.Second
var gMQSubscribeTopic = "sale-order.created"
var gMQPublishTopic = "sale-order.stock.outbound"
var gMQClientID = "stock-service"
var gMQServerURL = "http://localhost:3333"
var gMQMaxQueuedMsg = 1024
var MockStockService = newStockService()

NotifySaleOrderCreated.go

用于监听订单创建消息的http回调处理器

package stock
import (
 "encoding/json"
 "github.com/gin-gonic/gin"
 "io/ioutil"
 "learning/gooop/saga/mqs/logger"
 "learning/gooop/saga/mqs/models"
 "learning/gooop/saga/order"
 "net/http"
)
func NotifySaleOrderCreated(c *gin.Context) {
 body := c.Request.Body
 defer body.Close()
 j, e := ioutil.ReadAll(body)
 if e != nil {
 logger.Logf("stock.NotifySaleOrderCreated, failed ioutil.ReadAll")
 c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})
 return
 }
 msg := &models.TxMsg{}
 e = json.Unmarshal(j, msg)
 if e != nil {
 logger.Logf("stock.NotifySaleOrderCreated, failed json.Unmarshal msg")
 c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})
 return
 }
 order := &order.SaleOrder{}
 e = json.Unmarshal([]byte(msg.Content), order)
 if e != nil {
 logger.Logf("stock.NotifySaleOrderCreated, failed json.Unmarshal order")
 c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})
 return
 }
 logger.Logf("stock.NotifySaleOrderCreated, order=%s/%v", order.OrderID, order)
 // notify stock service
 _ = MockStockService.HandleSaleOrderCreated(order)
 c.JSON(http.StatusOK, gin.H{ "ok": true })
}
var PathOfNotifySaleOrderCreated = "/notify/sale-order.created"

(未完待续)


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:老罗话编程

查看原文:手撸golang GO与微服务 Saga模式之8 集成测试

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
514 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏