4
\$\begingroup\$

I use this node.js script to migrate a MongoDB collection from one schema to another. It does work if the collection is <20k documents, but it slows down to a crawl and sometimes throws a

FATAL ERROR: JS Allocation failed - process out of memory

Is it because of a memory leak? Or the intensive call to process.nextTick() (once per document)?

My guess is that if the mongoDB server isn't fast enough to keep up with the saves/there are too many documents, the callbacks pile up, eat away the CPU/RAM, and end up crashing the script.

Is my guess right? Is there a memory leak? What can I do to speed up the script/fix existing error(s)?

 var fs = require('fs'),
 mongoose = require('mongoose'),
 db = {}, //db.read is the connection to the source collection, db.write the connection to the destination collection
 S_logs = {}, //Mongoose schemas (read and write)
 M_logs = {}, //Mongoose models (read and write)
 config, //Parsed JSON config file (read/write urls, collection names, field lists)
 launched = 0, //Number of document migrations started
 ended = 0, //Number of document migrations finished
 percent = 0, //Last progress-bar percentage printed
 stats = {
 ok: 0, //Number of successful migrations
 error: 0 //Number of failed migrations
 };
/**
 * Streams every document of the source collection, maps each one onto the
 * target schema (fields listed in config.write.data, missing fields become
 * '') and saves it into the destination collection. Progress is reported
 * as a dash-based progress bar on stdout.
 *
 * Relies on the module-level globals M_logs, config, launched, ended,
 * percent and stats.
 *
 * @param {Function} callback - invoked once every started save has finished.
 */
function getCollection(callback) {
  console.log('|0% |10% |20% |30% |40% |50% |60% |70% |80% |90% |100%');
  M_logs.read.count({}, function (err, count) {
    // Stream the entire collection, document by document.
    var stream = M_logs.read.find().stream();
    stream.on('data', function (document) {
      launched = launched + 1;
      // Adapt the document to the new schema: copy only the fields
      // declared in the target config.
      var P_log = {};
      for (var name in config.write.data) { // 'var' — the original leaked an implicit global
        if (config.write.data.hasOwnProperty(name)) {
          P_log[name] = document[name] || '';
        }
      }
      // Defer the save to the next tick to let the event loop breathe.
      process.nextTick(function () {
        new M_logs.write(P_log).save(function (err) {
          // Advance the progress bar.
          while (ended > ((percent * count) / 100)) {
            process.stdout.write('-');
            percent = percent + 1;
          }
          // Update the stats.
          if (err) {
            stats.error = stats.error + 1;
          } else {
            stats.ok = stats.ok + 1;
          }
          ended = ended + 1;
        });
      });
    }).on('error', function (err) {
      // Count a failed read as both launched and ended so the
      // completion check below still converges.
      launched = launched + 1;
      while (ended > ((percent * count) / 100)) {
        process.stdout.write('-');
        percent = percent + 1;
      }
      ended = ended + 1;
    }).on('close', function () {
      process.nextTick(function () {
        // Poll once a second until every started save has completed.
        var wait = setInterval(function () {
          if (ended === launched) {
            clearInterval(wait);
            console.log('\nTransfert lancé: ' + launched);
            console.log('Transfert terminé: ' + ended);
            callback();
          }
        }, 1000);
      });
    });
  });
}
/**
 * Opens both Mongo connections (source first, then destination), builds
 * the mongoose schemas/models from the config and invokes the callback
 * once everything is ready. Populates the db, S_logs and M_logs globals.
 *
 * @param {Function} callback - invoked after both models are registered.
 */
function connect(callback) {
  // Builds the schemas/models; runs once both connections are open.
  function buildModels() {
    S_logs.read = new mongoose.Schema(config.read.data, {
      strict: false,
      collection: config.read.base
    });
    M_logs.read = db.read.model(config.read.base, S_logs.read, config.read.base);
    S_logs.write = new mongoose.Schema(config.write.data, {
      collection: config.write.base
    });
    S_logs.write.index(config.write.index, {unique: true});
    M_logs.write = db.write.model(config.write.base, S_logs.write, config.write.base);
    callback();
  }
  db.read = mongoose.createConnection(config.read.url);
  db.read.on('error', console.error.bind(console, 'connection error:'));
  db.read.once('open', function () {
    db.write = mongoose.createConnection(config.write.url);
    db.write.on('error', console.error.bind(console, 'connection error:'));
    db.write.once('open', buildModels);
  });
}
// Entry point: read the config file (path from argv[2], default
// 'config.js'), connect to both databases and run the migration.
process.stdout.write('Reading config');
fs.readFile(process.argv[2] || 'config.js', function (err, data) {
  if (err) {
    console.log('Error in config.js:' + err);
    process.exit(1); // non-zero exit so callers can detect the failure
    return;
  }
  try {
    config = JSON.parse(data);
  } catch (parseErr) {
    // A malformed config file used to crash the script with an
    // unhandled exception; report it and exit cleanly instead.
    console.log('Error in config.js:' + parseErr);
    process.exit(1);
    return;
  }
  console.log('...OK');
  console.log('From ' + config.read.url + ' / ' + config.read.base);
  console.log('To ' + config.write.url + ' / ' + config.write.base);
  process.stdout.write('Connecting to Mongo');
  connect(function () {
    console.log('...OK');
    getCollection(function () {
      console.log('OK: ' + stats.ok + ' / Error: ' + stats.error);
      process.exit();
    });
  });
});

Example of a config file:

{
 "read":
 {
 "url" : "mongodb://localhost/read",
 "base" : "read_collection",
 "data": 
 {
 "user_ip" : "String",
 "user_id" : "String",
 "user_agent" : "String",
 "canal_id" : "String",
 "theme_id" : "String",
 "video_id" : "String",
 "time" : "Number",
 "action" : "String",
 "is_newuser" : "String",
 "operator" : "String",
 "template" : "String",
 "catalogue" : "String",
 "referer" : "String",
 "from" : "String",
 "request" : "String",
 "smil" : "String",
 "smil_path" : "String"
 }
 },
 "write":
 {
 "url" : "mongodb://localhost/write",
 "base" : "write_collection",
 "data": 
 {
 "user_ip" : "String",
 "user_id" : "String",
 "user_agent" : "String",
 "canal_id" : "String",
 "theme_id" : "String",
 "pays" : "String",
 "lang" : "String",
 "video_id" : "String",
 "time" : "Number",
 "action" : "String",
 "is_newuser" : "String",
 "operator" : "String",
 "template" : "String",
 "catalogue" : "String",
 "referer" : "String",
 "from" : "String",
 "request" : "String",
 "smil" : "String",
 "smil_path" : "String"
 },
 "index" : 
 {
 "user_ip" : 1,
 "user_id" : 1,
 "time" : 1
 }
 }
}

Looking at the CPU and RAM usage while the script tries to transfer a 500k-document collection, I can see that the CPU is not really an issue (oscillating between 50% and 75% max), but the RAM slowly but steadily grows. My guess is that I should find a way to regulate the number of documents waiting for an answer from the mongoDB server, and pause the streaming until that number has fallen to a reasonably low level...

Jamal
35.2k13 gold badges134 silver badges238 bronze badges
asked Nov 24, 2014 at 17:04
\$\endgroup\$

2 Answers 2

2
\$\begingroup\$

Interesting question,

It seems you are using a ton of closures. 13 out of 15 function definitions are anonymous functions. I can't find a single one that captures anything useful, since you have a ton of global variables on top of that.

I would simply declare every one of those functions as a named function, it should already help a ton. And then indeed build some queue so that you are not DOS'ing your own machine.

answered Nov 25, 2014 at 18:55
\$\endgroup\$
2
  • \$\begingroup\$ For separating the code, I can do that. But I don't really see how I can implement the queueing. Would calculating the difference between launched and ended, and pausing the streaming if the difference is over X, be enough? Or should I look for a more advanced algorithm? (Or should I ask those questions on Programmers instead of Code Review?) \$\endgroup\$ Commented Nov 26, 2014 at 8:06
  • \$\begingroup\$ Please look into functional reactive programming, there are some excellent js libraries to help you out with this dilemma. en.wikipedia.org/wiki/Functional_reactive_programming \$\endgroup\$ Commented Nov 26, 2014 at 15:29
0
\$\begingroup\$

As a way of closing this question:

I named all the functions; here is the result:

var fs = require('fs'),
 mongoose = require('mongoose'),
 db = {}, //db.read / db.write: source and destination connections
 S_logs = {}, //Mongoose schemas (read and write)
 M_logs = {}, //Mongoose models (read and write)
 config, //Parsed JSON config file
 percent = 0; //Last progress-bar percentage printed
/**
 * Prints the current migration progress.
 *
 * The original dash-based progress bar was disabled (dead, commented-out
 * code removed here); a plain "launched / ended / total" line is printed
 * instead.
 *
 * @param {number} launched - documents whose save has been started.
 * @param {number} ended - documents whose save has finished.
 * @param {number} percent - unused; kept for interface compatibility.
 * @param {number} count - total number of documents to migrate.
 */
function updateProgressBar(launched, ended, percent, count) {
  console.log(launched + ' / ' + ended + ' / ' + count);
}
/**
 * Migrates every document coming from `stream` into the `write` model,
 * throttling the stream so that the number of in-flight saves stays
 * between ~25k and ~50k (checked once per second).
 *
 * Relies on the module-level globals config and percent.
 *
 * @param {Object} stream - mongoose QueryStream over the source collection.
 * @param {number} count - total number of documents to migrate.
 * @param {Object} read - source model; unused, kept for interface compatibility.
 * @param {Object} write - destination mongoose model.
 * @param {Function} callback - invoked with the stats object once done.
 */
function migrate(stream, count, read, write, callback) {
  var launched = 0, // saves started
      ended = 0, // saves finished
      stats = {
        ok: 0, // successful saves
        error: 0 // failed saves
      },
      wait,
      progressBar,
      queue;
  console.log('|0% |10% |20% |30% |40% |50% |60% |70% |80% |90% |100%');
  // Report progress once per second.
  progressBar = setInterval(function () {
    updateProgressBar(launched, ended, percent, count);
  }, 1000);
  // Back-pressure: pause the stream when too many saves are pending,
  // resume once the backlog has drained enough.
  queue = setInterval(function () {
    if (!stream.paused && (launched - ended) > 50000) {
      stream.pause();
      console.log('PAUSE');
    } else if (stream.paused && (launched - ended) <= 25000) {
      stream.resume();
      console.log('RESUME');
    }
  }, 1000);
  stream.on('data', function (document) {
    launched = launched + 1;
    // Copy only the fields declared in the target config.
    var P_log = {};
    for (var name in config.write.data) { // 'var' — the original leaked an implicit global
      if (config.write.data.hasOwnProperty(name)) {
        P_log[name] = document[name] || '';
      }
    }
    process.nextTick(function () {
      new write(P_log).save(function (err) {
        if (err) {
          stats.error = stats.error + 1;
        } else {
          stats.ok = stats.ok + 1;
        }
        ended = ended + 1;
      });
    });
  }).on('error', function (err) {
    console.log('ERR: ' + err);
    process.exit();
  }).on('close', function () {
    if (launched === count) {
      console.log('CLOSE');
      process.nextTick(function () {
        // Poll once a second until every started save has completed.
        wait = setInterval(function () {
          if (ended === launched) {
            clearInterval(wait);
            clearInterval(progressBar);
            clearInterval(queue);
            console.log('\nTransfert lancé: ' + launched);
            console.log('Transfert terminé: ' + ended);
            callback(stats);
          }
        }, 1000);
      });
    }
  });
}
/**
 * Counts the documents of the source collection and opens a document
 * stream over it.
 *
 * @param {Function} callback - receives (stream, count).
 */
function getCollection(callback) {
  M_logs.read.count({}, function (err, count) {
    callback(M_logs.read.find().stream(), count);
  });
}
function connect(callback) {
 //Connects to mongoDB through mongoose and creates the mongoose schemas and models needed for the migration
 db.read = mongoose.createConnection(config.read.url); //Open the connection to the source database
 db.read.on('error', console.error.bind(console, 'connection error:'));
 db.read.once('open', function () { //Once connected to the source
 db.write = mongoose.createConnection(config.write.url); //Open the connection to the destination database
 db.write.on('error', console.error.bind(console, 'connection error:'));
 db.write.once('open', function () { //Once connected to the destination
 S_logs.read = new mongoose.Schema(
 config.read.data,
 {
 strict: false,
 collection: config.read.base
 }
 );
 M_logs.read = db.read.model(config.read.base, S_logs.read, config.read.base);
 S_logs.write = new mongoose.Schema(
 config.write.data,
 {
 collection: config.write.base
 }
 );
 S_logs.write.index(config.write.index, {unique: true}); //Unique index from the config
 M_logs.write = db.write.model(config.write.base, S_logs.write, config.write.base);
 callback();
 });
 });
}
// Entry point: read the config (path from argv[2], default 'config.js'),
// connect, stream the source collection and migrate it.
process.stdout.write('Reading config');
fs.readFile(process.argv[2] || 'config.js', function (err, data) {
  if (err) {
    console.log('Error in config.js:' + err);
    process.exit(1); // non-zero exit so callers can detect the failure
    return;
  }
  try {
    config = JSON.parse(data);
  } catch (parseErr) {
    // Report malformed JSON instead of crashing with an unhandled exception.
    console.log('Error in config.js:' + parseErr);
    process.exit(1);
    return;
  }
  console.log('...OK');
  console.log('From ' + config.read.url + ' / ' + config.read.base);
  console.log('To ' + config.write.url + ' / ' + config.write.base);
  process.stdout.write('Connecting to Mongo');
  connect(function () {
    console.log('...OK');
    getCollection(function (stream, count) {
      migrate(stream, count, M_logs.read, M_logs.write, function (stats) {
        console.log('OK: ' + stats.ok + ' / Error: ' + stats.error);
        process.exit();
      });
    });
  });
});

But even after that, I still had to implement the queue, and I had problems with QueryStream.pause() and QueryStream.resume(), so I dumped QueryStream altogether (it's an internal tool; I just need something that works most of the time). The end result is far slower, but doesn't have any memory problems:

var fs = require('fs'),
 mongoose = require('mongoose'),
 db = {}, //db.read / db.write: source and destination connections
 S_logs = {}, //Mongoose schemas (read and write)
 M_logs = {}, //Mongoose models (read and write)
 config, //Parsed JSON config file
 launched = 0, //Number of document migrations started
 ended = 0, //Number of document migrations finished
 percent = 0, //Last progress-bar percentage printed
 stats = {
 ok: 0, //Number of successful migrations
 error: 0 //Number of failed migrations
 };
/**
 * Migrates one batch of 5000 documents (batch number `i`, fetched with
 * limit/skip pagination) into the destination collection.
 *
 * NOTE(review): skip/limit without an explicit sort relies on a stable
 * natural order between queries — confirm this holds for the collection.
 *
 * Relies on the module-level globals M_logs, config, ended, percent and stats.
 *
 * @param {number} i - zero-based batch index.
 * @param {number} count - total number of documents (for the progress bar).
 * @param {Function} callback - invoked with `i` once the whole batch is saved.
 */
function migrate(i, count, callback) {
  var query = M_logs.read.find().limit(5000).skip(i * 5000),
      wait,
      end = 0; // documents of this batch whose save has finished
  query.exec(function (err, docs) {
    if (err) {
      // The original ignored query errors, leaving `docs` undefined and
      // crashing on forEach; report the error and exit instead.
      console.log('ERR: ' + err);
      process.exit(1);
      return;
    }
    docs.forEach(function (document) {
      // Copy only the fields declared in the target config.
      var P_log = {};
      for (var name in config.write.data) { // 'var' — the original leaked an implicit global
        if (config.write.data.hasOwnProperty(name)) {
          P_log[name] = document[name] || '';
        }
      }
      process.nextTick(function () {
        new M_logs.write(P_log).save(function (err) {
          // Advance the global progress bar.
          while (ended > ((percent * count) / 100)) {
            process.stdout.write('-');
            percent = percent + 1;
          }
          if (err) {
            stats.error = stats.error + 1;
          } else {
            stats.ok = stats.ok + 1;
          }
          ended = ended + 1;
          end = end + 1;
        });
      });
    });
    // Poll once a second until every save of this batch has completed.
    wait = setInterval(function () {
      if (end === docs.length) {
        clearInterval(wait);
        callback(i);
      }
    }, 1000);
  });
}
/**
 * Migrates batch after batch (recursively) until the global `ended`
 * counter reaches `count`, then invokes the callback.
 *
 * @param {number} i - batch index to migrate next.
 * @param {number} count - total number of documents.
 * @param {Function} callback - invoked once all documents are migrated.
 */
function migrateAll(i, count, callback) {
  if (ended >= count) {
    callback();
    return;
  }
  migrate(i, count, function () {
    migrateAll(i + 1, count, callback);
  });
}
/**
 * Prints the progress-bar header, counts the source collection and
 * migrates every batch, then invokes the callback.
 *
 * @param {Function} callback - invoked once the migration is complete.
 */
function getCollection(callback) {
  console.log('|0% |10% |20% |30% |40% |50% |60% |70% |80% |90% |100%');
  M_logs.read.count({}, function (err, count) {
    // migrateAll invokes its callback with no arguments, so `callback`
    // can be passed straight through instead of being wrapped.
    migrateAll(0, count, callback);
  });
}
function connect(callback) {
 //Connects to mongoDB through mongoose and creates the mongoose schemas and models needed for the migration
 db.read = mongoose.createConnection(config.read.url); //Open the connection to the source database
 db.read.on('error', console.error.bind(console, 'connection error:'));
 db.read.once('open', function () { //Once connected to the source
 db.write = mongoose.createConnection(config.write.url); //Open the connection to the destination database
 db.write.on('error', console.error.bind(console, 'connection error:'));
 db.write.once('open', function () { //Once connected to the destination
 S_logs.read = new mongoose.Schema(
 config.read.data,
 {
 strict: false,
 collection: config.read.base
 }
 );
 M_logs.read = db.read.model(config.read.base, S_logs.read, config.read.base);
 S_logs.write = new mongoose.Schema(
 config.write.data,
 {
 collection: config.write.base
 }
 );
 S_logs.write.index(config.write.index, {unique: true}); //Unique index from the config
 M_logs.write = db.write.model(config.write.base, S_logs.write, config.write.base);
 callback();
 });
 });
}
// Entry point: read the config (path from argv[2], default 'config.js'),
// connect, then migrate the whole collection batch by batch.
process.stdout.write('Reading config');
fs.readFile(process.argv[2] || 'config.js', function (err, data) {
  if (err) {
    console.log('Error in config.js:' + err);
    process.exit(1); // non-zero exit so callers can detect the failure
    return;
  }
  try {
    config = JSON.parse(data);
  } catch (parseErr) {
    // Report malformed JSON instead of crashing with an unhandled exception.
    console.log('Error in config.js:' + parseErr);
    process.exit(1);
    return;
  }
  console.log('...OK');
  console.log('From ' + config.read.url + ' / ' + config.read.base);
  console.log('To ' + config.write.url + ' / ' + config.write.base);
  process.stdout.write('Connecting to Mongo');
  connect(function () {
    console.log('...OK');
    getCollection(function () {
      console.log('OK: ' + stats.ok + ' / Error: ' + stats.error);
      process.exit();
    });
  });
});
answered Dec 2, 2014 at 9:06
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.