分享
  1. 首页
  2. 文章

golang-针对缓存击穿动态补充缓存策略

yzbzg · · 1695 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

  1. 什么是缓存击穿

某时刻缓存失效,大批量请求涌来,导致db io过大

2. go如何解决大量请求涌来,动态即缓存并返回且使db即cache中间件最低限度的io操作

通过channel控制所有请求只有一个请求去持久缓存,其余等待并订阅该事件(也就是获取缓存的事件)。当缓存写入完成后,通过每个请求上线文多路通知所有请求并返回。


//具体业务获取策略获取定制器
type BaseWorker struct {
}
func (b *BaseWorker) Context(p context.Context) (cCtx context.Context, cancel context.CancelFunc) {
	cCtx, cancel = context.WithCancel(p)
	return
}
func (b *BaseWorker) Do() (bool, interface{}) {
	panic("implement run")
}
func (b *BaseWorker) Fallback() interface{} {
	return nil
}
// TryGet 尝试获取对象
func (b *BaseWorker) TryGet() (bool, interface{}) {
	panic("implement try get")
}
type IWorker interface {
	Do() (bool, interface{})
	Context(p context.Context) (cCtx context.Context, cancel context.CancelFunc)
	TryGet() (bool, interface{})
	Fallback() interface{}
}
//核心多请求控制处理器
type parallel struct {
	//请求信号(只有一个请求可获取)
	ch chan struct{}
	//父级上下文
	pCxt context.Context
	pCancel context.CancelFunc
	tag string
	//请求池容量
	size int
	//请求池标识
	reqPool chan struct{} //连接池
	//信号返回值
	chRes interface{}
}
// NewParallel 实例化并行操作
func NewParallel(tag string, size int) *parallel {
	p := &parallel{
		ch: make(chan struct{}, 1),
		pCxt: context.Background(),
		tag: tag,
		size: size,
	}
	//初始化请求实例权
	if size > 0 {
		p.reqPool = make(chan struct{}, size)
		for i := 0; i < size; i++ {
			p.reqPool <- struct{}{}
		}
	}
	p.pCxt, p.pCancel = context.WithCancel(context.Background())
	p.ch <- struct{}{}
	return p
}
// release 释放
func (p *parallel) release() {
	select {
	case p.ch <- struct{}{}:
		p.pCxt, p.pCancel = context.WithCancel(context.Background())
	default:
		//并行重复写入
	}
}
// tryGetReq 获取请求连接
func (p *parallel) tryGetReq() bool {
	if p.size <= 0 {
		return true
	}
	select {
	case <-p.reqPool:
		return true
	default:
		return false
	}
}
// releaseReq 回收请求权
func (p *parallel) releaseReq() {
	if p.size <= 0 {
		return
	}
	//回收请求权
	select {
	case p.reqPool <- struct{}{}:
	default:
	}
}
// tryDoRes 获取结果值
type tryDoRes struct {
	Tip string
	Res interface{}
}
// TryDo 任务并行
func (p *parallel) TryDo(worker IWorker) (ret tryDoRes) {
	//获取连接
	ok := p.tryGetReq()
	if !ok {
		//超过连接池.需要做降级返回
		ret.Res = worker.Fallback()
		ret.Tip = "【熔断降级结果】"
		return
	}
	ok, obj := worker.TryGet()
	if ok {
		ret.Res = obj
		ret.Tip = "【正常读取缓存】"
		return
	}
	//获取上下文
	cCtx, cancel := worker.Context(p.pCxt)
	select {
	case <-p.ch:
		_, ret.Res = worker.Do()
		ret.Tip = "【执行操作】"
		p.chRes = ret.Res
		//消息通知
		p.pCancel()
		p.release()
	case <-cCtx.Done():
		//返回数据
		cancel()
		//拉取信号相应值
		ret.Res = p.chRes
		//_, ret.Res = worker.TryGet()
		ret.Tip = "【事件通知】"
	}
	//释放请求
	p.releaseReq()
	return
}
//对象工厂
type factory struct {
	pMap map[string]*parallel
	rwLock *sync.RWMutex
}
func NewFactory() *factory {
	return &factory{
		pMap: make(map[string]*parallel),
		rwLock: &sync.RWMutex{},
	}
}
// Register 注册并行实例
func (f *factory) Register(tag string, size int) {
	defer f.rwLock.Unlock()
	f.rwLock.Lock()
	f.pMap[tag] = NewParallel(tag, size)
	return
}
// Get 获取实例
func (f *factory) Get(tag string) *parallel {
	defer f.rwLock.RUnlock()
	f.rwLock.RLock()
	v, _ := f.pMap[tag]
	return v
}
//具体模拟请求并使用
var (
	//模拟一个缓存
	cacheMap map[string]interface{} = make(map[string]interface{})
	//构造并行对象池
	fac = parallel.NewFactory()
)
//构造一个拉取对应业务缓存的工作单元
type UserCacheWorker struct {
	parallel.BaseWorker //继承基础单元
}
func NewUserCacheWorker() *UserCacheWorker {
	return &UserCacheWorker{}
}
//Do 具体工作业务(如何写入缓存)
func (b *UserCacheWorker) Do() (bool, interface{}) {
	log.Println("写入缓存")
	//模拟高Q延迟
	time.Sleep(time.Second * 5)
	cacheMap["user"] = map[string]interface{}{
		"name": "lsh",
	}
	return true, cacheMap
}
func (b *UserCacheWorker) TryGet() (bool, interface{}) {
	//读缓存策略
	if len(cacheMap) > 0 {
		return true, cacheMap
	}
	//返回结果
	return false, nil
}
// Fallback 重写熔断管道
func (b *UserCacheWorker) Fallback() interface{} {
	return map[string]interface{}{
		"err": "from fallback",
	}
}
func main() {
	//注册并行操作器
	fac.Register("user_cache", 0)
	fmt.Println("====================================loop 1===========================================")
	for i := 0; i < 100000; i++ {
		go func(index int) {
			obj := fac.Get("user_cache").TryDo(NewUserCacheWorker())
			log.Println("[编号]:", index, "[结果]:", obj)
		}(i)
	}
	time.Sleep(time.Second * 10)
	//清空缓存
	cacheMap = make(map[string]interface{})
	time.Sleep(time.Second * 1)
	fmt.Println("====================================loop 2===========================================")
	//再次并行请求
	for i := 0; i < 100000; i++ {
		go func(index int) {
			obj := fac.Get("user_cache").TryDo(NewUserCacheWorker())
			log.Println("[编号]:", index, "[结果]:", obj)
		}(i)
	}
	fmt.Scanln()
}

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1695 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏