- 28
- 1
- 11
'use strict';
/*
* cacheSize: number of elements in cache, constant, must be greater than or equal to number of asynchronous accessors / cache misses
* callbackBackingStoreLoad: user-given cache(read)-miss function to load data from datastore
* takes 2 parameters: key, callback
* example:
* async function(key,callback){
* redis.get(key,function(data,err){
* callback(data);
* });
* }
* callbackBackingStoreSave: user-given cache(write)-miss function to save data to datastore
* takes 3 parameters: key, value, callback
* example:
* async function(key,value,callback){
* redis.set(key,value,function(err){
* callback();
* });
* }
* elementLifeTimeMs: maximum miliseconds before an element is invalidated, only invalidated at next get() or set() call with its key
* flush(): all in-flight get/set accesses are awaited and all edited keys are written back to backing-store. flushes the cache.
* reload(): evicts all cache to reload new values from backing store
* reloadKey(): only evicts selected item (to reload its new value on next access)
*
*/
// Asynchronous write-back LRU cache using the CLOCK (second-chance) algorithm.
// Reconstructed from a merge-garbled diff; this is the Map-based revision.
let Lru = function(cacheSize,callbackBackingStoreLoad,elementLifeTimeMs=1000,callbackBackingStoreSave){
    const me = this;
    // access types for the shared access() routine
    const aTypeGet = 0;
    const aTypeSet = 1;
    const maxWait = elementLifeTimeMs;
    const size = parseInt(cacheSize,10);
    const mapping = new Map();               // key --> slot index
    const mappingInFlightMiss = new Map();   // keys with an in-flight asynchronous miss (per-key lock)
    const bufData = new Array(size);         // cached values
    const bufVisited = new Uint8Array(size); // CLOCK "second chance" bits
    const bufEdited = new Uint8Array(size);  // dirty bits for write-back
    const bufKey = new Array(size);          // slot --> key (reverse mapping)
    const bufTime = new Float64Array(size);  // last refresh timestamps (ms)
    const bufLocked = new Uint8Array(size);  // slot locks during async miss handling
    for(let i=0;i<size;i++)
    {
        // seed every slot with a unique random key that cannot collide with user keys
        let rnd = Math.random();
        mapping.set(rnd,i);
        bufData[i]="";
        bufVisited[i]=0;
        bufEdited[i]=0;
        bufKey[i]=rnd;
        bufTime[i]=0;
        bufLocked[i]=0;
    }
    // two CLOCK hands, half a lap apart: ctr clears "visited" bits, ctrEvict evicts
    let ctr = 0;
    let ctrEvict = parseInt(cacheSize/2,10);
    const loadData = callbackBackingStoreLoad;
    const saveData = callbackBackingStoreSave;
    let inFlightMissCtr = 0;
    // refresh all items time-span in cache
    this.reload=function(){
        for(let i=0;i<size;i++)
        {
            bufTime[i]=0;
        }
    };
    // refresh item time-span in cache by triggering eviction
    this.reloadKey=function(key){
        if(mapping.has(key))
        {
            // Map elements are read with get(), not bracket indexing
            bufTime[mapping.get(key)]=0;
        }
    };
    // get value by key
    this.get = function(keyPrm,callbackPrm){
        access(keyPrm,callbackPrm,aTypeGet);
    };
    // set value by key (callback returns same value)
    this.set = function(keyPrm,valuePrm,callbackPrm){
        access(keyPrm,callbackPrm,aTypeSet,valuePrm);
    };
    // aType=0: get
    // aType=1: set
    function access(keyPrm,callbackPrm,aType,valuePrm){
        const key = keyPrm;
        const callback = callbackPrm;
        const value = valuePrm;
        // stop dead-lock when many async get calls are made
        if(inFlightMissCtr>=size)
        {
            setTimeout(function(){
                // get/set
                access(key,function(newData){
                    callback(newData);
                },aType,value);
            },0);
            return;
        }
        // if key is busy, then delay the request towards end of the cache-miss completion
        if(mappingInFlightMiss.has(key))
        {
            setTimeout(function(){
                // get/set
                access(key,function(newData){
                    callback(newData);
                },aType,value);
            },0);
            return;
        }
        if(mapping.has(key))
        {
            // slot is an element in the circular buffer of CLOCK algorithm
            let slot = mapping.get(key);
            // RAM speed data
            if((Date.now() - bufTime[slot]) > maxWait)
            {
                // if slot is locked by another operation, postpone the current operation
                if(bufLocked[slot])
                {
                    setTimeout(function(){
                        access(key,function(newData){
                            callback(newData);
                        },aType,value);
                    },0);
                }
                else // slot is not locked and its lifespan has ended
                {
                    // if it was edited, update the backing-store first
                    if(bufEdited[slot] == 1)
                    {
                        bufLocked[slot] = 1;
                        bufEdited[slot]=0;
                        mappingInFlightMiss.set(key,1); // lock key
                        inFlightMissCtr++;
                        // update backing-store, this is async
                        saveData(bufKey[slot],bufData[slot],function(){
                            mappingInFlightMiss.delete(key); // unlock key
                            bufLocked[slot] = 0;
                            inFlightMissCtr--;
                            mapping.delete(key); // disable mapping for current key
                            // re-simulate the access, async
                            access(key,function(newData){
                                callback(newData);
                            },aType,value);
                        });
                    }
                    else
                    {
                        mapping.delete(key); // disable mapping for current key
                        access(key,function(newData){
                            callback(newData);
                        },aType,value);
                    }
                }
            }
            else // slot life span has not ended
            {
                bufVisited[slot]=1;
                bufTime[slot] = Date.now();
                // if it is a "set" operation
                if(aType == aTypeSet)
                {
                    bufEdited[slot] = 1; // later written back to data-store on eviction/flush
                    bufData[slot] = value;
                }
                callback(bufData[slot]);
            }
        }
        else
        {
            // datastore loading + cache eviction
            let ctrFound = -1;
            let oldVal = 0;
            let oldKey = 0;
            while(ctrFound===-1)
            {
                // give slot a second chance before eviction
                if(!bufLocked[ctr] && bufVisited[ctr])
                {
                    bufVisited[ctr]=0;
                }
                ctr++;
                if(ctr >= size)
                {
                    ctr=0;
                }
                // eviction conditions
                if(!bufLocked[ctrEvict] && !bufVisited[ctrEvict])
                {
                    // eviction preparations, lock the slot
                    bufLocked[ctrEvict] = 1;
                    inFlightMissCtr++;
                    ctrFound = ctrEvict;
                    oldVal = bufData[ctrFound];
                    oldKey = bufKey[ctrFound];
                }
                ctrEvict++;
                if(ctrEvict >= size)
                {
                    ctrEvict=0;
                }
            }
            // user-requested key is now asynchronously in-flight & locked for other operations
            mappingInFlightMiss.set(key,1);
            // eviction function. least recently used data is gone, newest recently used data is assigned
            let evict = function(res){
                mapping.delete(bufKey[ctrFound]);
                bufData[ctrFound]=res;
                bufVisited[ctrFound]=0;
                bufKey[ctrFound]=key;
                bufTime[ctrFound]=Date.now();
                bufLocked[ctrFound]=0;
                mapping.set(key,ctrFound);
                callback(res);
                inFlightMissCtr--;
                mappingInFlightMiss.delete(key);
            };
            // if old data was edited, send it to data-store first, then fetch new data
            if(bufEdited[ctrFound] == 1)
            {
                if(aType == aTypeGet)
                    bufEdited[ctrFound] = 0;
                // old edited data is sent back to data-store
                saveData(oldKey,oldVal,function(){
                    if(aType == aTypeGet)
                        loadData(key,evict);
                    else if(aType == aTypeSet)
                        evict(value);
                });
            }
            else
            {
                if(aType == aTypeSet)
                    bufEdited[ctrFound] = 1;
                if(aType == aTypeGet)
                    loadData(key,evict);
                else if(aType == aTypeSet)
                    evict(value);
            }
        }
    };
    this.getAwaitable = function(key){
        return new Promise(function(success,fail){
            me.get(key,function(data){
                success(data);
            });
        });
    }
    this.setAwaitable = function(key,value){
        return new Promise(function(success,fail){
            me.set(key,value,function(data){
                success(data);
            });
        });
    }
    // as many keys as required can be given, separated by commas
    this.getMultiple = function(callback, ... keys){
        let result = [];
        let ctr1 = keys.length;
        for(let i=0;i<ctr1;i++)
            result.push(0);
        let ctr2 = 0;
        keys.forEach(function(key){
            let ctr3 = ctr2++;
            me.get(key,function(data){
                result[ctr3] = data;
                ctr1--;
                if(ctr1==0)
                {
                    callback(result);
                }
            });
        });
    };
    // as many key-value pairs ( in form of { key:foo, value:bar } ) can be given, separated by commas
    this.setMultiple = function(callback, ... keyValuePairs){
        let result = [];
        let ctr1 = keyValuePairs.length;
        for(let i=0;i<ctr1;i++)
            result.push(0);
        let ctr2 = 0;
        keyValuePairs.forEach(function(pair){
            let ctr3 = ctr2++;
            me.set(pair.key,pair.value,function(data){
                result[ctr3] = data;
                ctr1--;
                if(ctr1==0)
                {
                    callback(result);
                }
            });
        });
    };
    // as many keys as required can be given, separated by commas
    this.getMultipleAwaitable = function(... keys){
        return new Promise(function(success,fail){
            me.getMultiple(function(results){
                success(results);
            }, ... keys);
        });
    };
    // as many key-value pairs ( in form of { key:foo, value:bar } ) can be given, separated by commas
    this.setMultipleAwaitable = function(... keyValuePairs){
        return new Promise(function(success,fail){
            me.setMultiple(function(results){
                success(results);
            }, ... keyValuePairs);
        });
    };
    // push all edited slots to backing-store and reset all slots lifetime to "out of date"
    this.flush = function(callback){
        function waitForReadWrite(callbackW){
            // if there are in-flight cache-misses cache-write-misses or active slot locks, then wait
            if(mappingInFlightMiss.size > 0 || bufLocked.reduce((e1,e2)=>{return e1+e2;}) > 0)
            {
                setTimeout(()=>{ waitForReadWrite(callbackW); },10);
            }
            else
                callbackW();
        }
        waitForReadWrite(async function(){
            for(let i=0;i<size;i++)
            {
                bufTime[i]=0;
                if(bufEdited[i] == 1)
                {
                    // less concurrency pressure, less failure
                    await me.setAwaitable(bufKey[i],bufData[i]);
                }
            }
            callback(); // flush complete
        });
    };
};
exports.Lru = Lru;
Caching Redis For Get/Set Operations:
"use strict";
// Benchmark: raw Redis get/set versus the same traffic going through the LRU cache.
// backing-store
const Redis = require("ioredis");
const redis = new Redis();
// LRU cache
let Lru = require("./lrucache.js").Lru;
let num_cache_elements = 1500;
let element_life_time_miliseconds = 1000;
let cache = new Lru(num_cache_elements, async function(key,callback){
    // read-miss: fetch from Redis
    redis.get(key, function (err, result) {
        callback(result);
    });
}, element_life_time_miliseconds, async function(key,value,callback){
    // write-miss: store to Redis
    redis.set(key,value, function (err, result) {
        callback();
    });
});
const N_repeat = 20;       // benchmark repetitions
const N_bench = 20000;     // operations per repetition
const N_concurrency = 100; // in-flight operations per batch
const N_dataset = 1000;    // distinct keys
function randomKey(){ return Math.floor(Math.random()*N_dataset); }
// without LRU caching
async function benchWithout(callback){
    for(let i=0;i<N_bench;i+=N_concurrency){
        let ctr = 0;
        let w8 = new Promise((success,fail)=>{
            for(let j=0;j<N_concurrency;j++)
            {
                redis.set(randomKey(),i, function (err, result) {
                    redis.get(randomKey(), function (err, result) {
                        ctr++;
                        if(ctr == N_concurrency)
                        {
                            success(1);
                        }
                    });
                });
            }
        });
        let result = await w8;
    }
    callback();
}
// with LRU caching
async function benchWith(callback){
    for(let i=0;i<N_bench;i+=N_concurrency){
        let ctr = 0;
        let w8 = new Promise((success,fail)=>{
            for(let j=0;j<N_concurrency;j++)
            {
                cache.set(randomKey(),i, function (result) {
                    cache.get(randomKey(), function (result) {
                        ctr++;
                        if(ctr == N_concurrency)
                        {
                            success(1);
                        }
                    });
                });
            }
        });
        let result = await w8;
    }
    callback();
}
let ctr = 0;
function restartWithoutLRU(callback){
    let t = Date.now();
    benchWithout(function(){
        console.log("without LRU: "+(Date.now() - t)+" milliseconds");
        ctr++;
        if(ctr != N_repeat)
        {
            restartWithoutLRU(callback);
        }
        else
        {
            ctr=0;
            callback();
        }
    });
}
function restartWithLRU(){
    let t = Date.now();
    benchWith(function(){
        console.log("with LRU: "+(Date.now() - t)+" milliseconds");
        ctr++;
        if(ctr != N_repeat)
        {
            restartWithLRU();
        }
    });
}
restartWithoutLRU(restartWithLRU);
On my low end computer (2GHz bulldozer cpu, 1 channel ddr3 1600MHz), API overhead causes a performance limitation that is around 1500 cache (read/write) hits per millisecond and about 850 cache (read/write) misses per millisecond. I couldn't find any better way to keep asynchronity and run faster. Would it be logical to shard the cache on multiple cores and have a multi-core cache or would it be better with a single-threaded server serving all cores?
'use strict';
/*
cacheSize: number of elements in cache, constant, must be greater than or equal to number of asynchronous accessors / cache misses
callbackBackingStoreLoad: user-given cache-miss function to load data from datastore
elementLifeTimeMs: maximum miliseconds before an element is invalidated, only invalidated at next get() call with its key
reload: evicts all cache to reload new values from backing store
reloadKey: only evicts selected item (to reload its new value on next access)
*/
// Asynchronous write-back LRU cache using the CLOCK (second-chance) algorithm.
let Lru = function(cacheSize,callbackBackingStoreLoad,elementLifeTimeMs=1000,callbackBackingStoreSave){
    const me = this;
    // access types for the shared access() routine
    const aTypeGet = 0;
    const aTypeSet = 1;
    const maxWait = elementLifeTimeMs;
    const size = parseInt(cacheSize,10);
    const mapping = {};                      // key --> slot index
    const mappingInFlightMiss = {};          // keys with an in-flight asynchronous miss (per-key lock)
    const bufData = new Array(size);         // cached values
    const bufVisited = new Uint8Array(size); // CLOCK "second chance" bits
    const bufEdited = new Uint8Array(size);  // dirty bits for write-back
    const bufKey = new Array(size);          // slot --> key (reverse mapping)
    const bufTime = new Float64Array(size);  // last refresh timestamps (ms)
    const bufLocked = new Uint8Array(size);  // slot locks during async miss handling
    for(let i=0;i<size;i++)
    {
        // seed every slot with a unique random key that cannot collide with user keys
        let rnd = Math.random();
        mapping[rnd] = i;
        bufData[i]="";
        bufVisited[i]=0;
        bufEdited[i]=0;
        bufKey[i]=rnd;
        bufTime[i]=0;
        bufLocked[i]=0;
    }
    // two CLOCK hands, half a lap apart: ctr clears "visited" bits, ctrEvict evicts
    let ctr = 0;
    let ctrEvict = parseInt(cacheSize/2,10);
    const loadData = callbackBackingStoreLoad;
    const saveData = callbackBackingStoreSave;
    let inFlightMissCtr = 0;
    // refresh all items time-span in cache
    this.reload=function(){
        for(let i=0;i<size;i++)
        {
            bufTime[i]=0;
        }
    };
    // refresh item time-span in cache by triggering eviction
    this.reloadKey=function(key){
        if(key in mapping)
        {
            bufTime[mapping[key]]=0;
        }
    };
    // get value by key
    this.get = function(keyPrm,callbackPrm){
        // aType=0: get
        access(keyPrm,callbackPrm,aTypeGet);
    };
    // set value by key (callback returns same value)
    this.set = function(keyPrm,valuePrm,callbackPrm){
        // aType=1: set
        access(keyPrm,callbackPrm,aTypeSet,valuePrm);
    };
    // aType=0: get
    // aType=1: set
    function access(keyPrm,callbackPrm,aType,valuePrm){
        const key = keyPrm;
        const callback = callbackPrm;
        const value = valuePrm;
        // stop dead-lock when many async get calls are made
        if(inFlightMissCtr>=size)
        {
            setTimeout(function(){
                // get/set
                access(key,function(newData){
                    callback(newData);
                },aType,value);
            },0);
            return;
        }
        // if key is busy, then delay the request towards end of the cache-miss completion
        if(key in mappingInFlightMiss)
        {
            setTimeout(function(){
                // get/set
                access(key,function(newData){
                    callback(newData);
                },aType,value);
            },0);
            return;
        }
        if(key in mapping)
        {
            // slot is an element in the circular buffer of CLOCK algorithm
            let slot = mapping[key];
            // RAM speed data
            if((Date.now() - bufTime[slot]) > maxWait)
            {
                // if slot is locked by another operation, postpone the current operation
                if(bufLocked[slot])
                {
                    setTimeout(function(){
                        access(key,function(newData){
                            callback(newData);
                        },aType,value);
                    },0);
                }
                else // slot is not locked and its lifespan has ended
                {
                    // if it was edited, update the backing-store first
                    if(bufEdited[slot] == 1)
                    {
                        bufLocked[slot] = 1;
                        bufEdited[slot]=0;
                        mappingInFlightMiss[key] = 1; // lock key
                        inFlightMissCtr++;
                        // update backing-store, this is async
                        saveData(bufKey[slot],bufData[slot],function(){
                            delete mappingInFlightMiss[key]; // unlock key
                            bufLocked[slot] = 0;
                            inFlightMissCtr--;
                            delete mapping[key]; // disable mapping for current key
                            // re-simulate the access, async
                            access(key,function(newData){
                                callback(newData);
                            },aType,value);
                        });
                    }
                    else
                    {
                        delete mapping[key]; // disable mapping for current key
                        access(key,function(newData){
                            callback(newData);
                        },aType,value);
                    }
                }
            }
            else // slot life span has not ended
            {
                bufVisited[slot]=1;
                bufTime[slot] = Date.now();
                // if it is a "set" operation
                if(aType == aTypeSet)
                {
                    bufEdited[slot] = 1; // later written back to data-store on eviction/flush
                    bufData[slot] = value;
                }
                callback(bufData[slot]);
            }
        }
        else
        {
            // datastore loading + cache eviction
            let ctrFound = -1;
            let oldVal = 0;
            let oldKey = 0;
            while(ctrFound===-1)
            {
                // give slot a second chance before eviction
                if(!bufLocked[ctr] && bufVisited[ctr])
                {
                    bufVisited[ctr]=0;
                }
                ctr++;
                if(ctr >= size)
                {
                    ctr=0;
                }
                // eviction conditions
                if(!bufLocked[ctrEvict] && !bufVisited[ctrEvict])
                {
                    // eviction preparations, lock the slot
                    bufLocked[ctrEvict] = 1;
                    inFlightMissCtr++;
                    ctrFound = ctrEvict;
                    oldVal = bufData[ctrFound];
                    oldKey = bufKey[ctrFound];
                }
                ctrEvict++;
                if(ctrEvict >= size)
                {
                    ctrEvict=0;
                }
            }
            // user-requested key is now asynchronously in-flight & locked for other operations
            mappingInFlightMiss[key]=1;
            // eviction function. least recently used data is gone, newest recently used data is assigned
            let evict = function(res){
                delete mapping[bufKey[ctrFound]];
                bufData[ctrFound]=res;
                bufVisited[ctrFound]=0;
                bufKey[ctrFound]=key;
                bufTime[ctrFound]=Date.now();
                bufLocked[ctrFound]=0;
                mapping[key] = ctrFound;
                callback(res);
                inFlightMissCtr--;
                delete mappingInFlightMiss[key];
            };
            // if old data was edited, send it to data-store first, then fetch new data
            if(bufEdited[ctrFound] == 1)
            {
                if(aType == aTypeGet)
                    bufEdited[ctrFound] = 0;
                // old edited data is sent back to data-store
                saveData(oldKey,oldVal,function(){
                    if(aType == aTypeGet)
                        loadData(key,evict);
                    else if(aType == aTypeSet)
                        evict(value);
                });
            }
            else
            {
                if(aType == aTypeSet)
                    bufEdited[ctrFound] = 1;
                if(aType == aTypeGet)
                    loadData(key,evict);
                else if(aType == aTypeSet)
                    evict(value);
            }
        }
    };
    this.getAwaitable = function(key){
        return new Promise(function(success,fail){
            me.get(key,function(data){
                success(data);
            });
        });
    }
    this.setAwaitable = function(key,value){
        return new Promise(function(success,fail){
            me.set(key,value,function(data){
                success(data);
            });
        });
    }
    // as many keys as required can be given, separated by commas
    this.getMultiple = function(callback, ... keys){
        let result = [];
        let ctr1 = keys.length;
        for(let i=0;i<ctr1;i++)
            result.push(0);
        let ctr2 = 0;
        keys.forEach(function(key){
            let ctr3 = ctr2++;
            me.get(key,function(data){
                result[ctr3] = data;
                ctr1--;
                if(ctr1==0)
                {
                    callback(result);
                }
            });
        });
    };
    // as many key-value pairs ( in form of { key:foo, value:bar } ) can be given, separated by commas
    this.setMultiple = function(callback, ... keyValuePairs){
        let result = [];
        // fixed: was "keys.length" — an undefined name that threw a ReferenceError
        let ctr1 = keyValuePairs.length;
        for(let i=0;i<ctr1;i++)
            result.push(0);
        let ctr2 = 0;
        keyValuePairs.forEach(function(pair){
            let ctr3 = ctr2++;
            me.set(pair.key,pair.value,function(data){
                result[ctr3] = data;
                ctr1--;
                if(ctr1==0)
                {
                    callback(result);
                }
            });
        });
    };
    // as many keys as required can be given, separated by commas
    this.getMultipleAwaitable = function(... keys){
        return new Promise(function(success,fail){
            me.getMultiple(function(results){
                success(results);
            }, ... keys);
        });
    };
    // as many key-value pairs ( in form of { key:foo, value:bar } ) can be given, separated by commas
    this.setMultipleAwaitable = function(... keyValuePairs){
        return new Promise(function(success,fail){
            me.setMultiple(function(results){
                success(results);
            }, ... keyValuePairs);
        });
    };
    // push all edited slots to backing-store and reset all slots lifetime to "out of date"
    this.flush = function(callback){
        function waitForReadWrite(callbackW){
            // if there are in-flight cache-misses cache-write-misses or active slot locks, then wait
            if(Object.keys(mappingInFlightMiss).length > 0 || bufLocked.reduce((e1,e2)=>{return e1+e2;}) > 0)
            {
                setTimeout(()=>{ waitForReadWrite(callbackW); },0);
            }
            else
                callbackW();
        }
        waitForReadWrite(function(){ // flush all slots
            let ctr1 = 0;
            for(let i=0;i<size;i++)
            {
                bufTime[i]=0;
                if(bufEdited[i] == 1)
                    ctr1++;
            }
            // fixed: with no edited slot the original never invoked the callback
            if(ctr1 == 0)
            {
                callback(); // flush complete, nothing to write back
                return;
            }
            for(let i=0;i<size;i++)
            {
                if(bufEdited[i] == 1)
                {
                    // async write-back of a dirty slot
                    me.set(bufKey[i],bufData[i],function(val){
                        ctr1--;
                        if(ctr1 == 0)
                        {
                            callback(); // flush complete
                        }
                    });
                }
            }
        });
    };
};
exports.Lru = Lru;
Cache hit ratio test with a simulation of image smoothing using get and set:
'use strict';
// when false, softened pixels go to a separate cache (cache2) backed by backing_store2
let read_and_write_on_same_cache = false;
// image is N x N pixels
let N = 10;
let backing_store = [];
let backing_store2 = [];
for(let y=0;y<N;y++)
{
    backing_store.push([]);
    backing_store2.push([]);
    for(let x=0;x<N;x++)
    {
        let r = Math.sqrt( (x-N/2) * (x-N/2) + (y-N/2) * (y-N/2) ); // pixel value depending on distance to center of circle
        backing_store[y].push(r);
        backing_store2[y].push(0);
    }
}
// counters used to compute hit ratios at the end of the benchmark
let cache_hit_and_miss = 0;
let cache_misses = 0;
let cache_write_misses = 0;
let Lru = require("./lrucache.js").Lru;
let num_cache_elements = 25;
let element_life_time_miliseconds = 10000;
// read cache over backing_store; fixed: "setTimeout(function(){" had been fused into
// the preceding comment lines, leaving orphaned "},10);" tokens (syntax error)
let cache = new Lru(num_cache_elements, async function(key,callback){
    cache_misses++;
    let obj = JSON.parse(key);
    let x = obj.x;
    let y = obj.y;
    // simulating slow access to a backing store (will be hidden by async access anyway)
    setTimeout(function(){
        // send the result to a RAM slot in the cache
        callback(backing_store[y][x]);
    },10);
}, element_life_time_miliseconds, async function(key,value,callback){
    cache_write_misses++;
    let obj = JSON.parse(key);
    let x = obj.x;
    let y = obj.y;
    // simulating slow access to a backing store (will be hidden by async access anyway)
    setTimeout(function(){
        // send the result to a RAM slot in the cache
        backing_store[y][x]=value;
        callback();
    },10);
});
// second cache, used as the write target when read_and_write_on_same_cache is false
let cache2 = new Lru(num_cache_elements, async function(key,callback){
    cache_misses++;
    let obj = JSON.parse(key);
    let x = obj.x;
    let y = obj.y;
    // simulating slow access to a backing store (will be hidden by async access anyway)
    setTimeout(function(){
        // send the result to a RAM slot in the cache
        callback(backing_store2[y][x]);
    },100);
}, element_life_time_miliseconds, async function(key,value,callback){
    cache_write_misses++;
    let obj = JSON.parse(key);
    let x = obj.x;
    let y = obj.y;
    // simulating slow access to a backing store (will be hidden by async access anyway)
    setTimeout(function(){
        // send the result to a RAM slot in the cache
        backing_store2[y][x]=value;
        callback();
    },100);
});
console.log("cache size = "+num_cache_elements+" pixels");
console.log("image size = "+(N*N)+" pixels");
// image softening algorithm: each interior pixel becomes the mean of itself and
// its 4 neighbors; reads go through `cache`, writes through `target`
async function benchmark()
{
    let timing = Date.now();
    for(let y=1;y<N-1;y++)
    {
        for(let x=1;x<N-1;x++)
        {
            cache_hit_and_miss += 6; // 5 pixels requested for read, 1 for write
            let softened_pixel = (await cache.getMultipleAwaitable( JSON.stringify({x:x,y:y}), // center pixel
                JSON.stringify({x:x,y:y-1}), // neighbor up
                JSON.stringify({x:x,y:y+1}), // neighbor down
                JSON.stringify({x:x-1,y:y}), // neighbor left
                JSON.stringify({x:x+1,y:y}) // neighbor right
            )).reduce((e1,e2)=>{return e1+e2;})/5.0;
            let target = 0;
            if(read_and_write_on_same_cache)
                target = cache;
            else
                target = cache2;
            target.set(JSON.stringify({x:x,y:y}),softened_pixel,async function(val){
                // last pixel written: flush and report statistics
                if(y==N-2 && x == N-2)
                {
                    // write all edited data to backing-store and read all non-read data from backing-store
                    target.flush(function(){
                        console.log("softened image pixels: ");
                        console.log(read_and_write_on_same_cache?backing_store:backing_store2);
                        console.log("run-time: "+(Date.now() - timing)+" ms "+((N-2)*(N-2)*5)+" reads "+((N-2)*(N-2))+" writes");
                        console.log("cache read hit ratio="+(100*(cache_hit_and_miss - cache_misses)/cache_hit_and_miss));
                        console.log("cache write hit ratio="+(100*(cache_hit_and_miss - cache_write_misses)/cache_hit_and_miss));
                    });
                }
            });
        }
    }
}
benchmark();
On my low end computer, API overhead is around 1500 cache hits per millisecond and about 300 cache misses per millisecond. I couldn't find any better way to keep asynchronity and run faster. Would it be logical to shard the cache on multiple cores and have a multi-core cache or would it be better with single thread server serving to all cores?
'use strict';
/*
* cacheSize: number of elements in cache, constant, must be greater than or equal to number of asynchronous accessors / cache misses
* callbackBackingStoreLoad: user-given cache(read)-miss function to load data from datastore
* takes 2 parameters: key, callback
* example:
* async function(key,callback){
* redis.get(key,function(data,err){
* callback(data);
* });
* }
* callbackBackingStoreSave: user-given cache(write)-miss function to save data to datastore
* takes 3 parameters: key, value, callback
* example:
* async function(key,value,callback){
* redis.set(key,value,function(err){
* callback();
* });
* }
* elementLifeTimeMs: maximum miliseconds before an element is invalidated, only invalidated at next get() or set() call with its key
* flush(): all in-flight get/set accesses are awaited and all edited keys are written back to backing-store. flushes the cache.
* reload(): evicts all cache to reload new values from backing store
* reloadKey(): only evicts selected item (to reload its new value on next access)
*
*/
let Lru = function(cacheSize,callbackBackingStoreLoad,elementLifeTimeMs=1000,callbackBackingStoreSave){
const me = this;
const aTypeGet = 0;
const aTypeSet = 1;
const maxWait = elementLifeTimeMs;
const size = parseInt(cacheSize,10);
const mapping = new Map();
const mappingInFlightMiss = new Map();
const bufData = new Array(size);
const bufVisited = new Uint8Array(size);
const bufEdited = new Uint8Array(size);
const bufKey = new Array(size);
const bufTime = new Float64Array(size);
const bufLocked = new Uint8Array(size);
for(let i=0;i<size;i++)
{
let rnd = Math.random();
mapping.set(rnd,i);
bufData[i]="";
bufVisited[i]=0;
bufEdited[i]=0;
bufKey[i]=rnd;
bufTime[i]=0;
bufLocked[i]=0;
}
let ctr = 0;
let ctrEvict = parseInt(cacheSize/2,10);
const loadData = callbackBackingStoreLoad;
const saveData = callbackBackingStoreSave;
let inFlightMissCtr = 0;
// refresh all items time-span in cache
this.reload=function(){
for(let i=0;i<size;i++)
{
bufTime[i]=0;
}
};
// refresh item time-span in cache by triggering eviction
this.reloadKey=function(key){
if(mapping.has(key))
{
bufTime[mapping[key]]=0;
}
};
// get value by key
this.get = function(keyPrm,callbackPrm){
// aType=0: get
access(keyPrm,callbackPrm,aTypeGet);
};
// set value by key (callback returns same value)
this.set = function(keyPrm,valuePrm,callbackPrm){
// aType=1: set
access(keyPrm,callbackPrm,aTypeSet,valuePrm);
};
// aType=0: get
// aType=1: set
function access(keyPrm,callbackPrm,aType,valuePrm){
const key = keyPrm;
const callback = callbackPrm;
const value = valuePrm;
// stop dead-lock when many async get calls are made
if(inFlightMissCtr>=size)
{
setTimeout(function(){
// get/set
access(key,function(newData){
callback(newData);
},aType,value);
},0);
return;
}
// if key is busy, then delay the request towards end of the cache-miss completion
if(mappingInFlightMiss.has(key))
{
setTimeout(function(){
// get/set
access(key,function(newData){
callback(newData);
},aType,value);
},0);
return;
}
if(mapping.has(key))
{
// slot is an element in the circular buffer of CLOCK algorithm
let slot = mapping.get(key);
// RAM speed data
if((Date.now() - bufTime[slot]) > maxWait)
{
// if slot is locked by another operation, postpone the current operation
if(bufLocked[slot])
{
setTimeout(function(){
access(key,function(newData){
callback(newData);
},aType,value);
},0);
}
else // slot is not locked and its lifespan has ended
{
// if it was edited, update the backing-store first
if(bufEdited[slot] == 1)
{
bufLocked[slot] = 1;
bufEdited[slot]=0;
mappingInFlightMiss.set(key,1); // lock key
inFlightMissCtr++;
// update backing-store, this is async
saveData(bufKey[slot],bufData[slot],function(){
mappingInFlightMiss.delete(key); // unlock key
bufLocked[slot] = 0;
inFlightMissCtr--;
mapping.delete(key); // disable mapping for current key
// re-simulate the access, async
access(key,function(newData){
callback(newData);
},aType,value);
});
}
else
{
mapping.delete(key); // disable mapping for current key
access(key,function(newData){
callback(newData);
},aType,value);
}
}
}
else // slot life span has not ended
{
bufVisited[slot]=1;
bufTime[slot] = Date.now();
// if it is a "set" operation
if(aType == aTypeSet)
{
bufEdited[slot] = 1; // later used when data needs to be written to data-store (write-cache feature)
bufData[slot] = value;
}
callback(bufData[slot]);
}
}
else
{
// datastore loading + cache eviction
let ctrFound = -1;
let oldVal = 0;
let oldKey = 0;
while(ctrFound===-1)
{
// give slot a second chance before eviction
if(!bufLocked[ctr] && bufVisited[ctr])
{
bufVisited[ctr]=0;
}
ctr++;
if(ctr >= size)
{
ctr=0;
}
// eviction conditions
if(!bufLocked[ctrEvict] && !bufVisited[ctrEvict])
{
// eviction preparations, lock the slot
bufLocked[ctrEvict] = 1;
inFlightMissCtr++;
ctrFound = ctrEvict;
oldVal = bufData[ctrFound];
oldKey = bufKey[ctrFound];
}
ctrEvict++;
if(ctrEvict >= size)
{
ctrEvict=0;
}
}
// user-requested key is now asynchronously in-flight & locked for other operations
mappingInFlightMiss.set(key,1);
// eviction function. least recently used data is gone, newest recently used data is assigned
let evict = function(res){
mapping.delete(bufKey[ctrFound]);
bufData[ctrFound]=res;
bufVisited[ctrFound]=0;
bufKey[ctrFound]=key;
bufTime[ctrFound]=Date.now();
bufLocked[ctrFound]=0;
mapping.set(key,ctrFound);
callback(res);
inFlightMissCtr--;
mappingInFlightMiss.delete(key);
};
// if old data was edited, send it to data-store first, then fetch new data
if(bufEdited[ctrFound] == 1)
{
if(aType == aTypeGet)
bufEdited[ctrFound] = 0;
// old edited data is sent back to data-store
saveData(oldKey,oldVal,function(){
if(aType == aTypeGet)
loadData(key,evict);
else if(aType == aTypeSet)
evict(value);
});
}
else
{
if(aType == aTypeSet)
bufEdited[ctrFound] = 1;
if(aType == aTypeGet)
loadData(key,evict);
else if(aType == aTypeSet)
evict(value);
}
}
};
this.getAwaitable = function(key){
return new Promise(function(success,fail){
me.get(key,function(data){
success(data);
});
});
}
this.setAwaitable = function(key,value){
return new Promise(function(success,fail){
me.set(key,value,function(data){
success(data);
});
});
}
// as many keys as required can be given, separated by commas
this.getMultiple = function(callback, ... keys){
let result = [];
let ctr1 = keys.length;
for(let i=0;i<ctr1;i++)
result.push(0);
let ctr2 = 0;
keys.forEach(function(key){
let ctr3 = ctr2++;
me.get(key,function(data){
result[ctr3] = data;
ctr1--;
if(ctr1==0)
{
callback(result);
}
});
});
};
// as many key-value pairs ( in form of { key:foo, value:bar } ) can be given, separated by commas
// Stores several { key, value } pairs concurrently; invokes `callback`
// exactly once with an array of set() results in the same order as the pairs.
// Fix: previously, calling setMultiple with zero pairs never invoked the
// callback at all (the countdown started at 0 and nothing decremented it).
this.setMultiple = function(callback, ... keyValuePairs){
    const result = new Array(keyValuePairs.length).fill(0);
    let remaining = keyValuePairs.length;
    if(remaining === 0)
    {
        callback(result); // nothing to store: complete immediately
        return;
    }
    keyValuePairs.forEach(function(pair, index){
        me.set(pair.key, pair.value, function(data){
            result[index] = data; // slot matches the caller's pair order
            remaining--;
            if(remaining === 0)
            {
                callback(result);
            }
        });
    });
};
// as many keys as required can be given, separated by commas
// Promise-based wrapper around getMultiple(): resolves with the results array.
this.getMultipleAwaitable = function(... keys){
    return new Promise((resolve) => {
        // getMultiple calls back with the ordered results; resolve forwards them
        me.getMultiple(resolve, ...keys);
    });
};
// as many key-value pairs ( in form of { key:foo, value:bar } ) can be given, separated by commas
// Promise-based wrapper around setMultiple(): resolves once all pairs are cached.
this.setMultipleAwaitable = function(... keyValuePairs){
    return new Promise((resolve) => {
        // setMultiple calls back with the ordered results; resolve forwards them
        me.setMultiple(resolve, ...keyValuePairs);
    });
};
// push all edited slots to backing-store and reset all slots lifetime to "out of date"
// Write-back barrier: waits until no cache-miss is in flight and no slot is
// locked, then pushes every edited slot toward the backing store and marks
// every slot out-of-date (bufTime=0) so the next access revalidates it.
this.flush = function(callback){
// Polls every 10ms until the cache is quiescent, then runs callbackW once.
function waitForReadWrite(callbackW){
// if there are in-flight cache-misses cache-write-misses or active slot locks, then wait
if(mappingInFlightMiss.size > 0 || bufLocked.reduce((e1,e2)=>{return e1+e2;}) > 0)
{
setTimeout(()=>{ waitForReadWrite(callbackW); },10);
}
else
callbackW();
}
waitForReadWrite(async function(){
for(let i=0;i<size;i++)
{
// force-expire the slot: next get()/set() on this key refreshes it
bufTime[i]=0;
if(bufEdited[i] == 1)
{
// less concurrency pressure, less failure
// NOTE(review): this writes through me.setAwaitable (the cache's own set
// path) rather than calling saveData directly, and bufEdited[i] is not
// cleared here — presumably the set() hit path performs the actual
// write-back / flag reset; TODO confirm against the full get/set code.
await me.setAwaitable(bufKey[i],bufData[i]);
}
}
callback(); // flush complete
});
};
};
// CommonJS export of the Lru cache constructor.
exports.Lru = Lru;
Example usage — caching Redis for get/set operations:
"use strict"
// backing-store
const Redis = require("ioredis");
const redis = new Redis();
// LRU cache
let Lru = require("./lrucache.js").Lru;
let num_cache_elements = 1500;
let element_life_time_miliseconds = 1000;
let cache = new Lru(num_cache_elements, async function(key,callback){
redis.get(key, function (err, result) {
callback(result);
});
}, element_life_time_miliseconds, async function(key,value,callback){
redis.set(key,value, function (err, result) {
callback();
});
});
const N_repeat = 20;       // benchmark rounds per mode
const N_bench = 20000;     // set+get round-trips per round
const N_concurrency = 100; // operations kept in flight per batch
const N_dataset = 1000;    // number of distinct keys
// Uniformly random integer key in [0, N_dataset).
function randomKey(){
    return Math.floor(Math.random() * N_dataset);
}
// without LRU caching
// Benchmarks N_bench set+get round-trips directly against Redis (no cache),
// keeping N_concurrency operations in flight per batch; calls back when done.
async function benchWithout(callback){
    for(let i = 0; i < N_bench; i += N_concurrency){
        // resolve only after every operation of this batch has completed
        await new Promise((success, fail) => {
            let completed = 0;
            for(let j = 0; j < N_concurrency; j++){
                redis.set(randomKey(), i, (err, result) => {
                    redis.get(randomKey(), (err2, result2) => {
                        completed++;
                        if(completed === N_concurrency){
                            success(1);
                        }
                    });
                });
            }
        });
    }
    callback();
}
// with LRU caching
// Benchmarks N_bench set+get round-trips through the LRU cache, keeping
// N_concurrency operations in flight per batch; calls back when done.
async function benchWith(callback){
    for(let i = 0; i < N_bench; i += N_concurrency){
        // resolve only after every operation of this batch has completed
        await new Promise((success, fail) => {
            let completed = 0;
            for(let j = 0; j < N_concurrency; j++){
                cache.set(randomKey(), i, (result) => {
                    cache.get(randomKey(), (result2) => {
                        completed++;
                        if(completed === N_concurrency){
                            success(1);
                        }
                    });
                });
            }
        });
    }
    callback();
}
let ctr = 0; // completed rounds of the currently running benchmark mode
// Runs the no-cache benchmark N_repeat times, logging each round's duration,
// then resets the counter and hands off to `callback`.
function restartWithoutLRU(callback){
    const t = Date.now();
    benchWithout(() => {
        console.log(`without LRU: ${Date.now() - t} milliseconds`);
        ctr++;
        if(ctr === N_repeat){
            ctr = 0;
            callback();
        }else{
            restartWithoutLRU(callback);
        }
    });
}
// Runs the cached benchmark N_repeat times, logging each round's duration.
function restartWithLRU(){
    const t = Date.now();
    benchWith(() => {
        console.log(`with LRU: ${Date.now() - t} milliseconds`);
        ctr++;
        if(ctr !== N_repeat){
            restartWithLRU();
        }
    });
}
// Kick off: benchmark without the cache first, then with it.
restartWithoutLRU(restartWithLRU);
On my low-end computer (2GHz Bulldozer CPU, single-channel DDR3-1600 RAM), API overhead caps performance at around 1500 cache (read/write) hits per millisecond and about 850 cache (read/write) misses per millisecond. I couldn't find a better way to preserve asynchrony while running faster. Would it be reasonable to shard the cache across multiple cores (a multi-core cache), or would a single-threaded server serving all cores be better?
Edit: using `new Map` instead of a plain object for the mapping doubled cache-miss performance and lowered latency for both reads and writes.