I use this Node.js script to migrate a MongoDB collection from one schema to another. It works if the collection is under 20k documents, but slows to a crawl and sometimes throws:
FATAL ERROR: JS Allocation failed - process out of memory
Is it because of a memory leak? Or the intensive calls to process.nextTick() (once per document)?
My guess is that if the MongoDB server can't keep up with the saves (or there are too many documents), the callbacks pile up, eat away CPU/RAM, and eventually crash the script.
Is my guess right? Is there a memory leak? What can I do to speed up the script and fix the existing error(s)?
var fs = require('fs'),
    mongoose = require('mongoose'),
    db = {}, //db.read will be the connection to the collection to migrate from, db.write to the collection to migrate to
    S_logs = {}, //Mongoose schemas (read and write)
    M_logs = {}, //Mongoose models (read and write)
    config, //Config file
    launched = 0, //Number of document migrations started
    ended = 0, //Number of document migrations finished
    percent = 0, //Used to calculate the progress bar
    stats = {
        ok: 0, //Number of successful migrations
        error: 0 //Number of failed migrations
    };
function getCollection(callback) {
    console.log('|0% |10% |20% |30% |40% |50% |60% |70% |80% |90% |100%');
    M_logs.read.count({}, function (err, count) {
        var stream = M_logs.read.find().stream(); //Stream the entire collection, document by document
        stream.on('data', function (document) {
            launched = launched + 1;
            //Adapt the document to the new schema
            var P_log = {};
            for (var name in config.write.data) {
                if (config.write.data.hasOwnProperty(name)) {
                    P_log[name] = document[name] || '';
                }
            }
            //Clear the queue
            process.nextTick(function () {
                //Save the new document
                new M_logs.write(P_log).save(function (err) {
                    //Update the progress bar
                    while (ended > ((percent * count) / 100)) {
                        process.stdout.write('-');
                        percent = percent + 1;
                    }
                    //Update the stats
                    if (err) {
                        stats.error = stats.error + 1;
                    } else {
                        stats.ok = stats.ok + 1;
                    }
                    ended = ended + 1;
                });
            });
        }).on('error', function (err) {
            launched = launched + 1;
            while (ended > ((percent * count) / 100)) {
                process.stdout.write('-');
                percent = percent + 1;
            }
            ended = ended + 1;
        }).on('close', function () {
            process.nextTick(function () {
                //Wait for all transfers to end
                var wait = setInterval(function () {
                    if (ended === launched) {
                        clearInterval(wait);
                        console.log('\nTransfers started: ' + launched);
                        console.log('Transfers finished: ' + ended);
                        callback();
                    }
                }, 1000);
            });
        });
    });
}
function connect(callback) {
    db.read = mongoose.createConnection(config.read.url);
    db.read.on('error', console.error.bind(console, 'connection error:'));
    db.read.once('open', function () {
        db.write = mongoose.createConnection(config.write.url);
        db.write.on('error', console.error.bind(console, 'connection error:'));
        db.write.once('open', function () {
            S_logs.read = new mongoose.Schema(
                config.read.data,
                {
                    strict: false,
                    collection: config.read.base
                }
            );
            M_logs.read = db.read.model(config.read.base, S_logs.read, config.read.base);
            S_logs.write = new mongoose.Schema(
                config.write.data,
                {
                    collection: config.write.base
                }
            );
            S_logs.write.index(config.write.index, {unique: true});
            M_logs.write = db.write.model(config.write.base, S_logs.write, config.write.base);
            callback();
        });
    });
}
process.stdout.write('Reading config');
fs.readFile(process.argv[2] || 'config.js', function (err, data) {
    if (err) {
        console.log('Error in config.js: ' + err);
        process.exit();
    }
    config = JSON.parse(data);
    console.log('...OK');
    console.log('From ' + config.read.url + ' / ' + config.read.base);
    console.log('To ' + config.write.url + ' / ' + config.write.base);
    process.stdout.write('Connecting to Mongo');
    connect(function () {
        console.log('...OK');
        getCollection(function () {
            console.log('OK: ' + stats.ok + ' / Error: ' + stats.error);
            process.exit();
        });
    });
});
Example config file:
{
    "read":
    {
        "url" : "mongodb://localhost/read",
        "base" : "read_collection",
        "data":
        {
            "user_ip" : "String",
            "user_id" : "String",
            "user_agent" : "String",
            "canal_id" : "String",
            "theme_id" : "String",
            "video_id" : "String",
            "time" : "Number",
            "action" : "String",
            "is_newuser" : "String",
            "operator" : "String",
            "template" : "String",
            "catalogue" : "String",
            "referer" : "String",
            "from" : "String",
            "request" : "String",
            "smil" : "String",
            "smil_path" : "String"
        }
    },
    "write":
    {
        "url" : "mongodb://localhost/write",
        "base" : "write_collection",
        "data":
        {
            "user_ip" : "String",
            "user_id" : "String",
            "user_agent" : "String",
            "canal_id" : "String",
            "theme_id" : "String",
            "pays" : "String",
            "lang" : "String",
            "video_id" : "String",
            "time" : "Number",
            "action" : "String",
            "is_newuser" : "String",
            "operator" : "String",
            "template" : "String",
            "catalogue" : "String",
            "referer" : "String",
            "from" : "String",
            "request" : "String",
            "smil" : "String",
            "smil_path" : "String"
        },
        "index" :
        {
            "user_ip" : 1,
            "user_id" : 1,
            "time" : 1
        }
    }
}
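(The "String" / "Number" values work because Mongoose accepts SchemaType names as plain strings, so the parsed JSON can be passed straight to mongoose.Schema:

//Equivalent schema definitions in Mongoose:
new mongoose.Schema({ user_ip: 'String', time: 'Number' });
new mongoose.Schema({ user_ip: String, time: Number });

)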
Looking at the CPU and RAM usage while the script tries to transfer a 500k-document collection, I can see that the CPU is not really an issue (oscillating between 50% and 75% max), but RAM grows slowly and steadily. My guess is that I should find a way to regulate the number of documents waiting for an answer from the MongoDB server, and pause the streaming until that number has fallen back to a reasonably low level...
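Roughly, I imagine the throttling would look something like this (an untested sketch; the inFlight counter, the thresholds, and the adapt() helper are made up for illustration):

var inFlight = 0, HIGH = 1000, LOW = 500; //illustrative thresholds

stream.on('data', function (document) {
    inFlight = inFlight + 1;
    if (inFlight >= HIGH) {
        stream.pause(); //stop reading until the saves catch up
    }
    new M_logs.write(adapt(document)).save(function (err) { //adapt() stands in for the schema-mapping loop above
        inFlight = inFlight - 1;
        if (stream.paused && inFlight <= LOW) {
            stream.resume();
        }
    });
});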
2 Answers
Interesting question,
It seems you are using a ton of closures: 13 out of 15 function definitions are anonymous functions. I can't find a single one that is capturing anything useful, since you have a ton of global variables on top.
I would simply declare every one of those functions as a named function; that alone should help a ton. And then indeed build some queue so that you are not DoS'ing your own machine.
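For instance, the stream handlers could become named, top-level functions (a sketch; the handler names are mine):

function onDocument(document) {
    //adapt the document to the new schema and save it, as in the current 'data' handler
}

function onStreamError(err) {
    //count the failure and keep going
}

function onStreamClose() {
    //wait for the pending saves to finish, then report the stats
}

stream.on('data', onDocument)
    .on('error', onStreamError)
    .on('close', onStreamClose);

Named functions also show up in stack traces and heap snapshots, which makes leaks much easier to track down than anonymous closures.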
\$\begingroup\$ For separating the code, I can do that. But I don't really see how to do the queueing. Would calculating the difference between launched and ended, and pausing the stream if the difference goes over some X, be enough? Or should I look for a more advanced algorithm? (Or should I ask those questions on Programmers instead of Code Review?) \$\endgroup\$ – DrakaSAN, Nov 26, 2014 at 8:06
\$\begingroup\$ Please look into functional reactive programming, there are some excellent JS libraries to help you out with this dilemma. en.wikipedia.org/wiki/Functional_reactive_programming \$\endgroup\$ – konijn, Nov 26, 2014 at 15:29
As a way of closing this question:
I named all the functions; here is the result:
var fs = require('fs'),
    mongoose = require('mongoose'),
    db = {},
    S_logs = {},
    M_logs = {},
    config,
    percent = 0;
function updateProgressBar(launched, ended, percent, count) {
    // while (ended > ((percent * count) / 100)) {
    //     process.stdout.write('-');
    //     percent = percent + 1;
    // }
    console.log(launched + ' / ' + ended + ' / ' + count);
}
function migrate(stream, count, read, write, callback) {
    var launched = 0,
        ended = 0,
        stats = {
            ok: 0,
            error: 0
        },
        wait,
        progressBar,
        queue;
    console.log('|0% |10% |20% |30% |40% |50% |60% |70% |80% |90% |100%');
    progressBar = setInterval(function () {
        updateProgressBar(launched, ended, percent, count);
    }, 1000);
    queue = setInterval(function () {
        if (!stream.paused && (launched - ended) > 50000) {
            stream.pause();
            console.log('PAUSE');
        } else if (stream.paused && (launched - ended) <= 25000) {
            stream.resume();
            console.log('RESUME');
        }
    }, 1000);
    stream.on('data', function (document) {
        launched = launched + 1;
        var P_log = {};
        for (var name in config.write.data) {
            if (config.write.data.hasOwnProperty(name)) {
                P_log[name] = document[name] || '';
            }
        }
        process.nextTick(function () {
            new write(P_log).save(function (err) {
                if (err) {
                    stats.error = stats.error + 1;
                } else {
                    stats.ok = stats.ok + 1;
                }
                ended = ended + 1;
            });
        });
    }).on('error', function (err) {
        console.log('ERR: ' + err);
        process.exit();
    }).on('close', function () {
        if (launched === count) {
            console.log('CLOSE');
            process.nextTick(function () {
                wait = setInterval(function () {
                    if (ended === launched) {
                        clearInterval(wait);
                        clearInterval(progressBar);
                        clearInterval(queue);
                        console.log('\nTransfers started: ' + launched);
                        console.log('Transfers finished: ' + ended);
                        callback(stats);
                    }
                }, 1000);
            });
        }
    });
}
function getCollection(callback) {
    M_logs.read.count({}, function (err, count) {
        var stream = M_logs.read.find().stream();
        callback(stream, count);
    });
}
function connect(callback) {
    //Connect to MongoDB through Mongoose and create the necessary schemas and models
    db.read = mongoose.createConnection(config.read.url); //Create the connection to MongoDB
    db.read.on('error', console.error.bind(console, 'connection error:'));
    db.read.once('open', function () { //Once connected
        db.write = mongoose.createConnection(config.write.url); //Create the connection to MongoDB
        db.write.on('error', console.error.bind(console, 'connection error:'));
        db.write.once('open', function () { //Once connected
            S_logs.read = new mongoose.Schema(
                config.read.data,
                {
                    strict: false,
                    collection: config.read.base
                }
            );
            M_logs.read = db.read.model(config.read.base, S_logs.read, config.read.base);
            S_logs.write = new mongoose.Schema(
                config.write.data,
                {
                    collection: config.write.base
                }
            );
            S_logs.write.index(config.write.index, {unique: true});
            M_logs.write = db.write.model(config.write.base, S_logs.write, config.write.base);
            callback();
        });
    });
}
process.stdout.write('Reading config');
fs.readFile(process.argv[2] || 'config.js', function (err, data) {
    if (err) {
        console.log('Error in config.js: ' + err);
        process.exit();
    }
    // console.log('' + data);
    config = JSON.parse(data);
    console.log('...OK');
    console.log('From ' + config.read.url + ' / ' + config.read.base);
    console.log('To ' + config.write.url + ' / ' + config.write.base);
    process.stdout.write('Connecting to Mongo');
    //Connect to MongoDB
    connect(function () {
        console.log('...OK');
        //Migrate the collection
        getCollection(function (stream, count) {
            migrate(stream, count, M_logs.read, M_logs.write, function (stats) {
                console.log('OK: ' + stats.ok + ' / Error: ' + stats.error);
                process.exit();
            });
        });
    });
});
But even after that, I still had to build the queue, and I had problems with QueryStream.pause() and QueryStream.resume(), so I dumped QueryStream altogether (it's an internal tool, I just need something that works most of the time). The end result is far slower, but doesn't have any memory problems:
var fs = require('fs'),
    mongoose = require('mongoose'),
    db = {},
    S_logs = {},
    M_logs = {},
    config,
    launched = 0,
    ended = 0,
    percent = 0,
    stats = {
        ok: 0,
        error: 0
    };
function migrate(i, count, callback) {
    // console.log('migrate ' + i);
    // console.log(ended + '/' + count);
    var query = M_logs.read.find().limit(5000).skip(i * 5000),
        wait,
        end = 0;
    query.exec(function (err, docs) {
        docs.forEach(function (document) {
            var P_log = {};
            for (var name in config.write.data) {
                if (config.write.data.hasOwnProperty(name)) {
                    P_log[name] = document[name] || '';
                }
            }
            process.nextTick(function () {
                new M_logs.write(P_log).save(function (err) {
                    while (ended > ((percent * count) / 100)) {
                        process.stdout.write('-');
                        percent = percent + 1;
                    }
                    if (err) {
                        stats.error = stats.error + 1;
                    } else {
                        stats.ok = stats.ok + 1;
                    }
                    ended = ended + 1;
                    end = end + 1;
                });
            });
        });
        wait = setInterval(function () {
            if (end === docs.length) {
                clearInterval(wait);
                callback(i);
            }
        }, 1000);
    });
}
function migrateAll(i, count, callback) {
    if (ended < count) {
        migrate(i, count, function () {
            migrateAll(i + 1, count, callback);
        });
    } else {
        callback();
    }
}
function getCollection(callback) {
    console.log('|0% |10% |20% |30% |40% |50% |60% |70% |80% |90% |100%');
    M_logs.read.count({}, function (err, count) {
        migrateAll(0, count, function () {
            callback();
        });
    });
}
function connect(callback) {
    //Connect to MongoDB through Mongoose and create the necessary schemas and models
    db.read = mongoose.createConnection(config.read.url); //Create the connection to MongoDB
    db.read.on('error', console.error.bind(console, 'connection error:'));
    db.read.once('open', function () { //Once connected
        db.write = mongoose.createConnection(config.write.url); //Create the connection to MongoDB
        db.write.on('error', console.error.bind(console, 'connection error:'));
        db.write.once('open', function () { //Once connected
            S_logs.read = new mongoose.Schema(
                config.read.data,
                {
                    strict: false,
                    collection: config.read.base
                }
            );
            M_logs.read = db.read.model(config.read.base, S_logs.read, config.read.base);
            S_logs.write = new mongoose.Schema(
                config.write.data,
                {
                    collection: config.write.base
                }
            );
            S_logs.write.index(config.write.index, {unique: true});
            M_logs.write = db.write.model(config.write.base, S_logs.write, config.write.base);
            callback();
        });
    });
}
process.stdout.write('Reading config');
fs.readFile(process.argv[2] || 'config.js', function (err, data) {
    if (err) {
        console.log('Error in config.js: ' + err);
        process.exit();
    }
    // console.log('' + data);
    config = JSON.parse(data);
    console.log('...OK');
    console.log('From ' + config.read.url + ' / ' + config.read.base);
    console.log('To ' + config.write.url + ' / ' + config.write.base);
    process.stdout.write('Connecting to Mongo');
    //Connect to MongoDB
    connect(function () {
        console.log('...OK');
        //Migrate the collection
        getCollection(function () {
            console.log('OK: ' + stats.ok + ' / Error: ' + stats.error);
            process.exit();
        });
    });
});
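A note on the batching: skip(i * 5000) gets slower as i grows, because MongoDB still walks over all the skipped documents for every batch. A common alternative is to page on _id with a range query, which stays fast for every batch. A rough, untested sketch of what migrate() could look like with that approach (migrateBatch and lastId are my names; lastId replaces the batch index i):

function migrateBatch(lastId, callback) {
    //Page on _id instead of skip(): each batch starts where the previous one ended
    var query = M_logs.read.find(lastId ? {_id: {$gt: lastId}} : {})
            .sort({_id: 1})
            .limit(5000),
        end = 0;
    query.exec(function (err, docs) {
        if (err || docs.length === 0) {
            return callback(null); //no more documents to migrate
        }
        docs.forEach(function (document) {
            var P_log = {};
            for (var name in config.write.data) {
                if (config.write.data.hasOwnProperty(name)) {
                    P_log[name] = document[name] || '';
                }
            }
            new M_logs.write(P_log).save(function (err) {
                if (err) {
                    stats.error = stats.error + 1;
                } else {
                    stats.ok = stats.ok + 1;
                }
                ended = ended + 1;
                end = end + 1;
                if (end === docs.length) {
                    callback(docs[docs.length - 1]._id); //hand the last _id to the next batch
                }
            });
        });
    });
}

migrateAll() would then recurse with the returned _id and stop when it comes back null, instead of counting batches. As a bonus, the last save callback triggers the next batch directly, which removes the one-second setInterval polling.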