Add dmp worker to leverage CPU intensive calculation to child process

This commit is contained in:
Wu Cheng-Han 2016-11-18 12:09:58 +08:00
parent 55ae64035b
commit c671d54d67
2 changed files with 221 additions and 102 deletions

View file

@ -5,13 +5,53 @@ var Sequelize = require("sequelize");
var LZString = require('lz-string'); var LZString = require('lz-string');
var async = require('async'); var async = require('async');
var moment = require('moment'); var moment = require('moment');
var DiffMatchPatch = require('diff-match-patch'); var childProcess = require('child_process');
var dmp = new DiffMatchPatch(); var shortId = require('shortid');
// core // core
var config = require("../config.js"); var config = require("../config.js");
var logger = require("../logger.js"); var logger = require("../logger.js");
var dmpWorker = createDmpWorker();
var dmpCallbackCache = {};
function createDmpWorker() {
var worker = childProcess.fork("./lib/workers/dmpWorker.js", {
stdio: 'ignore'
});
if (config.debug) logger.info('dmp worker process started');
worker.on('message', function (data) {
if (!data || !data.msg || !data.cacheKey) {
return logger.error('dmp worker error: not enough data on message');
}
var cacheKey = data.cacheKey;
switch(data.msg) {
case 'error':
dmpCallbackCache[cacheKey](data.error, null);
break;
case 'check':
dmpCallbackCache[cacheKey](null, data.result);
break;
}
delete dmpCallbackCache[cacheKey];
});
worker.on('close', function (code) {
dmpWorker = null;
if (config.debug) logger.info('dmp worker process exited with code ' + code);
});
return worker;
}
function sendDmpWorker(data, callback) {
if (!dmpWorker) dmpWorker = createDmpWorker();
var cacheKey = Date.now() + '_' + shortId.generate();
dmpCallbackCache[cacheKey] = callback;
data = Object.assign(data, {
cacheKey: cacheKey
});
dmpWorker.send(data);
}
module.exports = function (sequelize, DataTypes) { module.exports = function (sequelize, DataTypes) {
var Revision = sequelize.define("Revision", { var Revision = sequelize.define("Revision", {
id: { id: {
@ -43,19 +83,6 @@ module.exports = function (sequelize, DataTypes) {
constraints: false constraints: false
}); });
}, },
createPatch: function (lastDoc, CurrDoc) {
var ms_start = (new Date()).getTime();
var diff = dmp.diff_main(lastDoc, CurrDoc);
dmp.diff_cleanupSemantic(diff);
var patch = dmp.patch_make(lastDoc, diff);
patch = dmp.patch_toText(patch);
var ms_end = (new Date()).getTime();
if (config.debug) {
logger.info(patch);
logger.info((ms_end - ms_start) + 'ms');
}
return patch;
},
getNoteRevisions: function (note, callback) { getNoteRevisions: function (note, callback) {
Revision.findAll({ Revision.findAll({
where: { where: {
@ -96,67 +123,11 @@ module.exports = function (sequelize, DataTypes) {
order: '"createdAt" DESC' order: '"createdAt" DESC'
}).then(function (count) { }).then(function (count) {
if (count <= 0) return callback(null, null); if (count <= 0) return callback(null, null);
var ms_start = (new Date()).getTime(); sendDmpWorker({
var startContent = null; msg: 'get revision',
var lastPatch = []; revisions: revisions,
var applyPatches = []; count: count
var authorship = []; }, callback);
if (count <= Math.round(revisions.length / 2)) {
// start from top to target
for (var i = 0; i < count; i++) {
var revision = revisions[i];
if (i == 0) {
startContent = LZString.decompressFromBase64(revision.content || revision.lastContent);
}
if (i != count - 1) {
var patch = dmp.patch_fromText(LZString.decompressFromBase64(revision.patch));
applyPatches = applyPatches.concat(patch);
}
lastPatch = revision.patch;
authorship = revision.authorship;
}
// swap DIFF_INSERT and DIFF_DELETE to achieve unpatching
for (var i = 0, l = applyPatches.length; i < l; i++) {
for (var j = 0, m = applyPatches[i].diffs.length; j < m; j++) {
var diff = applyPatches[i].diffs[j];
if (diff[0] == DiffMatchPatch.DIFF_INSERT)
diff[0] = DiffMatchPatch.DIFF_DELETE;
else if (diff[0] == DiffMatchPatch.DIFF_DELETE)
diff[0] = DiffMatchPatch.DIFF_INSERT;
}
}
} else {
// start from bottom to target
var l = revisions.length - 1;
for (var i = l; i >= count - 1; i--) {
var revision = revisions[i];
if (i == l) {
startContent = LZString.decompressFromBase64(revision.lastContent);
authorship = revision.authorship;
}
if (revision.patch) {
var patch = dmp.patch_fromText(LZString.decompressFromBase64(revision.patch));
applyPatches = applyPatches.concat(patch);
}
lastPatch = revision.patch;
authorship = revision.authorship;
}
}
try {
var finalContent = dmp.patch_apply(applyPatches, startContent)[0];
} catch (err) {
return callback(err, null);
}
var data = {
content: finalContent,
patch: dmp.patch_fromText(LZString.decompressFromBase64(lastPatch)),
authorship: authorship ? JSON.parse(LZString.decompressFromBase64(authorship)) : null
};
var ms_end = (new Date()).getTime();
if (config.debug) {
logger.info((ms_end - ms_start) + 'ms');
}
return callback(null, data);
}).catch(function (err) { }).catch(function (err) {
return callback(err, null); return callback(err, null);
}); });
@ -254,37 +225,43 @@ module.exports = function (sequelize, DataTypes) {
var latestRevision = revisions[0]; var latestRevision = revisions[0];
var lastContent = LZString.decompressFromBase64(latestRevision.content || latestRevision.lastContent); var lastContent = LZString.decompressFromBase64(latestRevision.content || latestRevision.lastContent);
var content = LZString.decompressFromBase64(note.content); var content = LZString.decompressFromBase64(note.content);
var patch = Revision.createPatch(lastContent, content); sendDmpWorker({
if (!patch) { msg: 'create patch',
// if patch is empty (means no difference) then just update the latest revision updated time lastDoc: lastContent,
latestRevision.changed('updatedAt', true); currDoc: content,
latestRevision.update({ }, function (err, patch) {
updatedAt: Date.now() if (err) logger.error('save note revision error', err);
}).then(function (revision) { if (!patch) {
Revision.finishSaveNoteRevision(note, revision, callback); // if patch is empty (means no difference) then just update the latest revision updated time
}).catch(function (err) { latestRevision.changed('updatedAt', true);
return callback(err, null);
});
} else {
Revision.create({
noteId: note.id,
patch: LZString.compressToBase64(patch),
content: note.content,
length: LZString.decompressFromBase64(note.content).length,
authorship: note.authorship
}).then(function (revision) {
// clear last revision content to reduce db size
latestRevision.update({ latestRevision.update({
content: null updatedAt: Date.now()
}).then(function () { }).then(function (revision) {
Revision.finishSaveNoteRevision(note, revision, callback); Revision.finishSaveNoteRevision(note, revision, callback);
}).catch(function (err) { }).catch(function (err) {
return callback(err, null); return callback(err, null);
}); });
}).catch(function (err) { } else {
return callback(err, null); Revision.create({
}); noteId: note.id,
} patch: LZString.compressToBase64(patch),
content: note.content,
length: LZString.decompressFromBase64(note.content).length,
authorship: note.authorship
}).then(function (revision) {
// clear last revision content to reduce db size
latestRevision.update({
content: null
}).then(function () {
Revision.finishSaveNoteRevision(note, revision, callback);
}).catch(function (err) {
return callback(err, null);
});
}).catch(function (err) {
return callback(err, null);
});
}
});
} }
}).catch(function (err) { }).catch(function (err) {
return callback(err, null); return callback(err, null);

142
lib/workers/dmpWorker.js Normal file
View file

@ -0,0 +1,142 @@
// external modules
var LZString = require('lz-string');
var DiffMatchPatch = require('diff-match-patch');
var dmp = new DiffMatchPatch();
// core
var config = require("../config.js");
var logger = require("../logger.js");
process.on('message', function(data) {
if (!data || !data.msg || !data.cacheKey) {
return logger.error('dmp worker error: not enough data');
}
switch (data.msg) {
case 'create patch':
if (!data.hasOwnProperty('lastDoc') || !data.hasOwnProperty('currDoc')) {
return logger.error('dmp worker error: not enough data on create patch');
}
try {
var patch = createPatch(data.lastDoc, data.currDoc);
process.send({
msg: 'check',
result: patch,
cacheKey: data.cacheKey
});
} catch (err) {
logger.error('dmp worker error', err);
process.send({
msg: 'error',
error: err,
cacheKey: data.cacheKey
});
}
break;
case 'get revision':
if (!data.hasOwnProperty('revisions') || !data.hasOwnProperty('count')) {
return logger.error('dmp worker error: not enough data on get revision');
}
try {
var result = getRevision(data.revisions, data.count);
process.send({
msg: 'check',
result: result,
cacheKey: data.cacheKey
});
} catch (err) {
logger.error('dmp worker error', err);
process.send({
msg: 'error',
error: err,
cacheKey: data.cacheKey
});
}
break;
}
});
function createPatch(lastDoc, currDoc) {
var ms_start = (new Date()).getTime();
var diff = dmp.diff_main(lastDoc, currDoc);
dmp.diff_cleanupSemantic(diff);
var patch = dmp.patch_make(lastDoc, diff);
patch = dmp.patch_toText(patch);
var ms_end = (new Date()).getTime();
if (config.debug) {
logger.info(patch);
logger.info((ms_end - ms_start) + 'ms');
}
return patch;
}
function getRevision(revisions, count) {
var ms_start = (new Date()).getTime();
var startContent = null;
var lastPatch = [];
var applyPatches = [];
var authorship = [];
if (count <= Math.round(revisions.length / 2)) {
// start from top to target
for (var i = 0; i < count; i++) {
var revision = revisions[i];
if (i == 0) {
startContent = LZString.decompressFromBase64(revision.content || revision.lastContent);
}
if (i != count - 1) {
var patch = dmp.patch_fromText(LZString.decompressFromBase64(revision.patch));
applyPatches = applyPatches.concat(patch);
}
lastPatch = revision.patch;
authorship = revision.authorship;
}
// swap DIFF_INSERT and DIFF_DELETE to achieve unpatching
for (var i = 0, l = applyPatches.length; i < l; i++) {
for (var j = 0, m = applyPatches[i].diffs.length; j < m; j++) {
var diff = applyPatches[i].diffs[j];
if (diff[0] == DiffMatchPatch.DIFF_INSERT)
diff[0] = DiffMatchPatch.DIFF_DELETE;
else if (diff[0] == DiffMatchPatch.DIFF_DELETE)
diff[0] = DiffMatchPatch.DIFF_INSERT;
}
}
} else {
// start from bottom to target
var l = revisions.length - 1;
for (var i = l; i >= count - 1; i--) {
var revision = revisions[i];
if (i == l) {
startContent = LZString.decompressFromBase64(revision.lastContent);
authorship = revision.authorship;
}
if (revision.patch) {
var patch = dmp.patch_fromText(LZString.decompressFromBase64(revision.patch));
applyPatches = applyPatches.concat(patch);
}
lastPatch = revision.patch;
authorship = revision.authorship;
}
}
try {
var finalContent = dmp.patch_apply(applyPatches, startContent)[0];
} catch (err) {
throw new Error(err);
}
var data = {
content: finalContent,
patch: dmp.patch_fromText(LZString.decompressFromBase64(lastPatch)),
authorship: authorship ? JSON.parse(LZString.decompressFromBase64(authorship)) : null
};
var ms_end = (new Date()).getTime();
if (config.debug) {
logger.info((ms_end - ms_start) + 'ms');
}
return data;
}
// log uncaught exception
process.on('uncaughtException', function (err) {
logger.error('An uncaught exception has occured.');
logger.error(err);
logger.error('Process will exit now.');
process.exit(1);
});