
I have about a million records in Redis which I want to dump into Elasticsearch periodically. I just want to make sure that my script is fast enough and does not leak memory.

'use strict';
const redis = require('redis');
const bluebird = require('bluebird');
const request = require('request');
const elasticsearch = require('elasticsearch');
const fs = require('fs');
const _ = require('lodash');
const async = require('async');
const sh = require('shorthash');
const sleep = require('sleep');
const config = require('../config');

bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);

let client = redis.createClient({
    host: config.redis.url,
    port: config.redis.port
});
let ES = elasticsearch.Client({
    host: config.elasticsearch.url,
    requestTimeout: 30000000
});

var keys = fs.readFileSync('no-keys').toString().split('\n');
keys = keys.filter((e) => e);
let chunkedKeys = _.chunk(keys, 1000);
console.log('We have ' + chunkedKeys.length + ' keys');

_.each(chunkedKeys, (chunkedKey) => {
    client.mget(chunkedKey, (mgetError, replies) => {
        if (mgetError) {
            console.error(mgetError);
        }
        console.log('MGET complete from Redis');
        console.log('We have ' + replies.length + ' documents');
        async.mapLimit(replies, 5, (reply, callback) => {
            try {
                let content = JSON.parse(reply);
                let k = sh.unique(content.url);
                let body = [{index: {_index: config.elasticsearch.index, _type: 'article', _id: k, _timestamp: (new Date()).toISOString()}}];
                body.push(content);
                callback(null, body);
            } catch (e) {
                console.error(e);
                callback(e, []);
            }
        }, (err, results) => {
            if (err) {
                console.log(err);
            }
            let mergedResult = _.flatten(results.filter((e) => e));
            console.log('Export complete with ' + mergedResult.length);
            ES.bulk({body: mergedResult}, () => {
                console.log('Import complete');
            });
        });
    });
});
asked Apr 17, 2016 at 17:30

1 Answer

I can see two problems with your script:

  • You are doing synchronous work inside async, which is not recommended. The async library is meant for asynchronous operations, but in your async.mapLimit block neither JSON.parse nor sh.unique is asynchronous, so wrapping them in callbacks only adds overhead. It is critical that you understand the difference; please read the async docs on synchronous operations. (A short sketch follows this list.)
  • You are buffering all your keys into memory. I guess that is a smaller problem, but it is less efficient than using a readStream.
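To make the first point concrete, here is a minimal sketch (not from the original answer; it reuses chunkedKeys, client, ES, sh, and config from the question, and the concurrency limit of 2 is just an illustrative choice) of where a concurrency limit actually belongs: around the Redis and Elasticsearch calls, which are asynchronous, while the parsing stays a plain synchronous loop.

// Limit how many MGET + bulk round trips are in flight at once; the parsing
// inside the callback stays synchronous because nothing in it waits on I/O.
async.eachLimit(chunkedKeys, 2, (chunkedKey, done) => {
    client.mget(chunkedKey, (mgetError, replies) => {
        if (mgetError) {
            return done(mgetError);
        }
        let body = [];
        replies.forEach((reply) => {
            try {
                let content = JSON.parse(reply); // synchronous
                body.push({
                    index: {
                        _index: config.elasticsearch.index,
                        _type: 'article',
                        _id: sh.unique(content.url), // synchronous
                        _timestamp: (new Date()).toISOString()
                    }
                }, content);
            } catch (e) {
                console.error(e); // skip documents that fail to parse
            }
        });
        ES.bulk({body: body}, done); // asynchronous: let async track its completion
    });
}, (err) => {
    if (err) {
        console.error(err);
    }
});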

Please take a look at my implementation and feel free to use any part of it.

'use strict';
const redis = require('redis');
const bluebird = require('bluebird');
const request = require('request');
const elasticsearch = require('elasticsearch');
const fs = require('fs');
const _ = require('lodash');
const async = require('async');
const sh = require('shorthash');
const sleep = require('sleep');
const config = require('../config');
const readline = require('readline'); // handy wrapper around a readStream

bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);

let client = redis.createClient({
    host: config.redis.url,
    port: config.redis.port
});
let ES = elasticsearch.Client({
    host: config.elasticsearch.url,
    requestTimeout: 30000000
});

const readLineStream = readline.createInterface({ // this stream serves the keys one per line
    input: fs.createReadStream('no-keys')
});

let keysBuffer = [];

readLineStream.on('line', (line) => { // you were splitting on '\n', so I assume one key per line
    if (line) { // skip empty lines
        keysBuffer.push(line);
        if (keysBuffer.length === 1000) {
            migrateKeys(keysBuffer);
            keysBuffer = [];
        }
    }
});

readLineStream.on('close', () => { // readline emits 'close' (not 'end') once the input is exhausted
    if (keysBuffer.length > 0) {
        migrateKeys(keysBuffer); // remember to flush the local buffer
    }
});

function migrateKeys(chunkOfKeys) {
    client.mget(chunkOfKeys, (mgetError, replies) => {
        if (mgetError) {
            console.error(mgetError); // consider returning early here
        }
        console.log('MGET complete from Redis');
        console.log('We have ' + replies.length + ' documents');
        let parsedReplies = [];
        replies.forEach((reply) => { // plain forEach: using async for synchronous code is not recommended
            try {
                let content = JSON.parse(reply);
                parsedReplies.push([{
                    index: {
                        _index: config.elasticsearch.index,
                        _type: 'article',
                        _id: sh.unique(content.url),
                        _timestamp: (new Date()).toISOString()
                    }
                }, content]); // replies with parse errors never get pushed, so no filtering needed
            } catch (e) {
                console.error(e);
            }
        });
        console.log('Export complete with ' + parsedReplies.length);
        // the bulk body must be a flat array of alternating action and document entries
        ES.bulk({body: _.flatten(parsedReplies)}, () => {
            console.log('Import complete');
        });
    });
}
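A further point, not raised in the original answer: Elasticsearch's bulk API returns HTTP 200 even when individual documents fail, and reports per-item failures in the response body, so the final callback is worth inspecting rather than ignoring. A minimal sketch, assuming the same ES client and the parsedReplies array built above:

ES.bulk({body: _.flatten(parsedReplies)}, (bulkError, resp) => {
    if (bulkError) {
        return console.error(bulkError); // transport-level failure
    }
    if (resp && resp.errors) {
        // resp.items has one entry per action; failed ones carry an error object
        let failed = resp.items.filter((item) => item.index && item.index.error);
        console.error('Import finished with ' + failed.length + ' failed documents');
    } else {
        console.log('Import complete');
    }
});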
answered Apr 19, 2016 at 21:55
  • Could you add an explanation of why async isn't recommended to your answer? If I understood why, I might be able to vote your answer up. (Commented Aug 13, 2016 at 13:51)
  • Async is recommended for asynchronous operations; you are using it to do synchronous work in the async.mapLimit block. Neither JSON.parse nor sh.unique is asynchronous. It is critical that you understand the difference. Please read the async docs on synchronous operations. (Commented Aug 13, 2016 at 15:45)
