I have about a million records in Redis that I want to dump into Elasticsearch periodically. I just want to make sure that my script is reasonably fast and doesn't leak memory.
'use strict';

const redis = require('redis');
const bluebird = require('bluebird');
const request = require('request');
const elasticsearch = require('elasticsearch');
const fs = require('fs');
const _ = require('lodash');
const async = require('async');
const sh = require('shorthash');
const sleep = require('sleep');
const config = require('../config');

bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);

let client = redis.createClient({
  host: config.redis.url,
  port: config.redis.port
});

let ES = elasticsearch.Client({
  host: config.elasticsearch.url,
  requestTimeout: 30000000
});

var keys = fs.readFileSync('no-keys').toString().split('\n');
keys = keys.filter((e) => e);

let chunkedKeys = _.chunk(keys, 1000);
console.log('We have ' + chunkedKeys.length + ' keys');

_.each(chunkedKeys, (chunkedKey) => {
  client.mget(chunkedKey, (mgetError, replies) => {
    if (mgetError) {
      console.error(mgetError);
    }
    console.log('MGET complete from Redis');
    console.log('We have ' + replies.length + ' documents');
    async.mapLimit(replies, 5, (reply, callback) => {
      try {
        let content = JSON.parse(reply);
        let k = sh.unique(content.url);
        let body = [{index: {_index: config.elasticsearch.index, _type: 'article', _id: k, _timestamp: (new Date()).toISOString() }}];
        body.push(content);
        callback(null, body);
      } catch (e) {
        console.error(e);
        callback(e, []);
      }
    }, (err, results) => {
      if (err) {
        console.log(err);
      }
      let mergedResult = _.flatten(results.filter((e) => e));
      console.log('Export complete with ' + mergedResult.length);
      ES.bulk({body: mergedResult}, () => {
        console.log('Import complete');
      });
    });
  });
});
asked Apr 17, 2016 at 17:30
1 Answer
I can see two problems with your script:
- You are doing sync stuff in async. It is not recommended: async is meant for asynchronous operations, and you are using it to do synchronous work in the async.mapLimit block. Neither JSON.parse nor sh.unique is asynchronous. It is critical that you understand the difference; see the sketch after this list. Please read the async docs on synchronous operations.
- You are buffering all your keys into memory. I guess that is a smaller problem, but it is less efficient than using a readStream.
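A minimal sketch of the first point (reusing the question's replies, config and sh; this is essentially what the migrateKeys function further down does): nothing in the mapLimit body waits on anything, so a plain Array#forEach builds the bulk body just as well.
// Hedged sketch: same replies/config/sh as in the question.
// Nothing here is asynchronous, so no async.mapLimit is needed.
const bulkBody = [];
replies.forEach((reply) => {
  try {
    const content = JSON.parse(reply); // synchronous
    bulkBody.push(
      {index: {_index: config.elasticsearch.index, _type: 'article', _id: sh.unique(content.url)}}, // sh.unique is synchronous too
      content
    );
  } catch (e) {
    console.error(e); // just skip documents that do not parse
  }
});
ES.bulk({body: bulkBody}, () => console.log('Import complete'));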
Please take a look at my implementation and feel free to use any parts of it.
'use strict';

const redis = require('redis');
const bluebird = require('bluebird');
const request = require('request');
const elasticsearch = require('elasticsearch');
const fs = require('fs');
const _ = require('lodash');
const async = require('async');
const sh = require('shorthash');
const sleep = require('sleep');
const config = require('../config');
const readline = require('readline'); // handy wrapper around a readStream

bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);

let client = redis.createClient({
  host: config.redis.url,
  port: config.redis.port
});

let ES = elasticsearch.Client({
  host: config.elasticsearch.url,
  requestTimeout: 30000000
});

const readLineStream = readline.createInterface({ // this stream serves keys one per line
  input: fs.createReadStream('no-keys')
});

let keysBuffer = [];

readLineStream.on('line', (line) => { // you were splitting on '\n', so I assume your input is one key per line
  if (line) { // skip empty lines
    keysBuffer.push(line);
    if (keysBuffer.length === 1000) {
      migrateKeys(keysBuffer);
      keysBuffer = [];
    }
  }
});

readLineStream.on('close', () => { // readline emits 'close' (not 'end') when the input is exhausted
  if (keysBuffer.length > 0) {
    migrateKeys(keysBuffer); // remember to flush your local buffer
  }
});

function migrateKeys(chunkOfKeys) {
  client.mget(chunkOfKeys, (mgetError, replies) => {
    if (mgetError) {
      console.error(mgetError); // Consider returning early
    }
    console.log('MGET complete from Redis');
    console.log('We have ' + replies.length + ' documents');
    let parsedReplies = [];
    replies.forEach((reply) => { // plain forEach: using async for synchronous code is not recommended
      try {
        let content = JSON.parse(reply);
        parsedReplies.push({ // push the action line and the document as two flat entries, as the bulk API expects
          index: {
            _index: config.elasticsearch.index,
            _type: 'article',
            _id: sh.unique(content.url),
            _timestamp: (new Date()).toISOString()
          }
        }, content); // no need to filter out replies with parse errors
      } catch (e) {
        console.error(e);
      }
    });
    console.log('Export complete with ' + parsedReplies.length / 2 + ' documents');
    ES.bulk({body: parsedReplies}, () => {
      console.log('Import complete');
    });
  });
}
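A further refinement, not part of the answer above but in line with the "Consider returning early" comment: since the Redis client was already promisified with bluebird (so mgetAsync exists) and elasticsearch-js returns a promise from bulk when no callback is given, migrateKeys could surface errors from both calls instead of only logging and carrying on. A hedged sketch, assuming the same client, ES, config and sh as above:
function migrateKeysChecked(chunkOfKeys) {
  // Hedged sketch: assumes the same `client`, `ES`, `config` and `sh` as above.
  // mgetAsync exists because bluebird.promisifyAll was applied to RedisClient.prototype.
  return client.mgetAsync(chunkOfKeys)
    .then((replies) => {
      const body = [];
      replies.forEach((reply) => {
        try {
          const content = JSON.parse(reply);
          body.push({
            index: {
              _index: config.elasticsearch.index,
              _type: 'article',
              _id: sh.unique(content.url),
              _timestamp: (new Date()).toISOString()
            }
          }, content);
        } catch (e) {
          console.error(e); // skip documents that fail to parse
        }
      });
      return ES.bulk({body: body}); // no callback, so the client returns a promise
    })
    .then((response) => {
      // The bulk response sets `errors: true` if any individual action failed,
      // even though the HTTP request itself succeeded.
      if (response.errors) {
        console.error('Some documents were not indexed');
      }
      console.log('Import complete');
    })
    .catch((err) => {
      console.error(err); // the MGET or the bulk request failed as a whole
    });
}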
answered Apr 19, 2016 at 21:55
- Could you add an explanation of why async isn't recommended to your answer? If I understood why, I might be able to vote your answer up. (Commented Aug 13, 2016 at 13:51)
- Async is recommended for asynchronous operations. You are using it to do synchronous stuff - in the async.mapLimit block. Neither JSON.parse nor sh.unique is asynchronous. It is critical that you understand the difference. Please read the async docs on synchronous operations. (Jan Grz, Aug 13, 2016 at 15:45)