283 lines
No EOL
12 KiB
JavaScript
283 lines
No EOL
12 KiB
JavaScript
"use strict";
|
|
|
|
// external modules
|
|
var Sequelize = require("sequelize");
|
|
var LZString = require('lz-string');
|
|
var async = require('async');
|
|
var moment = require('moment');
|
|
var childProcess = require('child_process');
|
|
var shortId = require('shortid');
|
|
|
|
// core
|
|
var config = require("../config.js");
|
|
var logger = require("../logger.js");
|
|
|
|
var dmpWorker = createDmpWorker();
|
|
var dmpCallbackCache = {};
|
|
|
|
function createDmpWorker() {
|
|
var worker = childProcess.fork("./lib/workers/dmpWorker.js", {
|
|
stdio: 'ignore'
|
|
});
|
|
if (config.debug) logger.info('dmp worker process started');
|
|
worker.on('message', function (data) {
|
|
if (!data || !data.msg || !data.cacheKey) {
|
|
return logger.error('dmp worker error: not enough data on message');
|
|
}
|
|
var cacheKey = data.cacheKey;
|
|
switch(data.msg) {
|
|
case 'error':
|
|
dmpCallbackCache[cacheKey](data.error, null);
|
|
break;
|
|
case 'check':
|
|
dmpCallbackCache[cacheKey](null, data.result);
|
|
break;
|
|
}
|
|
delete dmpCallbackCache[cacheKey];
|
|
});
|
|
worker.on('close', function (code) {
|
|
dmpWorker = null;
|
|
if (config.debug) logger.info('dmp worker process exited with code ' + code);
|
|
});
|
|
return worker;
|
|
}
|
|
|
|
function sendDmpWorker(data, callback) {
|
|
if (!dmpWorker) dmpWorker = createDmpWorker();
|
|
var cacheKey = Date.now() + '_' + shortId.generate();
|
|
dmpCallbackCache[cacheKey] = callback;
|
|
data = Object.assign(data, {
|
|
cacheKey: cacheKey
|
|
});
|
|
dmpWorker.send(data);
|
|
}
|
|
|
|
module.exports = function (sequelize, DataTypes) {
|
|
var Revision = sequelize.define("Revision", {
|
|
id: {
|
|
type: DataTypes.UUID,
|
|
primaryKey: true,
|
|
defaultValue: Sequelize.UUIDV4
|
|
},
|
|
patch: {
|
|
type: DataTypes.TEXT
|
|
},
|
|
lastContent: {
|
|
type: DataTypes.TEXT
|
|
},
|
|
content: {
|
|
type: DataTypes.TEXT
|
|
},
|
|
length: {
|
|
type: DataTypes.INTEGER
|
|
},
|
|
authorship: {
|
|
type: DataTypes.TEXT
|
|
}
|
|
}, {
|
|
classMethods: {
|
|
associate: function (models) {
|
|
Revision.belongsTo(models.Note, {
|
|
foreignKey: "noteId",
|
|
as: "note",
|
|
constraints: false
|
|
});
|
|
},
|
|
getNoteRevisions: function (note, callback) {
|
|
Revision.findAll({
|
|
where: {
|
|
noteId: note.id
|
|
},
|
|
order: '"createdAt" DESC'
|
|
}).then(function (revisions) {
|
|
var data = [];
|
|
for (var i = 0, l = revisions.length; i < l; i++) {
|
|
var revision = revisions[i];
|
|
data.push({
|
|
time: moment(revision.createdAt).valueOf(),
|
|
length: revision.length
|
|
});
|
|
}
|
|
callback(null, data);
|
|
}).catch(function (err) {
|
|
callback(err, null);
|
|
});
|
|
},
|
|
getPatchedNoteRevisionByTime: function (note, time, callback) {
|
|
// find all revisions to prepare for all possible calculation
|
|
Revision.findAll({
|
|
where: {
|
|
noteId: note.id
|
|
},
|
|
order: '"createdAt" DESC'
|
|
}).then(function (revisions) {
|
|
if (revisions.length <= 0) return callback(null, null);
|
|
// measure target revision position
|
|
Revision.count({
|
|
where: {
|
|
noteId: note.id,
|
|
createdAt: {
|
|
$gte: time
|
|
}
|
|
},
|
|
order: '"createdAt" DESC'
|
|
}).then(function (count) {
|
|
if (count <= 0) return callback(null, null);
|
|
sendDmpWorker({
|
|
msg: 'get revision',
|
|
revisions: revisions,
|
|
count: count
|
|
}, callback);
|
|
}).catch(function (err) {
|
|
return callback(err, null);
|
|
});
|
|
}).catch(function (err) {
|
|
return callback(err, null);
|
|
});
|
|
},
|
|
checkAllNotesRevision: function (callback) {
|
|
Revision.saveAllNotesRevision(function (err, notes) {
|
|
if (err) return callback(err, null);
|
|
if (!notes || notes.length <= 0) {
|
|
return callback(null, notes);
|
|
} else {
|
|
Revision.checkAllNotesRevision(callback);
|
|
}
|
|
});
|
|
},
|
|
saveAllNotesRevision: function (callback) {
|
|
sequelize.models.Note.findAll({
|
|
// query all notes that need to save for revision
|
|
where: {
|
|
$and: [
|
|
{
|
|
lastchangeAt: {
|
|
$or: {
|
|
$eq: null,
|
|
$and: {
|
|
$ne: null,
|
|
$gt: sequelize.col('createdAt')
|
|
}
|
|
}
|
|
}
|
|
},
|
|
{
|
|
savedAt: {
|
|
$or: {
|
|
$eq: null,
|
|
$lt: sequelize.col('lastchangeAt')
|
|
}
|
|
}
|
|
}
|
|
]
|
|
}
|
|
}).then(function (notes) {
|
|
if (notes.length <= 0) return callback(null, notes);
|
|
var savedNotes = [];
|
|
async.each(notes, function (note, _callback) {
|
|
// revision saving policy: note not been modified for 5 mins or not save for 10 mins
|
|
if (note.lastchangeAt && note.savedAt) {
|
|
var lastchangeAt = moment(note.lastchangeAt);
|
|
var savedAt = moment(note.savedAt);
|
|
if (moment().isAfter(lastchangeAt.add(5, 'minutes'))) {
|
|
savedNotes.push(note);
|
|
Revision.saveNoteRevision(note, _callback);
|
|
} else if (lastchangeAt.isAfter(savedAt.add(10, 'minutes'))) {
|
|
savedNotes.push(note);
|
|
Revision.saveNoteRevision(note, _callback);
|
|
} else {
|
|
return _callback(null, null);
|
|
}
|
|
} else {
|
|
savedNotes.push(note);
|
|
Revision.saveNoteRevision(note, _callback);
|
|
}
|
|
}, function (err) {
|
|
if (err) return callback(err, null);
|
|
// return null when no notes need saving at this moment but have delayed tasks to be done
|
|
var result = ((savedNotes.length == 0) && (notes.length > savedNotes.length)) ? null : savedNotes;
|
|
return callback(null, result);
|
|
});
|
|
}).catch(function (err) {
|
|
return callback(err, null);
|
|
});
|
|
},
|
|
saveNoteRevision: function (note, callback) {
|
|
Revision.findAll({
|
|
where: {
|
|
noteId: note.id
|
|
},
|
|
order: '"createdAt" DESC'
|
|
}).then(function (revisions) {
|
|
if (revisions.length <= 0) {
|
|
// if no revision available
|
|
Revision.create({
|
|
noteId: note.id,
|
|
lastContent: note.content,
|
|
length: LZString.decompressFromBase64(note.content).length,
|
|
authorship: note.authorship
|
|
}).then(function (revision) {
|
|
Revision.finishSaveNoteRevision(note, revision, callback);
|
|
}).catch(function (err) {
|
|
return callback(err, null);
|
|
});
|
|
} else {
|
|
var latestRevision = revisions[0];
|
|
var lastContent = LZString.decompressFromBase64(latestRevision.content || latestRevision.lastContent);
|
|
var content = LZString.decompressFromBase64(note.content);
|
|
sendDmpWorker({
|
|
msg: 'create patch',
|
|
lastDoc: lastContent,
|
|
currDoc: content,
|
|
}, function (err, patch) {
|
|
if (err) logger.error('save note revision error', err);
|
|
if (!patch) {
|
|
// if patch is empty (means no difference) then just update the latest revision updated time
|
|
latestRevision.changed('updatedAt', true);
|
|
latestRevision.update({
|
|
updatedAt: Date.now()
|
|
}).then(function (revision) {
|
|
Revision.finishSaveNoteRevision(note, revision, callback);
|
|
}).catch(function (err) {
|
|
return callback(err, null);
|
|
});
|
|
} else {
|
|
Revision.create({
|
|
noteId: note.id,
|
|
patch: LZString.compressToBase64(patch),
|
|
content: note.content,
|
|
length: LZString.decompressFromBase64(note.content).length,
|
|
authorship: note.authorship
|
|
}).then(function (revision) {
|
|
// clear last revision content to reduce db size
|
|
latestRevision.update({
|
|
content: null
|
|
}).then(function () {
|
|
Revision.finishSaveNoteRevision(note, revision, callback);
|
|
}).catch(function (err) {
|
|
return callback(err, null);
|
|
});
|
|
}).catch(function (err) {
|
|
return callback(err, null);
|
|
});
|
|
}
|
|
});
|
|
}
|
|
}).catch(function (err) {
|
|
return callback(err, null);
|
|
});
|
|
},
|
|
finishSaveNoteRevision: function (note, revision, callback) {
|
|
note.update({
|
|
savedAt: revision.updatedAt
|
|
}).then(function () {
|
|
return callback(null, revision);
|
|
}).catch(function (err) {
|
|
return callback(err, null);
|
|
});
|
|
}
|
|
}
|
|
});
|
|
|
|
return Revision;
|
|
}; |