This article takes a look at JetCache's CachePenetrationProtect.

CachePenetrationProtect

com/alicp/jetcache/anno/CachePenetrationProtect.java

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.FIELD})
public @interface CachePenetrationProtect {
    boolean value() default true;
    int timeout() default CacheConsts.UNDEFINED_INT;
    TimeUnit timeUnit() default TimeUnit.SECONDS;
}
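It defines three attributes: value (defaults to true), timeout (defaults to CacheConsts.UNDEFINED_INT), and timeUnit (defaults to TimeUnit.SECONDS).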
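
To make the role of the three attributes concrete, here is a minimal, self-contained sketch of how an annotation shaped like this one could be read through reflection and turned into a wait timeout. ProtectSketch, ProtectSketchDemo, and the use of Integer.MIN_VALUE as the "undefined" sentinel are all assumptions made for illustration; they are not JetCache classes, and JetCache's own annotation parsing may differ. The only part taken from the source is that a null timeout means "wait without a time limit", as seen in synchronizedLoad.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in with the same three attributes; this is not JetCache's annotation.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.FIELD})
@interface ProtectSketch {
    boolean value() default true;
    int timeout() default ProtectSketchDemo.UNDEFINED_INT;
    TimeUnit timeUnit() default TimeUnit.SECONDS;
}

final class ProtectSketchDemo {
    // Assumed sentinel for "no timeout configured", standing in for CacheConsts.UNDEFINED_INT.
    static final int UNDEFINED_INT = Integer.MIN_VALUE;

    @ProtectSketch
    static String loadWithDefaults(String id) { return "v-" + id; }

    @ProtectSketch(timeout = 3)
    static String loadWithTimeout(String id) { return "v-" + id; }

    @ProtectSketch(timeout = 500, timeUnit = TimeUnit.MILLISECONDS)
    static String loadWithMillis(String id) { return "v-" + id; }

    // Looks up one of the demo methods above by name, wrapping the checked exception.
    static Method method(String name) {
        try {
            return ProtectSketchDemo.class.getDeclaredMethod(name, String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns null when the method has no annotation or no timeout was configured;
    // a null result is meant to be read as "wait without a time limit".
    static Duration waitTimeout(Method m) {
        ProtectSketch p = m.getAnnotation(ProtectSketch.class);
        if (p == null || p.timeout() == UNDEFINED_INT) {
            return null;
        }
        return Duration.ofMillis(p.timeUnit().toMillis(p.timeout()));
    }

    public static void main(String[] args) {
        System.out.println(waitTimeout(method("loadWithDefaults")));
        System.out.println(waitTimeout(method("loadWithTimeout")));
        System.out.println(waitTimeout(method("loadWithMillis")));
    }
}
```

Using a sentinel default rather than 0 lets the reader of the annotation tell "not configured" apart from an explicit zero timeout.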

computeIfAbsentImpl

com/alicp/jetcache/AbstractCache.java

    static <K, V> V computeIfAbsentImpl(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
                                        long expireAfterWrite, TimeUnit timeUnit, Cache<K, V> cache) {
        AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache);
        CacheLoader<K, V> newLoader = CacheUtil.createProxyLoader(cache, loader, abstractCache::notify);
        CacheGetResult<V> r;
        if (cache instanceof RefreshCache) {
            RefreshCache<K, V> refreshCache = ((RefreshCache<K, V>) cache);
            r = refreshCache.GET(key);
            refreshCache.addOrUpdateRefreshTask(key, newLoader);
        } else {
            r = cache.GET(key);
        }
        if (r.isSuccess()) {
            return r.getValue();
        } else {
            Consumer<V> cacheUpdater = (loadedValue) -> {
                if(needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) {
                    if (timeUnit != null) {
                        cache.PUT(key, loadedValue, expireAfterWrite, timeUnit).waitForResult();
                    } else {
                        cache.PUT(key, loadedValue).waitForResult();
                    }
                }
            };
            V loadedValue;
            if (cache.config().isCachePenetrationProtect()) {
                loadedValue = synchronizedLoad(cache.config(), abstractCache, key, newLoader, cacheUpdater);
            } else {
                loadedValue = newLoader.apply(key);
                cacheUpdater.accept(loadedValue);
            }
            return loadedValue;
        }
    }
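AbstractCache's computeIfAbsentImpl method first tries cache.GET(key). On a miss, it calls synchronizedLoad if cache.config().isCachePenetrationProtect() is true; otherwise it calls newLoader.apply(key) directly and passes the result to cacheUpdater.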
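
The unprotected branch above is a plain read-through: look up, load on a miss, then write back. The sketch below reproduces only that shape on top of a ConcurrentHashMap so the flow can be run in isolation. ReadThroughSketch and ReadThroughDemo are hypothetical names, the expiry arguments are left out, and the null-handling rule is an assumption based on the cacheNullWhenLoaderReturnNull parameter name rather than on the body of needUpdate, which is not shown here.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical read-through cache mirroring the unprotected path; not a JetCache class.
final class ReadThroughSketch<K, V> {
    // Optional lets the map record "the loader returned null",
    // which ConcurrentHashMap cannot store as a raw value.
    private final Map<K, Optional<V>> store = new ConcurrentHashMap<>();
    private final boolean cacheNullWhenLoaderReturnNull;

    ReadThroughSketch(boolean cacheNullWhenLoaderReturnNull) {
        this.cacheNullWhenLoaderReturnNull = cacheNullWhenLoaderReturnNull;
    }

    V computeIfAbsent(K key, Function<K, V> loader) {
        Optional<V> hit = store.get(key);
        if (hit != null) {
            return hit.orElse(null); // cache hit, possibly a cached null
        }
        // Cache miss: nothing stops several threads from reaching this line together,
        // so each of them would call the loader. That is the gap the protected path closes.
        V loaded = loader.apply(key);
        if (loaded != null || cacheNullWhenLoaderReturnNull) {
            store.put(key, Optional.ofNullable(loaded));
        }
        return loaded;
    }
}

final class ReadThroughDemo {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        ReadThroughSketch<String, String> cache = new ReadThroughSketch<>(false);
        Function<String, String> loader = k -> {
            calls.incrementAndGet();
            return "v-" + k;
        };
        cache.computeIfAbsent("a", loader); // miss, loads
        cache.computeIfAbsent("a", loader); // hit, loader not called
        System.out.println("loader calls: " + calls.get());
    }
}
```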

synchronizedLoad

    static <K, V> V synchronizedLoad(CacheConfig config, AbstractCache<K,V> abstractCache,
                                     K key, Function<K, V> newLoader, Consumer<V> cacheUpdater) {
        ConcurrentHashMap<Object, LoaderLock> loaderMap = abstractCache.initOrGetLoaderMap();
        Object lockKey = buildLoaderLockKey(abstractCache, key);
        while (true) {
            boolean create[] = new boolean[1];
            LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
                create[0] = true;
                LoaderLock loaderLock = new LoaderLock();
                loaderLock.signal = new CountDownLatch(1);
                loaderLock.loaderThread = Thread.currentThread();
                return loaderLock;
            });
            if (create[0] || ll.loaderThread == Thread.currentThread()) {
                try {
                    V loadedValue = newLoader.apply(key);
                    ll.success = true;
                    ll.value = loadedValue;
                    cacheUpdater.accept(loadedValue);
                    return loadedValue;
                } finally {
                    if (create[0]) {
                        ll.signal.countDown();
                        loaderMap.remove(lockKey);
                    }
                }
            } else {
                try {
                    Duration timeout = config.getPenetrationProtectTimeout();
                    if (timeout == null) {
                        ll.signal.await();
                    } else {
                        boolean ok = ll.signal.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
                        if(!ok) {
                            logger.info("loader wait timeout:" + timeout);
                            return newLoader.apply(key);
                        }
                    }
                } catch (InterruptedException e) {
                    logger.warn("loader wait interrupted");
                    return newLoader.apply(key);
                }
                if (ll.success) {
                    return (V) ll.value;
                } else {
                    continue;
                }
            }
        }
    }
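synchronizedLoad uses loaderMap.computeIfAbsent to pick one loader thread per lock key. The thread that created the LoaderLock (or that already is its loaderThread) runs newLoader.apply(key), passes the result to cacheUpdater, and in the finally block counts down the latch and removes the lock from loaderMap. Any other thread waits on ll.signal.await: with no timeout configured it waits without a time limit, otherwise it waits up to the configured timeout. If the wait times out or is interrupted by an InterruptedException, the waiting thread calls newLoader.apply(key) itself and returns that value without going through cacheUpdater. If the wait ends but ll.success is false, meaning the loader thread did not finish successfully, the waiting thread loops and tries again.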
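
The same latch-per-key idea can be tried outside JetCache. The following is a minimal sketch under stated assumptions, not JetCache's implementation: SingleFlightSketch, Flight, and SingleFlightDemo are hypothetical names, the cache write-back step is omitted, and restoring the interrupt flag is a choice made in this sketch that the quoted source does not make.

```java
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical single-flight loader: one thread loads per key, the others wait on a latch.
final class SingleFlightSketch<K, V> {
    private static final class Flight<V> {
        final CountDownLatch done = new CountDownLatch(1);
        final Thread owner = Thread.currentThread(); // the thread that created this entry
        volatile V value;
        volatile boolean success;
    }

    private final ConcurrentHashMap<K, Flight<V>> inFlight = new ConcurrentHashMap<>();
    private final Duration waitTimeout; // null means wait without a time limit

    SingleFlightSketch(Duration waitTimeout) {
        this.waitTimeout = waitTimeout;
    }

    V load(K key, Function<K, V> loader) {
        while (true) {
            boolean[] created = new boolean[1];
            Flight<V> flight = inFlight.computeIfAbsent(key, k -> {
                created[0] = true;
                return new Flight<>();
            });
            if (created[0] || flight.owner == Thread.currentThread()) {
                try {
                    V loaded = loader.apply(key);
                    flight.value = loaded;
                    flight.success = true;
                    return loaded;
                } finally {
                    if (created[0]) {
                        flight.done.countDown();   // wake every waiter
                        inFlight.remove(key);      // later misses start a new flight
                    }
                }
            }
            try {
                if (waitTimeout == null) {
                    flight.done.await();
                } else if (!flight.done.await(waitTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                    return loader.apply(key);      // timed out: load on our own
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // sketch-only choice: keep the interrupt flag
                return loader.apply(key);
            }
            if (flight.success) {
                return flight.value;
            }
            // The owner's loader threw, so go around again and possibly become the owner.
        }
    }
}

final class SingleFlightDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        SingleFlightSketch<String, String> flights = new SingleFlightSketch<>(Duration.ofSeconds(5));
        Function<String, String> slowLoader = k -> {
            calls.incrementAndGet();
            try {
                Thread.sleep(200); // simulate a slow back-to-source call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "v-" + k;
        };
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> flights.load("k", slowLoader));
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        System.out.println("loader calls: " + calls.get());
    }
}
```

Removing the entry in the finally block keeps the map from growing, at the cost that a thread arriving just after removal starts a fresh load; in the JetCache flow that thread would normally find the value in the cache first.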

Summary

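JetCache provides the @CachePenetrationProtect annotation. When multiple threads concurrently miss the cache and go back to the source for the same key, it ensures that, within the configured timeout, only one thread in the whole JVM performs the load. A waiting thread loads on its own if no result is available once the timeout has passed; if a result is already available, it returns the value loaded by the loader thread.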

codecraft