Revert "Add workers for notes to leverage CPU intensive work loading"

This reverts commit 4ccfdfa538.
This commit is contained in:
Wu Cheng-Han 2016-11-16 13:58:59 +08:00
parent 7adb78aba8
commit c58162a2e7
3 changed files with 91 additions and 192 deletions

View file

@ -9,7 +9,6 @@ var randomcolor = require("randomcolor");
var Chance = require('chance'), var Chance = require('chance'),
chance = new Chance(); chance = new Chance();
var moment = require('moment'); var moment = require('moment');
var childProcess = require('child_process');
//core //core
var config = require("./config.js"); var config = require("./config.js");
@ -20,9 +19,6 @@ var models = require("./models");
//ot //ot
var ot = require("./ot/index.js"); var ot = require("./ot/index.js");
// workers
var noteUpdater = require("./workers/noteUpdater");
//public //public
var realtime = { var realtime = {
io: null, io: null,
@ -83,62 +79,97 @@ function emitCheck(note) {
var users = {}; var users = {};
var notes = {}; var notes = {};
//update when the note is dirty //update when the note is dirty
var updaterIsBusy = false;
var updater = setInterval(function () { var updater = setInterval(function () {
if (updaterIsBusy) return; async.each(Object.keys(notes), function (key, callback) {
var _notes = {};
Object.keys(notes).forEach(function (key) {
var note = notes[key]; var note = notes[key];
if (!note.server || !note.server.isDirty) return; if (note.server.isDirty) {
_notes[key] = { if (config.debug) logger.info("updater found dirty note: " + key);
id: note.id, updateNote(note, function(err, _note) {
lastchangeuser: note.lastchangeuser, // handle when note already been clean up
authorship: note.authorship, if (!notes[key] || !notes[key].server) return callback(null, null);
document: note.server.document if (!_note) {
}; realtime.io.to(note.id).emit('info', {
note.server.isDirty = false; code: 404
}); });
if (Object.keys(_notes).length <= 0) return; logger.error('note not found: ', note.id);
updaterIsBusy = true;
var worker = childProcess.fork("./lib/workers/noteUpdater.js");
if (config.debug) logger.info('note updater worker process started');
worker.send({
msg: 'update note',
notes: _notes
});
worker.on('message', function (data) {
if (!data || !data.msg || !data.note) return;
var note = notes[data.note.id];
if (!note) return;
switch(data.msg) {
case 'error':
for (var i = 0, l = note.socks.length; i < l; i++) {
var sock = note.socks[i];
if (typeof sock !== 'undefined' && sock) {
setTimeout(function () {
sock.disconnect(true);
}, 0);
}
} }
break; if (err || !_note) {
case 'note not found': for (var i = 0, l = note.socks.length; i < l; i++) {
realtime.io.to(note.id).emit('info', { var sock = note.socks[i];
code: 404 if (typeof sock !== 'undefined' && sock) {
}); setTimeout(function () {
break; sock.disconnect(true);
case 'check': }, 0);
note.lastchangeuserprofile = data.note.lastchangeuserprofile; }
note.updatetime = data.note.updatetime; }
saverSleep = false; return callback(err, null);
}
note.server.isDirty = false;
note.updatetime = moment(_note.lastchangeAt).valueOf();
emitCheck(note); emitCheck(note);
break; return callback(null, null);
});
} else {
return callback(null, null);
} }
}); }, function (err) {
worker.on('close', function (code) { if (err) return logger.error('updater error', err);
updaterIsBusy = false;
if (config.debug) logger.info('note updater worker process exited with code ' + code);
}); });
}, 1000); }, 1000);
function updateNote(note, callback) {
models.Note.findOne({
where: {
id: note.id
}
}).then(function (_note) {
if (!_note) return callback(null, null);
if (note.lastchangeuser) {
if (_note.lastchangeuserId != note.lastchangeuser) {
models.User.findOne({
where: {
id: note.lastchangeuser
}
}).then(function (user) {
if (!user) return callback(null, null);
note.lastchangeuserprofile = models.User.parseProfile(user.profile);
return finishUpdateNote(note, _note, callback);
}).catch(function (err) {
logger.error(err);
return callback(err, null);
});
} else {
return finishUpdateNote(note, _note, callback);
}
} else {
note.lastchangeuserprofile = null;
return finishUpdateNote(note, _note, callback);
}
}).catch(function (err) {
logger.error(err);
return callback(err, null);
});
}
function finishUpdateNote(note, _note, callback) {
if (!note || !note.server) return callback(null, null);
var body = note.server.document;
var title = note.title = models.Note.parseNoteTitle(body);
title = LZString.compressToBase64(title);
body = LZString.compressToBase64(body);
var values = {
title: title,
content: body,
authorship: LZString.compressToBase64(JSON.stringify(note.authorship)),
lastchangeuserId: note.lastchangeuser,
lastchangeAt: Date.now()
};
_note.update(values).then(function (_note) {
saverSleep = false;
return callback(null, _note);
}).catch(function (err) {
logger.error(err);
return callback(err, null);
});
}
//clean when user not in any rooms or user not in connected list //clean when user not in any rooms or user not in connected list
var cleaner = setInterval(function () { var cleaner = setInterval(function () {
async.each(Object.keys(users), function (key, callback) { async.each(Object.keys(users), function (key, callback) {
@ -161,28 +192,16 @@ var cleaner = setInterval(function () {
}); });
}, 60000); }, 60000);
var saverSleep = false; var saverSleep = false;
var saverIsBusy = false;
// save note revision in interval // save note revision in interval
var saver = setInterval(function () { var saver = setInterval(function () {
if (saverSleep || saverIsBusy) return; if (saverSleep) return;
saverIsBusy = true; models.Revision.saveAllNotesRevision(function (err, notes) {
var worker = childProcess.fork("./lib/workers/noteRevisionSaver.js"); if (err) return logger.error('revision saver failed: ' + err);
if (config.debug) logger.info('note revision saver worker process started'); if (notes && notes.length <= 0) {
worker.send({ saverSleep = true;
msg: 'save note revision' return;
});
worker.on('message', function (data) {
if (!data || !data.msg) return;
switch(data.msg) {
case 'empty':
saverSleep = true;
break;
} }
}); });
worker.on('close', function (code) {
saverIsBusy = false;
if (config.debug) logger.info('note revision saver worker process exited with code ' + code);
});
}, 60000 * 5); }, 60000 * 5);
function getStatus(callback) { function getStatus(callback) {
@ -524,7 +543,7 @@ function disconnect(socket) {
// remove note in notes if no user inside // remove note in notes if no user inside
if (Object.keys(note.users).length <= 0) { if (Object.keys(note.users).length <= 0) {
if (note.server.isDirty) { if (note.server.isDirty) {
noteUpdater.updateNote(note, function (err, _note) { updateNote(note, function (err, _note) {
if (err) return logger.error('disconnect note failed: ' + err); if (err) return logger.error('disconnect note failed: ' + err);
// clear server before delete to avoid memory leaks // clear server before delete to avoid memory leaks
note.server.document = ""; note.server.document = "";

View file

@ -1,19 +0,0 @@
// core
var logger = require("../logger.js");
var models = require("../models");
process.on('message', function (data) {
if (!data || !data.msg || data.msg !== 'save note revision') return process.exit();
models.Revision.saveAllNotesRevision(function (err, notes) {
if (err) {
logger.error('note revision saver failed: ' + err);
return process.exit();
}
if (notes && notes.length <= 0) {
process.send({
msg: 'empty'
});
}
process.exit();
});
});

View file

@ -1,101 +0,0 @@
// external modules
var async = require('async');
var moment = require('moment');
var LZString = require('lz-string');
// core
var config = require("../config.js");
var logger = require("../logger.js");
var models = require("../models");
process.on('message', function (data) {
if (!data || !data.msg || data.msg !== 'update note' || !data.notes) return process.exit();
var notes = data.notes;
async.each(Object.keys(notes), function (key, callback) {
var note = notes[key];
if (config.debug) logger.info("note updater found dirty note: " + key);
updateNote(note, function(err, _note) {
if (!_note) {
process.send({
msg: 'note not found',
note: note
});
logger.error('note not found: ', note.id);
}
if (err || !_note) {
process.send({
msg: 'error',
note: note
});
return callback(err, null);
}
note.updatetime = moment(_note.lastchangeAt).valueOf();
process.send({
msg: 'check',
note: note
});
return callback(null, null);
});
}, function (err) {
if (err) logger.error('note updater error', err);
process.exit();
});
});
function updateNote(note, callback) {
models.Note.findOne({
where: {
id: note.id
}
}).then(function (_note) {
if (!_note) return callback(null, null);
if (note.lastchangeuser) {
if (_note.lastchangeuserId != note.lastchangeuser) {
models.User.findOne({
where: {
id: note.lastchangeuser
}
}).then(function (user) {
if (!user) return callback(null, null);
note.lastchangeuserprofile = models.User.parseProfile(user.profile);
return finishUpdateNote(note, _note, callback);
}).catch(function (err) {
logger.error(err);
return callback(err, null);
});
} else {
return finishUpdateNote(note, _note, callback);
}
} else {
note.lastchangeuserprofile = null;
return finishUpdateNote(note, _note, callback);
}
}).catch(function (err) {
logger.error(err);
return callback(err, null);
});
}
function finishUpdateNote(note, _note, callback) {
var body = note.document;
var title = note.title = models.Note.parseNoteTitle(body);
title = LZString.compressToBase64(title);
body = LZString.compressToBase64(body);
var values = {
title: title,
content: body,
authorship: LZString.compressToBase64(JSON.stringify(note.authorship)),
lastchangeuserId: note.lastchangeuser,
lastchangeAt: Date.now()
};
_note.update(values).then(function (_note) {
return callback(null, _note);
}).catch(function (err) {
logger.error(err);
return callback(err, null);
});
}
module.exports = {
updateNote: updateNote
};