Add workers for notes to leverage CPU intensive work loading

This commit is contained in:
Wu Cheng-Han 2016-11-07 21:30:40 +08:00
parent 793aef0e2e
commit 4ccfdfa538
3 changed files with 191 additions and 90 deletions

View file

@ -9,6 +9,7 @@ var randomcolor = require("randomcolor");
var Chance = require('chance'),
chance = new Chance();
var moment = require('moment');
var childProcess = require('child_process');
//core
var config = require("./config.js");
@ -19,6 +20,9 @@ var models = require("./models");
//ot
var ot = require("./ot/index.js");
// workers
var noteUpdater = require("./workers/noteUpdater");
//public
var realtime = {
io: null,
@ -79,97 +83,62 @@ function emitCheck(note) {
var users = {};
var notes = {};
//update when the note is dirty
var updaterIsBusy = false;
var updater = setInterval(function () {
async.each(Object.keys(notes), function (key, callback) {
if (updaterIsBusy) return;
var _notes = {};
Object.keys(notes).forEach(function (key) {
var note = notes[key];
if (note.server.isDirty) {
if (config.debug) logger.info("updater found dirty note: " + key);
updateNote(note, function(err, _note) {
// handle when note already been clean up
if (!notes[key] || !notes[key].server) return callback(null, null);
if (!_note) {
realtime.io.to(note.id).emit('info', {
code: 404
});
logger.error('note not found: ', note.id);
}
if (err || !_note) {
for (var i = 0, l = note.socks.length; i < l; i++) {
var sock = note.socks[i];
if (typeof sock !== 'undefined' && sock) {
setTimeout(function () {
sock.disconnect(true);
}, 0);
}
if (!note.server || !note.server.isDirty) return;
_notes[key] = {
id: note.id,
lastchangeuser: note.lastchangeuser,
authorship: note.authorship,
document: note.server.document
};
note.server.isDirty = false;
});
if (Object.keys(_notes).length <= 0) return;
updaterIsBusy = true;
var worker = childProcess.fork("./lib/workers/noteUpdater.js");
if (config.debug) logger.info('note updater worker process started');
worker.send({
msg: 'update note',
notes: _notes
});
worker.on('message', function (data) {
if (!data || !data.msg || !data.note) return;
var note = notes[data.note.id];
if (!note) return;
switch(data.msg) {
case 'error':
for (var i = 0, l = note.socks.length; i < l; i++) {
var sock = note.socks[i];
if (typeof sock !== 'undefined' && sock) {
setTimeout(function () {
sock.disconnect(true);
}, 0);
}
return callback(err, null);
}
note.server.isDirty = false;
note.updatetime = moment(_note.lastchangeAt).valueOf();
break;
case 'note not found':
realtime.io.to(note.id).emit('info', {
code: 404
});
break;
case 'check':
note.lastchangeuserprofile = data.note.lastchangeuserprofile;
note.updatetime = data.note.updatetime;
saverSleep = false;
emitCheck(note);
return callback(null, null);
});
} else {
return callback(null, null);
break;
}
}, function (err) {
if (err) return logger.error('updater error', err);
});
worker.on('close', function (code) {
updaterIsBusy = false;
if (config.debug) logger.info('note updater worker process exited with code ' + code);
});
}, 1000);
function updateNote(note, callback) {
models.Note.findOne({
where: {
id: note.id
}
}).then(function (_note) {
if (!_note) return callback(null, null);
if (note.lastchangeuser) {
if (_note.lastchangeuserId != note.lastchangeuser) {
models.User.findOne({
where: {
id: note.lastchangeuser
}
}).then(function (user) {
if (!user) return callback(null, null);
note.lastchangeuserprofile = models.User.parseProfile(user.profile);
return finishUpdateNote(note, _note, callback);
}).catch(function (err) {
logger.error(err);
return callback(err, null);
});
} else {
return finishUpdateNote(note, _note, callback);
}
} else {
note.lastchangeuserprofile = null;
return finishUpdateNote(note, _note, callback);
}
}).catch(function (err) {
logger.error(err);
return callback(err, null);
});
}
function finishUpdateNote(note, _note, callback) {
if (!note || !note.server) return callback(null, null);
var body = note.server.document;
var title = note.title = models.Note.parseNoteTitle(body);
title = LZString.compressToBase64(title);
body = LZString.compressToBase64(body);
var values = {
title: title,
content: body,
authorship: LZString.compressToBase64(JSON.stringify(note.authorship)),
lastchangeuserId: note.lastchangeuser,
lastchangeAt: Date.now()
};
_note.update(values).then(function (_note) {
saverSleep = false;
return callback(null, _note);
}).catch(function (err) {
logger.error(err);
return callback(err, null);
});
}
//clean when user not in any rooms or user not in connected list
var cleaner = setInterval(function () {
async.each(Object.keys(users), function (key, callback) {
@ -192,16 +161,28 @@ var cleaner = setInterval(function () {
});
}, 60000);
var saverSleep = false;
var saverIsBusy = false;
// save note revision in interval
var saver = setInterval(function () {
if (saverSleep) return;
models.Revision.saveAllNotesRevision(function (err, notes) {
if (err) return logger.error('revision saver failed: ' + err);
if (notes && notes.length <= 0) {
saverSleep = true;
return;
if (saverSleep || saverIsBusy) return;
saverIsBusy = true;
var worker = childProcess.fork("./lib/workers/noteRevisionSaver.js");
if (config.debug) logger.info('note revision saver worker process started');
worker.send({
msg: 'save note revision'
});
worker.on('message', function (data) {
if (!data || !data.msg) return;
switch(data.msg) {
case 'empty':
saverSleep = true;
break;
}
});
worker.on('close', function (code) {
saverIsBusy = false;
if (config.debug) logger.info('note revision saver worker process exited with code ' + code);
});
}, 60000 * 5);
function getStatus(callback) {
@ -543,7 +524,7 @@ function disconnect(socket) {
// remove note in notes if no user inside
if (Object.keys(note.users).length <= 0) {
if (note.server.isDirty) {
updateNote(note, function (err, _note) {
noteUpdater.updateNote(note, function (err, _note) {
if (err) return logger.error('disconnect note failed: ' + err);
// clear server before delete to avoid memory leaks
note.server.document = "";

View file

@ -0,0 +1,19 @@
// core
var logger = require("../logger.js");
var models = require("../models");
process.on('message', function (data) {
if (!data || !data.msg || data.msg !== 'save note revision') return process.exit();
models.Revision.saveAllNotesRevision(function (err, notes) {
if (err) {
logger.error('note revision saver failed: ' + err);
return process.exit();
}
if (notes && notes.length <= 0) {
process.send({
msg: 'empty'
});
}
process.exit();
});
});

101
lib/workers/noteUpdater.js Normal file
View file

@ -0,0 +1,101 @@
// external modules
var async = require('async');
var moment = require('moment');
var LZString = require('lz-string');
// core
var config = require("../config.js");
var logger = require("../logger.js");
var models = require("../models");
process.on('message', function (data) {
if (!data || !data.msg || data.msg !== 'update note' || !data.notes) return process.exit();
var notes = data.notes;
async.each(Object.keys(notes), function (key, callback) {
var note = notes[key];
if (config.debug) logger.info("note updater found dirty note: " + key);
updateNote(note, function(err, _note) {
if (!_note) {
process.send({
msg: 'note not found',
note: note
});
logger.error('note not found: ', note.id);
}
if (err || !_note) {
process.send({
msg: 'error',
note: note
});
return callback(err, null);
}
note.updatetime = moment(_note.lastchangeAt).valueOf();
process.send({
msg: 'check',
note: note
});
return callback(null, null);
});
}, function (err) {
if (err) logger.error('note updater error', err);
process.exit();
});
});
function updateNote(note, callback) {
models.Note.findOne({
where: {
id: note.id
}
}).then(function (_note) {
if (!_note) return callback(null, null);
if (note.lastchangeuser) {
if (_note.lastchangeuserId != note.lastchangeuser) {
models.User.findOne({
where: {
id: note.lastchangeuser
}
}).then(function (user) {
if (!user) return callback(null, null);
note.lastchangeuserprofile = models.User.parseProfile(user.profile);
return finishUpdateNote(note, _note, callback);
}).catch(function (err) {
logger.error(err);
return callback(err, null);
});
} else {
return finishUpdateNote(note, _note, callback);
}
} else {
note.lastchangeuserprofile = null;
return finishUpdateNote(note, _note, callback);
}
}).catch(function (err) {
logger.error(err);
return callback(err, null);
});
}
function finishUpdateNote(note, _note, callback) {
var body = note.document;
var title = note.title = models.Note.parseNoteTitle(body);
title = LZString.compressToBase64(title);
body = LZString.compressToBase64(body);
var values = {
title: title,
content: body,
authorship: LZString.compressToBase64(JSON.stringify(note.authorship)),
lastchangeuserId: note.lastchangeuser,
lastchangeAt: Date.now()
};
_note.update(values).then(function (_note) {
return callback(null, _note);
}).catch(function (err) {
logger.error(err);
return callback(err, null);
});
}
module.exports = {
updateNote: updateNote
};