Skip to main content
Code Review

Return to Question

Notice removed Canonical answer required by huseyin tugrul buyukisik
Bounty Ended with Sᴀᴍ Onᴇᴌᴀ's answer chosen by huseyin tugrul buyukisik
edited tags
Link
Notice added Canonical answer required by huseyin tugrul buyukisik
Bounty Started worth 150 reputation by huseyin tugrul buyukisik
deleted 1012 characters in body
Source Link
'use strict';
/* 
* cacheSize: number of elements in cache, constant, must be greater than or equal to number of asynchronous accessors / cache misses
* callbackBackingStoreLoad: user-given cache(read)-miss function to load data from datastore
* takes 2 parameters: key, callback
* example:
* async function(key,callback){ 
* redis.get(key,function(data,err){ 
* callback(data); 
* }); 
* }
* callbackBackingStoreSave: user-given cache(write)-miss function to save data to datastore
* takes 3 parameters: key, value, callback
* example:
* async function(key,value,callback){ 
* redis.set(key,value,function(err){ 
* callback(); 
* }); 
* }
* elementLifeTimeMs: maximum miliseconds before an element is invalidated, only invalidated at next get() or set() call with its key
* flush(): all in-flight get/set accesses are awaited and all edited keys are written back to backing-store. flushes the cache.
* reload(): evicts all cache to reload new values from backing store
* reloadKey(): only evicts selected item (to reload its new value on next access)
*
*/
let Lru = function(cacheSize,callbackBackingStoreLoad,elementLifeTimeMs=1000,callbackBackingStoreSave){
 const me = this;
 const aTypeGet = 0;
 const aTypeSet = 1;
 const maxWait = elementLifeTimeMs;
 const size = parseInt(cacheSize,10);
 const mapping = {}new Map();
 const mappingInFlightMiss = {}new Map();
 const bufData = new Array(size);
 const bufVisited = new Uint8Array(size);

 // todo: add write-cache operations. set(key,value,callback) setMultiple(keys, values, callback) setAwaitable(key,value) setMultipleAwaitable(keys,values)
 const bufEdited = new Uint8Array(size);
 const bufKey = new Array(size);
 const bufTime = new Float64Array(size);
 const bufLocked = new Uint8Array(size);
 for(let i=0;i<size;i++)
 {
 let rnd = Math.random();
 mapping[rnd] = i;
 mapping.set(rnd,i); 
 bufData[i]="";
 bufVisited[i]=0;
 bufEdited[i]=0;
 bufKey[i]=rnd;
 bufTime[i]=0;
 bufLocked[i]=0;
 }
 let ctr = 0;
 let ctrEvict = parseInt(cacheSize/2,10);
 const loadData = callbackBackingStoreLoad;
 const saveData = callbackBackingStoreSave;
 let inFlightMissCtr = 0;
 // refresh all items time-span in cache
 this.reload=function(){
 for(let i=0;i<size;i++)
 {
 bufTime[i]=0;
 }
 };
 // refresh item time-span in cache by triggering eviction
 this.reloadKey=function(key){
 if(key in mapping.has(key))
 {
 bufTime[mapping[key]]=0;
 }
 };
 // get value by key
 this.get = function(keyPrm,callbackPrm){
 // aType=0: get
 access(keyPrm,callbackPrm,aTypeGet);
 };
 // set value by key (callback returns same value)
 this.set = function(keyPrm,valuePrm,callbackPrm){
 // aType=1: set
 access(keyPrm,callbackPrm,aTypeSet,valuePrm);
 };
 
 // aType=0: get
 // aType=1: set
 function access(keyPrm,callbackPrm,aType,valuePrm){
 
 const key = keyPrm;
 const callback = callbackPrm;
 const value = valuePrm;
 // stop dead-lock when many async get calls are made
 if(inFlightMissCtr>=size)
 {
 setTimeout(function(){
 // get/set
 access(key,function(newData){
 callback(newData);
 },aType,value);
 },0);
 return;
 }
 
 // if key is busy, then delay the request towards end of the cache-miss completion
 if(key in mappingInFlightMiss.has(key))
 {
 
 setTimeout(function(){
 // get/set
 access(key,function(newData){
 callback(newData);
 },aType,value);
 },0);
 return;
 }
 if(key in mapping.has(key))
 {
 // slot is an element in the circular buffer of CLOCK algorithm
 let slot = mapping[key];mapping.get(key);
 // RAM speed data
 if((Date.now() - bufTime[slot]) > maxWait)
 {
 
 // if slot is locked by another operation, postpone the current operation
 if(bufLocked[slot])
 { 
 setTimeout(function(){
 access(key,function(newData){
 callback(newData);
 },aType,value);
 },0);
 
 }
 else // slot is not locked and its lifespan has ended
 {
 // if it was edited, update the backing-store first
 if(bufEdited[slot] == 1)
 {
 bufLocked[slot] = 1;
 bufEdited[slot]=0;
 mappingInFlightMiss[key] = 1;mappingInFlightMiss.set(key,1); // lock key
 inFlightMissCtr++;
 // update backing-store, this is async
 saveData(bufKey[slot],bufData[slot],function(){ 
 mappingInFlightMiss.delete mappingInFlightMiss[key];(key); // unlock key
 bufLocked[slot] = 0;
 inFlightMissCtr--;
 mapping.delete mapping[key];(key); // disable mapping for current key
 
 // re-simulate the access, async
 access(key,function(newData){
 callback(newData);
 },aType,value);
 });
 }
 else
 {
 mapping.delete mapping[key];(key); // disable mapping for current key
 access(key,function(newData){
 
 callback(newData);
 },aType,value);
 }
 }
 
 }
 else // slot life span has not ended
 {
 bufVisited[slot]=1;
 bufTime[slot] = Date.now();
 // if it is a "set" operation
 if(aType == aTypeSet)
 { 
 bufEdited[slot] = 1; // later used when data needs to be written to data-store (write-cache feature)
 bufData[slot] = value;
 }
 callback(bufData[slot]);
 }
 }
 else
 {
 // datastore loading + cache eviction
 let ctrFound = -1;
 let oldVal = 0;
 let oldKey = 0;
 while(ctrFound===-1)
 {
 // give slot a second chance before eviction
 if(!bufLocked[ctr] && bufVisited[ctr])
 {
 bufVisited[ctr]=0;
 }
 ctr++;
 if(ctr >= size)
 {
 ctr=0;
 }
 // eviction conditions
 if(!bufLocked[ctrEvict] && !bufVisited[ctrEvict])
 {
 // eviction preparations, lock the slot
 bufLocked[ctrEvict] = 1;
 inFlightMissCtr++;
 ctrFound = ctrEvict;
 oldVal = bufData[ctrFound];
 oldKey = bufKey[ctrFound];
 }
 ctrEvict++;
 if(ctrEvict >= size)
 {
 ctrEvict=0;
 }
 }
 
 // user-requested key is now asynchronously in-flight & locked for other operations
 mappingInFlightMiss[key]=1;mappingInFlightMiss.set(key,1);
 
 // eviction function. least recently used data is gone, newest recently used data is assigned
 let evict = function(res){
 mapping.delete mapping[bufKey[ctrFound]];(bufKey[ctrFound]);
 bufData[ctrFound]=res;
 bufVisited[ctrFound]=0;
 bufKey[ctrFound]=key;
 bufTime[ctrFound]=Date.now();
 bufLocked[ctrFound]=0;
 mapping[key] = ctrFound;mapping.set(key,ctrFound);
 callback(res);
 inFlightMissCtr--;
 mappingInFlightMiss.delete mappingInFlightMiss[key];(key);
 
 };
 // if old data was edited, send it to data-store first, then fetch new data
 if(bufEdited[ctrFound] == 1)
 {
 if(aType == aTypeGet)
 bufEdited[ctrFound] = 0;
 // old edited data is sent back to data-store
 saveData(oldKey,oldVal,function(){ 
 if(aType == aTypeGet)
 loadData(key,evict);
 else if(aType == aTypeSet)
 evict(value);
 });
 }
 else
 {
 if(aType == aTypeSet)
 bufEdited[ctrFound] = 1;
 if(aType == aTypeGet)
 loadData(key,evict);
 else if(aType == aTypeSet)
 evict(value); 
 }
 }
 };
 this.getAwaitable = function(key){
 return new Promise(function(success,fail){ 
 me.get(key,function(data){
 success(data);
 });
 });
 }
 this.setAwaitable = function(key,value){
 return new Promise(function(success,fail){ 
 me.set(key,value,function(data){
 success(data);
 });
 });
 }
 // as many keys as required can be given, separated by commas
 this.getMultiple = function(callback, ... keys){
 let result = [];
 let ctr1 = keys.length;
 for(let i=0;i<ctr1;i++)
 result.push(0);
 let ctr2 = 0;
 keys.forEach(function(key){
 let ctr3 = ctr2++;
 me.get(key,function(data){
 result[ctr3] = data;
 ctr1--;
 if(ctr1==0)
 {
 callback(result);
 }
 });
 });
 };
 // as many key-value pairs ( in form of { key:foo, value:bar } ) can be given, separated by commas
 this.setMultiple = function(callback, ... keyValuePairs){
 let result = [];
 let ctr1 = keyskeyValuePairs.length;
 for(let i=0;i<ctr1;i++)
 result.push(0);
 let ctr2 = 0;
 keyValuePairs.forEach(function(pair){
 let ctr3 = ctr2++;
 me.set(pair.key,pair.value,function(data){
 result[ctr3] = data;
 ctr1--;
 if(ctr1==0)
 {
 callback(result);
 }
 });
 });
 };
 // as many keys as required can be given, separated by commas
 this.getMultipleAwaitable = function(... keys){
 return new Promise(function(success,fail){
 me.getMultiple(function(results){
 success(results);
 }, ... keys);
 });
 };
 // as many key-value pairs ( in form of { key:foo, value:bar } ) can be given, separated by commas
 this.setMultipleAwaitable = function(... keyValuePairs){
 return new Promise(function(success,fail){
 me.setMultiple(function(results){
 success(results);
 }, ... keyValuePairs);
 });
 };
 // push all edited slots to backing-store and reset all slots lifetime to "out of date"
 this.flush = function(callback){
 let ctr1 = 0;
 function waitForReadWrite(callbackW){
 // if there are in-flight cache-misses cache-write-misses or active slot locks, then wait
 if(Object.keys(mappingInFlightMiss).lengthsize > 0 || bufLocked.reduce((e1,e2)=>{return e1+e2;}) > 0)
 {
 setTimeout(()=>{ waitForReadWrite(callbackW); },010);
 }
 else
 callbackW();
 }
 waitForReadWrite(async function(){ // flush all slots
 for(let i=0;i<size;i++)
 {
 bufTime[i]=0;
 if(bufEdited[i] == 1) 
 ctr1++;
 }
 for(let i=0;i<size;i++)
 {
 if(bufEdited[i] == 1)
 { 
 // async
 less concurrency me.set(bufKey[i],bufData[i]pressure,function(val){
 ctr1--;
 if(ctr1 == 0)
  less {failure
 await callbackme.setAwaitable(); // flush complete
 }
 }bufKey[i],bufData[i]);
 }
 }
 callback(); // flush complete
 });
 };
};
exports.Lru = Lru;

Cache hit ratio test with a simulation of image smoothing using get and setCaching Redis For Get/Set Operations:

'use strict';
let read_and_write_on_same_cache = false;
let N = 10;
let backing_store = [];
let backing_store2 = [];
for(let y=0;y<N;y++)
{
 backing_store.push([]);
 backing_store2.push([]);
 for(let"use x=0;x<N;x++)strict"
 {
 let r = Math.sqrt( (x-N/2) * (x-N/2) + (y-N/2) * (ybacking-N/2) ); // pixel value depending on distance to center of circlestore
const Redis = backing_store[y].pushrequire(r"ioredis");
const redis = new backing_store2[y].pushRedis(0);
 }
}
let cache_hit_and_miss = 0;
let cache_misses = 0;
let cache_write_misses// =LRU 0;
cache
let Lru = require("./lrucache.js").Lru;
let num_cache_elements = 25;1500;
let element_life_time_miliseconds = 10000;1000;
let cache = new Lru(num_cache_elements, async function(key,callback){ 
 cache_misses++;

 let obj = JSONredis.parseget(key);
 let x = obj.x;
 let y = obj.y;
 // simulating slow access to a backing, storefunction (will be hidden by async accesserr, anywayresult) setTimeout(function(){
 // send the result to a RAM slot in the cache
 callback(backing_store[y][x]result);
 },10);
}, element_life_time_miliseconds, async function(key,value,callback){
 cache_write_misses++; 
redis.set(key,value, function (err, result) {
 let obj = JSON.parse callback(key);
 let});
});
const xN_repeat = obj.x;20;
 letconst yN_bench = obj.y;
20000;
 // simulatingconst slowN_concurrency access= to100;
const aN_dataset backing= store1000;
function randomKey(will be hidden by async access anyway){ return setTimeoutMath.floor(functionMath.random(){*N_dataset); }
// sendwithout theLRU resultcaching
async tofunction abenchWithout(callback){
 RAM slot in thefor(let cachei=0;i<N_bench;i+=N_concurrency){
 backing_store[y][x]=value;
 let ctr = 0;
 callback();
 },10);
});
let cache2w8 = new LruPromise(num_cache_elements, async function(keysuccess,callbackfail)=>{
 cache_misses++;
 let obj = JSON.parse(key);
  for(let x = obj.x;j=0;j<N_concurrency;j++)
 let y = obj.y;
 // simulating slow{
 access to a backing store (will be hidden by async access anyway)
 setTimeoutredis.set(functionrandomKey(){
,i, function (err, result) {
 // send the result to a RAM slot in the cache
 callbackredis.get(backing_store2[y][x]);
 },100randomKey();
}, element_life_time_miliseconds, async function(key,valueerr,callback result){
 cache_write_misses++;
 let obj = JSON.parse(key);
 let x = obj.x;
 ctr++;
 let y = obj.y;
 // simulating slow access to a backing store (will be hidden by async access anyway)
  if(ctr == setTimeout(function(N_concurrency){

 // send the result to a RAM slot in the cache
  backing_store2[y][x]=value;{
 callback();
 },100);
});
console.log("cache size = "+num_cache_elements+" pixels");
console.log("image size = "+(N*N)+" pixels");
// image softening algorithm
async function benchmark()
{
 let timing = Date.nowsuccess(1);
 for(let y=1;y<N-1;y++)
 {
 for(let x=1;x<N-1;x++)
 } {
 });
 cache_hit_and_miss += 6; // 5 pixels requested for read, 1 for write });
 }
 });
 let softened_pixelresult = (await cache.getMultipleAwaitable(w8;
 JSON.stringify({x:x,y:y}), // center pixel}
 callback();
}
// with LRU caching
async function benchWith(callback){
 for(let i=0;i<N_bench;i+=N_concurrency){
 let ctr = 0;
 let w8 = new JSON.stringifyPromise({x:x(success,y:y-1}fail), // neighbor up=>{
 for(let j=0;j<N_concurrency;j++)
 {
 JSON cache.stringifyset({x:x,y:y+1}randomKey(), //i, neighborfunction down(result) {
 cache.get(randomKey(), function (result) {
 JSON.stringify({x:x-1,y:y}), // neighbor left
 ctr++;
 if(ctr == N_concurrency)
 JSON.stringify({x:x+1,y:y}) // neighbor right
 )).reduce((e1,e2)=>{return e1+e2;})/5.0;

 let target = 0;
 if success(read_and_write_on_same_cache1);
 target = cache;
  } else
 target = cache2;
  target.set(JSON.stringify({x:x,y:y}),softened_pixel,async function(val){;
 });
 }
 if(y==N-2 && x == N-2});
 let result = await w8;
 {}
 callback();
}
let ctr = 0;
function restartWithoutLRU(callback){
 let t = Date.now();
 benchWithout(function(){
 // write all edited data to backing-store andconsole.log("without readLRU: all"+(Date.now() non-read data fromt)+" backing-storemilliseconds");
 ctr++;
 if(ctr != target.flush(function(N_repeat){
 {
 console.logrestartWithoutLRU("softened image pixels: "callback);
 }
 else
 console.log(read_and_write_on_same_cache?backing_store:backing_store2);{
 ctr=0;
 console.log("run-time: "+(Date.nowcallback() -;
 timing)+" ms }
 "+((N-2)*(N-2)*5})+" reads;
}
function "+((N-2)*restartWithLRU(N-2))+" writes");{
 let t = Date.now();
 benchWith(function(){
 console.log("cache read"with hitLRU: ratio="+"+(100*Date.now(cache_hit_and_miss) - cache_misses)/cache_hit_and_misst)+" milliseconds");
 ctr++;
 console.log("cache write hit ratio="+(100*if(cache_hit_and_missctr -!= cache_write_misses)/cache_hit_and_miss)N_repeat);
 {
 }restartWithLRU();
 }
 });
}
 }
}

benchmarkrestartWithoutLRU(restartWithLRU);

On my low end computer (2GHz bulldozer cpu, 1 channel ddr3 1600MHz), API overhead causes a performance limitation that is around 1500 cache (read/write) hits per millisecond and about 300850 cache (read/write) misses per millisecond. I couldn't find any better way to keep asynchronity and run faster. Would it be logical to shard the cache on multiple cores and have a multi-core cache or would it be better with single thread server serving to all cores?

'use strict';
/* 
cacheSize: number of elements in cache, constant, must be greater than or equal to number of asynchronous accessors / cache misses
callbackBackingStoreLoad: user-given cache-miss function to load data from datastore
elementLifeTimeMs: maximum miliseconds before an element is invalidated, only invalidated at next get() call with its key
reload: evicts all cache to reload new values from backing store
reloadKey: only evicts selected item (to reload its new value on next access)
*/
let Lru = function(cacheSize,callbackBackingStoreLoad,elementLifeTimeMs=1000,callbackBackingStoreSave){
 const me = this;
 const aTypeGet = 0;
 const aTypeSet = 1;
 const maxWait = elementLifeTimeMs;
 const size = parseInt(cacheSize,10);
 const mapping = {};
 const mappingInFlightMiss = {};
 const bufData = new Array(size);
 const bufVisited = new Uint8Array(size);

 // todo: add write-cache operations. set(key,value,callback) setMultiple(keys, values, callback) setAwaitable(key,value) setMultipleAwaitable(keys,values)
 const bufEdited = new Uint8Array(size);
 const bufKey = new Array(size);
 const bufTime = new Float64Array(size);
 const bufLocked = new Uint8Array(size);
 for(let i=0;i<size;i++)
 {
 let rnd = Math.random();
 mapping[rnd] = i;
  
 bufData[i]="";
 bufVisited[i]=0;
 bufEdited[i]=0;
 bufKey[i]=rnd;
 bufTime[i]=0;
 bufLocked[i]=0;
 }
 let ctr = 0;
 let ctrEvict = parseInt(cacheSize/2,10);
 const loadData = callbackBackingStoreLoad;
 const saveData = callbackBackingStoreSave;
 let inFlightMissCtr = 0;
 // refresh all items time-span in cache
 this.reload=function(){
 for(let i=0;i<size;i++)
 {
 bufTime[i]=0;
 }
 };
 // refresh item time-span in cache by triggering eviction
 this.reloadKey=function(key){
 if(key in mapping)
 {
 bufTime[mapping[key]]=0;
 }
 };
 // get value by key
 this.get = function(keyPrm,callbackPrm){
 // aType=0: get
 access(keyPrm,callbackPrm,aTypeGet);
 };
 // set value by key (callback returns same value)
 this.set = function(keyPrm,valuePrm,callbackPrm){
 // aType=1: set
 access(keyPrm,callbackPrm,aTypeSet,valuePrm);
 };
 
 // aType=0: get
 // aType=1: set
 function access(keyPrm,callbackPrm,aType,valuePrm){
 
 const key = keyPrm;
 const callback = callbackPrm;
 const value = valuePrm;
 // stop dead-lock when many async get calls are made
 if(inFlightMissCtr>=size)
 {
 setTimeout(function(){
 // get/set
 access(key,function(newData){
 callback(newData);
 },aType,value);
 },0);
 return;
 }
 
 // if key is busy, then delay the request towards end of the cache-miss completion
 if(key in mappingInFlightMiss)
 {
 
 setTimeout(function(){
 // get/set
 access(key,function(newData){
 callback(newData);
 },aType,value);
 },0);
 return;
 }
 if(key in mapping)
 {
 // slot is an element in the circular buffer of CLOCK algorithm
 let slot = mapping[key];
 // RAM speed data
 if((Date.now() - bufTime[slot]) > maxWait)
 {
 
 // if slot is locked by another operation, postpone the current operation
 if(bufLocked[slot])
 { 
 setTimeout(function(){
 access(key,function(newData){
 callback(newData);
 },aType,value);
 },0);
 
 }
 else // slot is not locked and its lifespan has ended
 {
 // if it was edited, update the backing-store first
 if(bufEdited[slot] == 1)
 {
 bufLocked[slot] = 1;
 bufEdited[slot]=0;
 mappingInFlightMiss[key] = 1; // lock key
 inFlightMissCtr++;
 // update backing-store, this is async
 saveData(bufKey[slot],bufData[slot],function(){ 
 delete mappingInFlightMiss[key]; // unlock key
 bufLocked[slot] = 0;
 inFlightMissCtr--;
 delete mapping[key]; // disable mapping for current key
 
 // re-simulate the access, async
 access(key,function(newData){
 callback(newData);
 },aType,value);
 });
 }
 else
 {
 delete mapping[key]; // disable mapping for current key
 access(key,function(newData){
 
 callback(newData);
 },aType,value);
 }
 }
 
 }
 else // slot life span has not ended
 {
 bufVisited[slot]=1;
 bufTime[slot] = Date.now();
 // if it is a "set" operation
 if(aType == aTypeSet)
 { 
 bufEdited[slot] = 1; // later used when data needs to be written to data-store (write-cache feature)
 bufData[slot] = value;
 }
 callback(bufData[slot]);
 }
 }
 else
 {
 // datastore loading + cache eviction
 let ctrFound = -1;
 let oldVal = 0;
 let oldKey = 0;
 while(ctrFound===-1)
 {
 // give slot a second chance before eviction
 if(!bufLocked[ctr] && bufVisited[ctr])
 {
 bufVisited[ctr]=0;
 }
 ctr++;
 if(ctr >= size)
 {
 ctr=0;
 }
 // eviction conditions
 if(!bufLocked[ctrEvict] && !bufVisited[ctrEvict])
 {
 // eviction preparations, lock the slot
 bufLocked[ctrEvict] = 1;
 inFlightMissCtr++;
 ctrFound = ctrEvict;
 oldVal = bufData[ctrFound];
 oldKey = bufKey[ctrFound];
 }
 ctrEvict++;
 if(ctrEvict >= size)
 {
 ctrEvict=0;
 }
 }
 
 // user-requested key is now asynchronously in-flight & locked for other operations
 mappingInFlightMiss[key]=1;
 
 // eviction function. least recently used data is gone, newest recently used data is assigned
 let evict = function(res){
 delete mapping[bufKey[ctrFound]];
 bufData[ctrFound]=res;
 bufVisited[ctrFound]=0;
 bufKey[ctrFound]=key;
 bufTime[ctrFound]=Date.now();
 bufLocked[ctrFound]=0;
 mapping[key] = ctrFound;
 callback(res);
 inFlightMissCtr--;
 delete mappingInFlightMiss[key];
 
 };
 // if old data was edited, send it to data-store first, then fetch new data
 if(bufEdited[ctrFound] == 1)
 {
 if(aType == aTypeGet)
 bufEdited[ctrFound] = 0;
 // old edited data is sent back to data-store
 saveData(oldKey,oldVal,function(){ 
 if(aType == aTypeGet)
 loadData(key,evict);
 else if(aType == aTypeSet)
 evict(value);
 });
 }
 else
 {
 if(aType == aTypeSet)
 bufEdited[ctrFound] = 1;
 if(aType == aTypeGet)
 loadData(key,evict);
 else if(aType == aTypeSet)
 evict(value); 
 }
 }
 };
 this.getAwaitable = function(key){
 return new Promise(function(success,fail){ 
 me.get(key,function(data){
 success(data);
 });
 });
 }
 this.setAwaitable = function(key,value){
 return new Promise(function(success,fail){ 
 me.set(key,value,function(data){
 success(data);
 });
 });
 }
 this.getMultiple = function(callback, ... keys){
 let result = [];
 let ctr1 = keys.length;
 for(let i=0;i<ctr1;i++)
 result.push(0);
 let ctr2 = 0;
 keys.forEach(function(key){
 let ctr3 = ctr2++;
 me.get(key,function(data){
 result[ctr3] = data;
 ctr1--;
 if(ctr1==0)
 {
 callback(result);
 }
 });
 });
 };
 this.setMultiple = function(callback, ... keyValuePairs){
 let result = [];
 let ctr1 = keys.length;
 for(let i=0;i<ctr1;i++)
 result.push(0);
 let ctr2 = 0;
 keyValuePairs.forEach(function(pair){
 let ctr3 = ctr2++;
 me.set(pair.key,pair.value,function(data){
 result[ctr3] = data;
 ctr1--;
 if(ctr1==0)
 {
 callback(result);
 }
 });
 });
 };
 this.getMultipleAwaitable = function(... keys){
 return new Promise(function(success,fail){
 me.getMultiple(function(results){
 success(results);
 }, ... keys);
 });
 };
 this.setMultipleAwaitable = function(... keyValuePairs){
 return new Promise(function(success,fail){
 me.setMultiple(function(results){
 success(results);
 }, ... keyValuePairs);
 });
 };
 // push all edited slots to backing-store and reset all slots lifetime to "out of date"
 this.flush = function(callback){
 let ctr1 = 0;
 function waitForReadWrite(callbackW){
 // if there are in-flight cache-misses cache-write-misses or active slot locks, then wait
 if(Object.keys(mappingInFlightMiss).length > 0 || bufLocked.reduce((e1,e2)=>{return e1+e2;}) > 0)
 {
 setTimeout(()=>{ waitForReadWrite(callbackW); },0);
 }
 else
 callbackW();
 }
 waitForReadWrite(function(){ // flush all slots
 for(let i=0;i<size;i++)
 {
 bufTime[i]=0;
 if(bufEdited[i] == 1) 
 ctr1++;
 }
 for(let i=0;i<size;i++)
 {
 if(bufEdited[i] == 1)
 { 
 // async
  me.set(bufKey[i],bufData[i],function(val){
 ctr1--;
 if(ctr1 == 0)
  {
 callback(); // flush complete
 }
 });
 }
 }
 });
 };
};
exports.Lru = Lru;

Cache hit ratio test with a simulation of image smoothing using get and set:

'use strict';
let read_and_write_on_same_cache = false;
let N = 10;
let backing_store = [];
let backing_store2 = [];
for(let y=0;y<N;y++)
{
 backing_store.push([]);
 backing_store2.push([]);
 for(let x=0;x<N;x++)
 {
 let r = Math.sqrt( (x-N/2) * (x-N/2) + (y-N/2) * (y-N/2) ); // pixel value depending on distance to center of circle
 backing_store[y].push(r);
 backing_store2[y].push(0);
 }
}
let cache_hit_and_miss = 0;
let cache_misses = 0;
let cache_write_misses = 0;

let Lru = require("./lrucache.js").Lru;
let num_cache_elements = 25;
let element_life_time_miliseconds = 10000;
let cache = new Lru(num_cache_elements, async function(key,callback){ 
 cache_misses++;

 let obj = JSON.parse(key);
 let x = obj.x;
 let y = obj.y;
 // simulating slow access to a backing store (will be hidden by async access anyway) setTimeout(function(){
 // send the result to a RAM slot in the cache
 callback(backing_store[y][x]);
 },10);
}, element_life_time_miliseconds, async function(key,value,callback){
 cache_write_misses++; 
 
 let obj = JSON.parse(key);
 let x = obj.x;
 let y = obj.y;

 // simulating slow access to a backing store (will be hidden by async access anyway) setTimeout(function(){
// send the result to a RAM slot in the cache
 backing_store[y][x]=value;
  callback();
 },10);
});
let cache2 = new Lru(num_cache_elements, async function(key,callback){
 cache_misses++;
 let obj = JSON.parse(key);
  let x = obj.x;
 let y = obj.y;
 // simulating slow access to a backing store (will be hidden by async access anyway)
 setTimeout(function(){
 // send the result to a RAM slot in the cache
 callback(backing_store2[y][x]);
 },100);
}, element_life_time_miliseconds, async function(key,value,callback){
 cache_write_misses++;
 let obj = JSON.parse(key);
 let x = obj.x;
 let y = obj.y;
 // simulating slow access to a backing store (will be hidden by async access anyway)
  setTimeout(function(){

 // send the result to a RAM slot in the cache
  backing_store2[y][x]=value;
 callback();
 },100);
});
console.log("cache size = "+num_cache_elements+" pixels");
console.log("image size = "+(N*N)+" pixels");
// image softening algorithm
async function benchmark()
{
 let timing = Date.now();
 for(let y=1;y<N-1;y++)
 {
 for(let x=1;x<N-1;x++)
 {
 cache_hit_and_miss += 6; // 5 pixels requested for read, 1 for write
 
 
 let softened_pixel = (await cache.getMultipleAwaitable( JSON.stringify({x:x,y:y}), // center pixel
 JSON.stringify({x:x,y:y-1}), // neighbor up
 JSON.stringify({x:x,y:y+1}), // neighbor down 
 JSON.stringify({x:x-1,y:y}), // neighbor left
 JSON.stringify({x:x+1,y:y}) // neighbor right
 )).reduce((e1,e2)=>{return e1+e2;})/5.0;

 let target = 0;
 if(read_and_write_on_same_cache)
 target = cache;
  else
 target = cache2;
  target.set(JSON.stringify({x:x,y:y}),softened_pixel,async function(val){
 
 if(y==N-2 && x == N-2)
 {
 // write all edited data to backing-store and read all non-read data from backing-store
 target.flush(function(){
 console.log("softened image pixels: ");
 console.log(read_and_write_on_same_cache?backing_store:backing_store2);
 console.log("run-time: "+(Date.now() - timing)+" ms "+((N-2)*(N-2)*5)+" reads "+((N-2)*(N-2))+" writes");
 console.log("cache read hit ratio="+(100*(cache_hit_and_miss - cache_misses)/cache_hit_and_miss));
 console.log("cache write hit ratio="+(100*(cache_hit_and_miss - cache_write_misses)/cache_hit_and_miss));
 });
 }
 });
}
 }
}

benchmark();

On my low end computer, API overhead is around 1500 cache hits per millisecond and about 300 cache misses per millisecond. I couldn't find any better way to keep asynchronity and run faster. Would it be logical to shard the cache on multiple cores and have a multi-core cache or would it be better with single thread server serving to all cores?

'use strict';
/* 
* cacheSize: number of elements in cache, constant, must be greater than or equal to number of asynchronous accessors / cache misses
* callbackBackingStoreLoad: user-given cache(read)-miss function to load data from datastore
* takes 2 parameters: key, callback
* example:
* async function(key,callback){ 
* redis.get(key,function(data,err){ 
* callback(data); 
* }); 
* }
* callbackBackingStoreSave: user-given cache(write)-miss function to save data to datastore
* takes 3 parameters: key, value, callback
* example:
* async function(key,value,callback){ 
* redis.set(key,value,function(err){ 
* callback(); 
* }); 
* }
* elementLifeTimeMs: maximum miliseconds before an element is invalidated, only invalidated at next get() or set() call with its key
* flush(): all in-flight get/set accesses are awaited and all edited keys are written back to backing-store. flushes the cache.
* reload(): evicts all cache to reload new values from backing store
* reloadKey(): only evicts selected item (to reload its new value on next access)
*
*/
let Lru = function(cacheSize,callbackBackingStoreLoad,elementLifeTimeMs=1000,callbackBackingStoreSave){
 const me = this;
 const aTypeGet = 0;
 const aTypeSet = 1;
 const maxWait = elementLifeTimeMs;
 const size = parseInt(cacheSize,10);
 const mapping = new Map();
 const mappingInFlightMiss = new Map();
 const bufData = new Array(size);
 const bufVisited = new Uint8Array(size);
 const bufEdited = new Uint8Array(size);
 const bufKey = new Array(size);
 const bufTime = new Float64Array(size);
 const bufLocked = new Uint8Array(size);
 for(let i=0;i<size;i++)
 {
 let rnd = Math.random();
 mapping.set(rnd,i); 
 bufData[i]="";
 bufVisited[i]=0;
 bufEdited[i]=0;
 bufKey[i]=rnd;
 bufTime[i]=0;
 bufLocked[i]=0;
 }
 let ctr = 0;
 let ctrEvict = parseInt(cacheSize/2,10);
 const loadData = callbackBackingStoreLoad;
 const saveData = callbackBackingStoreSave;
 let inFlightMissCtr = 0;
 // refresh all items time-span in cache
 this.reload=function(){
 for(let i=0;i<size;i++)
 {
 bufTime[i]=0;
 }
 };
 // refresh item time-span in cache by triggering eviction
 this.reloadKey=function(key){
 if(mapping.has(key))
 {
 bufTime[mapping[key]]=0;
 }
 };
 // get value by key
 this.get = function(keyPrm,callbackPrm){
 // aType=0: get
 access(keyPrm,callbackPrm,aTypeGet);
 };
 // set value by key (callback returns same value)
 this.set = function(keyPrm,valuePrm,callbackPrm){
 // aType=1: set
 access(keyPrm,callbackPrm,aTypeSet,valuePrm);
 };
 
 // aType=0: get
 // aType=1: set
 function access(keyPrm,callbackPrm,aType,valuePrm){
 
 const key = keyPrm;
 const callback = callbackPrm;
 const value = valuePrm;
 // stop dead-lock when many async get calls are made
 if(inFlightMissCtr>=size)
 {
 setTimeout(function(){
 // get/set
 access(key,function(newData){
 callback(newData);
 },aType,value);
 },0);
 return;
 }
 
 // if key is busy, then delay the request towards end of the cache-miss completion
 if(mappingInFlightMiss.has(key))
 {
 
 setTimeout(function(){
 // get/set
 access(key,function(newData){
 callback(newData);
 },aType,value);
 },0);
 return;
 }
 if(mapping.has(key))
 {
 // slot is an element in the circular buffer of CLOCK algorithm
 let slot = mapping.get(key);
 // RAM speed data
 if((Date.now() - bufTime[slot]) > maxWait)
 {
 
 // if slot is locked by another operation, postpone the current operation
 if(bufLocked[slot])
 { 
 setTimeout(function(){
 access(key,function(newData){
 callback(newData);
 },aType,value);
 },0);
 
 }
 else // slot is not locked and its lifespan has ended
 {
 // if it was edited, update the backing-store first
 if(bufEdited[slot] == 1)
 {
 bufLocked[slot] = 1;
 bufEdited[slot]=0;
 mappingInFlightMiss.set(key,1); // lock key
 inFlightMissCtr++;
 // update backing-store, this is async
 saveData(bufKey[slot],bufData[slot],function(){ 
 mappingInFlightMiss.delete(key); // unlock key
 bufLocked[slot] = 0;
 inFlightMissCtr--;
 mapping.delete(key); // disable mapping for current key
 
 // re-simulate the access, async
 access(key,function(newData){
 callback(newData);
 },aType,value);
 });
 }
 else
 {
 mapping.delete(key); // disable mapping for current key
 access(key,function(newData){
 
 callback(newData);
 },aType,value);
 }
 }
 
 }
 else // slot life span has not ended
 {
 bufVisited[slot]=1;
 bufTime[slot] = Date.now();
 // if it is a "set" operation
 if(aType == aTypeSet)
 { 
 bufEdited[slot] = 1; // later used when data needs to be written to data-store (write-cache feature)
 bufData[slot] = value;
 }
 callback(bufData[slot]);
 }
 }
 else
 {
 // datastore loading + cache eviction
 let ctrFound = -1;
 let oldVal = 0;
 let oldKey = 0;
 while(ctrFound===-1)
 {
 // give slot a second chance before eviction
 if(!bufLocked[ctr] && bufVisited[ctr])
 {
 bufVisited[ctr]=0;
 }
 ctr++;
 if(ctr >= size)
 {
 ctr=0;
 }
 // eviction conditions
 if(!bufLocked[ctrEvict] && !bufVisited[ctrEvict])
 {
 // eviction preparations, lock the slot
 bufLocked[ctrEvict] = 1;
 inFlightMissCtr++;
 ctrFound = ctrEvict;
 oldVal = bufData[ctrFound];
 oldKey = bufKey[ctrFound];
 }
 ctrEvict++;
 if(ctrEvict >= size)
 {
 ctrEvict=0;
 }
 }
 
 // user-requested key is now asynchronously in-flight & locked for other operations
 mappingInFlightMiss.set(key,1);
 
 // eviction function. least recently used data is gone, newest recently used data is assigned
 let evict = function(res){
 mapping.delete(bufKey[ctrFound]);
 bufData[ctrFound]=res;
 bufVisited[ctrFound]=0;
 bufKey[ctrFound]=key;
 bufTime[ctrFound]=Date.now();
 bufLocked[ctrFound]=0;
 mapping.set(key,ctrFound);
 callback(res);
 inFlightMissCtr--;
 mappingInFlightMiss.delete(key);
 
 };
 // if old data was edited, send it to data-store first, then fetch new data
 if(bufEdited[ctrFound] == 1)
 {
 if(aType == aTypeGet)
 bufEdited[ctrFound] = 0;
 // old edited data is sent back to data-store
 saveData(oldKey,oldVal,function(){ 
 if(aType == aTypeGet)
 loadData(key,evict);
 else if(aType == aTypeSet)
 evict(value);
 });
 }
 else
 {
 if(aType == aTypeSet)
 bufEdited[ctrFound] = 1;
 if(aType == aTypeGet)
 loadData(key,evict);
 else if(aType == aTypeSet)
 evict(value); 
 }
 }
 };
 this.getAwaitable = function(key){
 return new Promise(function(success,fail){ 
 me.get(key,function(data){
 success(data);
 });
 });
 }
 this.setAwaitable = function(key,value){
 return new Promise(function(success,fail){ 
 me.set(key,value,function(data){
 success(data);
 });
 });
 }
 // as many keys as required can be given, separated by commas
 this.getMultiple = function(callback, ... keys){
 let result = [];
 let ctr1 = keys.length;
 for(let i=0;i<ctr1;i++)
 result.push(0);
 let ctr2 = 0;
 keys.forEach(function(key){
 let ctr3 = ctr2++;
 me.get(key,function(data){
 result[ctr3] = data;
 ctr1--;
 if(ctr1==0)
 {
 callback(result);
 }
 });
 });
 };
 // as many key-value pairs ( in form of { key:foo, value:bar } ) can be given, separated by commas
 this.setMultiple = function(callback, ... keyValuePairs){
 let result = [];
 let ctr1 = keyValuePairs.length;
 for(let i=0;i<ctr1;i++)
 result.push(0);
 let ctr2 = 0;
 keyValuePairs.forEach(function(pair){
 let ctr3 = ctr2++;
 me.set(pair.key,pair.value,function(data){
 result[ctr3] = data;
 ctr1--;
 if(ctr1==0)
 {
 callback(result);
 }
 });
 });
 };
 // as many keys as required can be given, separated by commas
 this.getMultipleAwaitable = function(... keys){
 return new Promise(function(success,fail){
 me.getMultiple(function(results){
 success(results);
 }, ... keys);
 });
 };
 // as many key-value pairs ( in form of { key:foo, value:bar } ) can be given, separated by commas
 this.setMultipleAwaitable = function(... keyValuePairs){
 return new Promise(function(success,fail){
 me.setMultiple(function(results){
 success(results);
 }, ... keyValuePairs);
 });
 };
 // push all edited slots to backing-store and reset all slots lifetime to "out of date"
 this.flush = function(callback){
 function waitForReadWrite(callbackW){
 // if there are in-flight cache-misses cache-write-misses or active slot locks, then wait
 if(mappingInFlightMiss.size > 0 || bufLocked.reduce((e1,e2)=>{return e1+e2;}) > 0)
 {
 setTimeout(()=>{ waitForReadWrite(callbackW); },10);
 }
 else
 callbackW();
 }
 waitForReadWrite(async function(){ 
 for(let i=0;i<size;i++)
 {
 bufTime[i]=0;
 if(bufEdited[i] == 1)
 { 
 // less concurrency pressure, less failure
 await me.setAwaitable(bufKey[i],bufData[i]);
 }
 }
 callback(); // flush complete
 });
 };
};
exports.Lru = Lru;

Caching Redis For Get/Set Operations:

"use strict"
// backing-store
const Redis = require("ioredis");
const redis = new Redis(); 
// LRU cache
let Lru = require("./lrucache.js").Lru;
let num_cache_elements = 1500;
let element_life_time_miliseconds = 1000;
let cache = new Lru(num_cache_elements, async function(key,callback){
 redis.get(key, function (err, result) {
 callback(result);
 });
}, element_life_time_miliseconds, async function(key,value,callback){
 redis.set(key,value, function (err, result) {
  callback();
 });
});
const N_repeat = 20;
const N_bench = 20000;
const N_concurrency = 100;
const N_dataset = 1000;
function randomKey(){ return Math.floor(Math.random()*N_dataset); }
// without LRU caching
async function benchWithout(callback){
 for(let i=0;i<N_bench;i+=N_concurrency){
 let ctr = 0;
 let w8 = new Promise((success,fail)=>{
 for(let j=0;j<N_concurrency;j++)
 {
 redis.set(randomKey(),i, function (err, result) {
 redis.get(randomKey(), function(err, result){
 ctr++;
 if(ctr == N_concurrency)
 {
 success(1);
 } 
 });
  });
 }
 });
 let result = await w8;
 }
 callback();
}
// with LRU caching
async function benchWith(callback){
 for(let i=0;i<N_bench;i+=N_concurrency){
 let ctr = 0;
 let w8 = new Promise((success,fail)=>{
 for(let j=0;j<N_concurrency;j++)
 {
  cache.set(randomKey(),i, function (result) {
 cache.get(randomKey(), function (result) {
 ctr++;
 if(ctr == N_concurrency)
 {
  success(1);
 } 
 });
 });
 }
 });
 let result = await w8;
 }
 callback();
}
let ctr = 0;
function restartWithoutLRU(callback){
 let t = Date.now();
 benchWithout(function(){
 console.log("without LRU: "+(Date.now() - t)+" milliseconds");
 ctr++;
 if(ctr != N_repeat)
 {
 restartWithoutLRU(callback);
 }
 else
 {
 ctr=0;
 callback();
 }
 });
}
function restartWithLRU(){
 let t = Date.now();
 benchWith(function(){
 console.log("with LRU: "+(Date.now() - t)+" milliseconds");
 ctr++;
 if(ctr != N_repeat)
 {
 restartWithLRU();
 }
 });
}
restartWithoutLRU(restartWithLRU);

On my low end computer (2GHz bulldozer cpu, 1 channel ddr3 1600MHz), API overhead causes a performance limitation that is around 1500 cache (read/write) hits per millisecond and about 850 cache (read/write) misses per millisecond. I couldn't find any better way to keep asynchronity and run faster. Would it be logical to shard the cache on multiple cores and have a multi-core cache or would it be better with single thread server serving to all cores?

added 147 characters in body
Source Link

Edit: using new Map instead of plain object for the mapping doubled the cache miss performance, lowered its latency for both reads and writes.

Edit: using new Map instead of plain object for the mapping doubled the cache miss performance, lowered its latency for both reads and writes.

Added extra info at end.
Source Link
Loading
added 43 characters in body
Source Link
Loading
Source Link
Loading
lang-js

AltStyle によって変換されたページ (->オリジナル) /