分享
  1. 首页
  2. 文章

手撸golang GO与微服务 ES-CQRS模式之2

老罗话编程 · · 569 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

手撸golang GO与微服务 ES-CQRS模式之2

缘起

最近阅读 [Go微服务实战] (刘金亮, 2021.1)
本系列笔记拟采用golang练习之
gitee:

ES-CQRS模式

ES(Event Sourcing)事件溯源非常好理解,
指的是将每次的事件都记录下来,
而不是去记录对象的状态。
比如新建、修改等都会作为事件记录下来,
当需要最新的状态时,通过事件的堆叠来计算最新的状态。
按照事件溯源的模式进行架构设计,
就是事件驱动架构(Event DrivenArchitecture, EDA)。
命令查询职责分离(CQRS)最早来自Betrand Meyer写的
Object-OrientedSoftware Construction一书,
指的是命令查询分离(Command Query Separation,CQS)。
其基本思想是任何一个对象的方法都可以分为以下两大类:
▪ 命令(Command):不返回任何结果(void),但会改变对象的状态。
▪ 查询(Query):返回结果,但是不会改变对象的状态,对系统没有副作用。
CQRS的核心出发点就是把系统分为读和写两部分,从而方便分别进行优化。

目标(Day 2)

  • 根据ES-CQRS模式, 大幅重构Day 1的设计, 并进行单元测试

设计

  • TodoDTO: 待办事宜数值对象
  • OperationTag: todo写入事件的类型标记
  • TodoEvent: todo写入事件
  • ClassTag: json序列化的类型标记
  • tJSONData: json序列化的数据容器
  • IEventBus: 事件总线接口
  • iTodoEventSerializer: 事件序列化到JSON数据的接口
  • iTodoReader: todo读取接口
  • iTodoWriter: todo写入接口
  • iJSONStore: json文件读写接口
  • ITodoService: todo待办事宜服务接口
  • tEventBus: 事件总线的实现
  • tTodoEventSerializer: 事件序列化到JSON的实现
  • tTodoWriter: 事件写入器的实现, 监听write指令, 并持久化到json存储
  • tMockJSONStore: 虚拟的JSON文件读写实现
  • tTodoReader: 待办事宜读取器, 监听write和load指令, 并计算todo列表的当前状态
  • tMockTodoService: 待办事宜服务的实现

单元测试

todo_app_test.go

package es_cqrs
import (
 td "learning/gooop/es_cqrs/todo_app"
 "testing"
)
func fnAssertTrue (t *testing.T, b bool, msg string) {
 if !b {
 t.Fatal(msg)
 }
}
func Test_TodoApp(t *testing.T) {
 t1 := &td.TodoDTO{ 1, "title-1", "content-1" }
 td.MockTodoService.Create(t1)
 all := td.MockTodoService.GetAll()
 fnAssertTrue(t, len(all) == 1, "expecting 1 item")
 fnAssertTrue(t, all[0].Title == t1.Title, "expecting " + t1.Title)
 t.Log("pass creating")
 t1.Content = "content-1 updated"
 t1.Title = "title-1 updated"
 td.MockTodoService.Update(t1)
 all = td.MockTodoService.GetAll()
 fnAssertTrue(t, len(all) == 1, "expecting 1 item")
 fnAssertTrue(t, all[0].Content == t1.Content, "expecting " + t1.Content)
 t.Log("pass updating")
 td.MockTodoService.Delete(t1)
 all = td.MockTodoService.GetAll()
 fnAssertTrue(t, len(all) == 0, "expecting 0 items")
 t.Log("pass deleting")
}

测试输出

$ go test -v todo_app_test.go 
=== RUN Test_TodoApp
22:38:08.180382833 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.1
22:38:08.180533659 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}
22:38:08.180539669 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.2
22:38:08.180552255 tTodoReader.items: [&{1 title-1 content-1}]
22:38:08.180557245 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.2
22:38:08.180560995 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService
 todo_app_test.go:21: pass creating
22:38:08.180580644 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.1
22:38:08.180604465 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}, {"Tag":2,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}
22:38:08.180612665 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.2
22:38:08.180618512 tTodoReader.items: [&{1 title-1 updated content-1 updated}]
22:38:08.18062244 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.2
22:38:08.180626445 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService
 todo_app_test.go:29: pass updating
22:38:08.180642172 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.1
22:38:08.180656612 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}, {"Tag":2,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}, {"Tag":3,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}
22:38:08.180669129 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.2
22:38:08.180672774 tTodoReader.items: []
22:38:08.180675952 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.2
22:38:08.180679309 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService
 todo_app_test.go:34: pass deleting
--- PASS: Test_TodoApp (0.00s)
PASS
ok command-line-arguments 0.002s

TodoDTO.go

待办事宜数值对象

package todo_app
type TodoDTO struct {
 NO int
 Title string
 Content string
}
func (me *TodoDTO) Clone() *TodoDTO {
 return &TodoDTO{
 me.NO, me.Title, me.Content,
 }
}

OperationTag.go

todo写入事件的类型标记

package todo_app
type OperationTag int
const OPCreated OperationTag = 1
const OPUpdated OperationTag = 2
const OPDeleted OperationTag = 3

TodoEvent.go

todo写入事件

package todo_app
type TodoEvent struct {
 Tag OperationTag
 Data *TodoDTO
}

ClassTag.go

json序列化的类型标记

package todo_app
type ClassTag int
const TodoEventClass ClassTag = 1

tJSONData.go

json序列化的数据容器

package todo_app
import "encoding/json"
type tJSONData struct {
 Tag ClassTag
 Content []byte
}
func (me *tJSONData) Set(tag ClassTag, it interface{}) error {
 me.Tag = tag
 j, e := json.Marshal(it)
 if e != nil {
 return e
 }
 me.Content = j
 return nil
}
func (me *tJSONData) Get(it interface{}) error {
 return json.Unmarshal(me.Content, it)
}

IEventBus.go

事件总线接口

package todo_app
type EventHandleFunc func(e string, args interface{})
type EventHandler struct {
 ID string
 Handler EventHandleFunc
}
type IEventBus interface {
 Pub(e string, args interface{})
 Sub(e string, id string, handleFunc EventHandleFunc)
 Unsub(e string, id string)
}
const EventWriteTodoCmd = "todo.write.cmd"
const EventReadTodoCmd = "todo.read.cmd"
const EventReadTodoRet = "todo.read.ret"
const EventLoadTodoCmd = "todo.load.cmd"

iTodoEventSerializer.go

事件序列化到JSON数据的接口

package todo_app
type iTodoEventSerializer interface {
 Serialize(it *TodoEvent) *tJSONData
}

iTodoReader.go

todo读取接口

package todo_app
type iTodoReader interface {
 All() []*TodoDTO
 HandleTodoEvent(e *TodoEvent)
}

iTodoWriter.go

todo写入接口

package todo_app
type iTodoWriter interface {
 HandleTodoEvent(e *TodoEvent)
}

iJSONStore.go

json文件读写接口

package todo_app
type iJSONStore interface {
 Load()
 Append(it *tJSONData)
}

ITodoService.go

todo待办事宜服务接口

package todo_app
type ITodoService interface {
 Create(it *TodoDTO)
 Update(it *TodoDTO)
 Delete(it *TodoDTO)
 GetAll() []*TodoDTO
}

tEventBus.go

事件总线的实现

package todo_app
import (
 "learning/gooop/saga/mqs/logger"
 "sync"
)
type tEventBus struct {
 rwmutex *sync.RWMutex
 items map[string][]*EventHandler
}
func newEventHandler(id string, handleFunc EventHandleFunc) *EventHandler {
 return &EventHandler{
 id, handleFunc,
 }
}
func newEventBus() IEventBus {
 it := new(tEventBus)
 it.init()
 return it
}
func (me *tEventBus) init() {
 me.rwmutex = new(sync.RWMutex)
 me.items = make(map[string][]*EventHandler)
}
func (me *tEventBus) Pub(e string, args interface{}) {
 me.rwmutex.RLock()
 defer me.rwmutex.RUnlock()
 handlers, ok := me.items[e]
 if ok {
 for _, it := range handlers {
 logger.Logf("eventbus.Pub, event=%s, handler=%s", e, it.ID)
 it.Handler(e, args)
 }
 }
}
func (me *tEventBus) Sub(e string, id string, handleFunc EventHandleFunc) {
 me.rwmutex.Lock()
 defer me.rwmutex.Unlock()
 handler := newEventHandler(id, handleFunc)
 handlers, ok := me.items[e]
 if ok {
 me.items[e] = append(handlers, handler)
 } else {
 me.items[e] = []*EventHandler{handler}
 }
}
func (me *tEventBus) Unsub(e string, id string) {
 me.rwmutex.Lock()
 defer me.rwmutex.Unlock()
 handlers, ok := me.items[e]
 if ok {
 for i, it := range handlers {
 if it.ID == id {
 lastI := len(handlers) - 1
 if i != lastI {
 handlers[i], handlers[lastI] = handlers[lastI], handlers[i]
 }
 me.items[e] = handlers[:lastI]
 }
 }
 }
}
var GlobalEventBus = newEventBus()

tTodoEventSerializer.go

事件序列化到JSON的实现

package todo_app
type tTodoEventSerializer struct {
}
func newEventSeiralizer() iTodoEventSerializer {
 it := new(tTodoEventSerializer)
 return it
}
func (me *tTodoEventSerializer) Serialize(e *TodoEvent) *tJSONData {
 it := new(tJSONData)
 err := it.Set(TodoEventClass, e)
 if err != nil {
 return nil
 }
 return it
}
var gDefaultEventSerializer = newEventSeiralizer()

tTodoWriter.go

事件写入器的实现, 监听write指令, 并持久化到json存储

package todo_app
import (
 "fmt"
 "sync/atomic"
)
type tTodoWriter struct {
 id string
}
func newTodoWriter() iTodoWriter {
 it := new(tTodoWriter)
 it.init()
 return it
}
func (me *tTodoWriter) init() {
 me.id = fmt.Sprintf("tTodoWriter.%d", atomic.AddInt32(&gWriterCounter, 1))
 GlobalEventBus.Sub(EventWriteTodoCmd, me.id, me.handleWriteTodoCmd)
}
func (me *tTodoWriter) handleWriteTodoCmd(e string, args interface{}) {
 switch e {
 case EventWriteTodoCmd:
 if it, ok := args.(*TodoEvent); ok {
 me.HandleTodoEvent(it)
 }
 break
 }
}
func (me *tTodoWriter) HandleTodoEvent(e *TodoEvent) {
 j := gDefaultEventSerializer.Serialize(e)
 if j != nil {
 MockJSONStore.Append(j)
 }
}
var gWriterCounter int32 = 0

tMockJSONStore.go

虚拟的JSON文件读写实现

package todo_app
import (
 "fmt"
 "learning/gooop/saga/mqs/logger"
 "strings"
 "sync"
)
type tMockJSONStore struct {
 rwmutex *sync.RWMutex
 once sync.Once
 items []*tJSONData
}
func newMockJSONStore() iJSONStore {
 it := new(tMockJSONStore)
 it.init()
 return it
}
func (me *tMockJSONStore) init() {
 me.rwmutex = new(sync.RWMutex)
 me.items = []*tJSONData{}
}
func (me *tMockJSONStore) Load() {
 me.once.Do(func() {
 me.rwmutex.RLock()
 defer me.rwmutex.RUnlock()
 for _, it := range me.items {
 switch it.Tag {
 case TodoEventClass:
 v := new(TodoEvent)
 e := it.Get(v)
 if e == nil {
 GlobalEventBus.Pub(EventLoadTodoCmd, e)
 }
 break
 }
 }
 })
}
func (me *tMockJSONStore) Append(it *tJSONData) {
 me.rwmutex.Lock()
 defer me.rwmutex.Unlock()
 me.items = append(me.items, it)
 lines := []string{}
 for _,it := range me.items {
 lines = append(lines, fmt.Sprintf("%s", string(it.Content)))
 }
 logger.Logf("tMockJSONStore.items: %s", strings.Join(lines, ", "))
}
var MockJSONStore = newMockJSONStore()

tTodoReader.go

待办事宜读取器, 监听write和load指令, 并计算todo列表的当前状态

package todo_app
import (
 "fmt"
 "learning/gooop/saga/mqs/logger"
 "strings"
 "sync"
 "sync/atomic"
)
type tTodoReader struct {
 id string
 rwmutex *sync.RWMutex
 items []*TodoDTO
}
func newTodoReader() iTodoReader {
 it := new(tTodoReader)
 it.init()
 return it
}
func (me *tTodoReader) init() {
 id := fmt.Sprintf("tTodoReader.%d", atomic.AddInt32(&gReaderCounter, 1))
 me.id = id
 me.rwmutex = new(sync.RWMutex)
 GlobalEventBus.Sub(EventWriteTodoCmd, me.id, me.handleEvent)
 GlobalEventBus.Sub(EventLoadTodoCmd, me.id, me.handleEvent)
 GlobalEventBus.Sub(EventReadTodoCmd, me.id, me.handleEvent)
}
func (me *tTodoReader) handleEvent(e string, args interface{}) {
 switch e {
 case EventWriteTodoCmd:
 fallthrough
 case EventLoadTodoCmd:
 if v,ok := args.(*TodoEvent);ok {
 me.HandleTodoEvent(v)
 }
 break
 case EventReadTodoCmd:
 me.handleReadTodoList()
 }
}
func (me *tTodoReader) handleReadTodoList() {
 GlobalEventBus.Pub(EventReadTodoRet, me.All())
}
func (me *tTodoReader) All() []*TodoDTO {
 me.rwmutex.RLock()
 defer me.rwmutex.RUnlock()
 lst := make([]*TodoDTO, len(me.items))
 for i,it := range me.items {
 lst[i] = it
 }
 return lst
}
func (me *tTodoReader) HandleTodoEvent(e *TodoEvent) {
 me.rwmutex.Lock()
 defer me.rwmutex.Unlock()
 switch e.Tag {
 case OPCreated:
 me.items = append(me.items, e.Data.Clone())
 break
 case OPUpdated:
 for i,it := range me.items {
 if it.NO == e.Data.NO {
 me.items[i] = e.Data.Clone()
 break
 }
 }
 break
 case OPDeleted:
 for i,it := range me.items {
 if it.NO == e.Data.NO {
 lastI := len(me.items) - 1
 if i == lastI {
 me.items[i] = nil
 } else {
 me.items[i], me.items[lastI] = me.items[lastI], nil
 }
 me.items = me.items[:lastI]
 break
 }
 }
 break
 }
 lines := []string{}
 for _,it := range me.items {
 lines = append(lines, fmt.Sprintf("%v", it))
 }
 logger.Logf("tTodoReader.items: [%s]", strings.Join(lines, ", "))
}
var gReaderCounter int32 = 1

tMockTodoService.go

待办事宜服务的实现, 提供todo项的CRUD

package todo_app
type tMockTodoService struct {
 items []*TodoDTO
 writer iTodoWriter
 reader iTodoReader
}
func newMockTodoService() ITodoService {
 it := new(tMockTodoService)
 it.init()
 return it
}
func (me *tMockTodoService) init() {
 me.writer = newTodoWriter()
 me.reader = newTodoReader()
 GlobalEventBus.Sub(EventReadTodoRet, "tMockTodoService", me.handleReadTodoRet)
}
func (me *tMockTodoService) handleReadTodoRet(e string, args interface{}) {
 switch e {
 case EventReadTodoRet:
 if it,ok := args.([]*TodoDTO);ok {
 me.items = it
 }
 break
 }
}
func (me *tMockTodoService) Create(it *TodoDTO) {
 GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPCreated, it.Clone() })
}
func (me *tMockTodoService) Update(it *TodoDTO) {
 GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPUpdated, it.Clone() })
}
func (me *tMockTodoService) Delete(it *TodoDTO) {
 GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPDeleted, it.Clone() })
}
func (me *tMockTodoService) GetAll() []*TodoDTO {
 me.items = nil
 GlobalEventBus.Pub(EventReadTodoCmd, nil)
 lst := me.items
 me.items = nil
 return lst
}
var MockTodoService = newMockTodoService()

(ES-CQRS end)


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:老罗话编程

查看原文:手撸golang GO与微服务 ES-CQRS模式之2

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
569 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏