diff --git a/app.js b/app.js index 5857864..31bf868 100644 --- a/app.js +++ b/app.js @@ -484,16 +484,26 @@ function startListen() { if (config.usessl) { server.listen(config.port, function () { logger.info('HTTPS Server listening at port %d', config.port); + config.maintenance = false; }); } else { server.listen(config.port, function () { logger.info('HTTP Server listening at port %d', config.port); + config.maintenance = false; }); } } // sync db then start listen -models.sequelize.sync().then(startListen); +models.sequelize.sync().then(function () { + // check if realtime is ready + if (realtime.isReady()) { + models.Revision.checkAllNotesRevision(function (err, notes) { + if (err) return new Error(err); + if (notes.length <= 0) return startListen(); + }); + } +}); // log uncaught exception process.on('uncaughtException', function (err) { @@ -510,21 +520,18 @@ process.on('SIGINT', function () { Object.keys(io.sockets.sockets).forEach(function (key) { var socket = io.sockets.sockets[key]; // notify client server going into maintenance status - socket.emit('maintenance', config.version); + socket.emit('maintenance'); socket.disconnect(true); }); var checkCleanTimer = setInterval(function () { - var usersCount = Object.keys(realtime.users).length; - var notesCount = Object.keys(realtime.notes).length; - // check if all users and notes array are empty - if (usersCount == 0 && notesCount == 0) { - // close db connection - models.sequelize.close(); - clearInterval(checkCleanTimer); - // wait for a while before exit - setTimeout(function () { - process.exit(0); - }, 100); + if (realtime.isReady()) { + models.Revision.checkAllNotesRevision(function (err, notes) { + if (err) return new Error(err); + if (notes.length <= 0) { + clearInterval(checkCleanTimer); + return process.exit(0); + } + }); } }, 100); }); \ No newline at end of file diff --git a/lib/config.js b/lib/config.js index 1ba1763..3828e2d 100644 --- a/lib/config.js +++ b/lib/config.js @@ -78,7 +78,7 @@ function getserverurl() { } var version = '0.4.2'; -var maintenance = config.maintenance || false; +var maintenance = true; var cwd = path.join(__dirname, '..'); module.exports = { diff --git a/lib/migrations/20160607060246-support-revision.js b/lib/migrations/20160607060246-support-revision.js new file mode 100644 index 0000000..9721d7f --- /dev/null +++ b/lib/migrations/20160607060246-support-revision.js @@ -0,0 +1,24 @@ +'use strict'; + +module.exports = { + up: function (queryInterface, Sequelize) { + queryInterface.addColumn('Notes', 'savedAt', Sequelize.DATE); + queryInterface.createTable('Revisions', { + id: Sequelize.UUID, + noteId: Sequelize.UUID, + patch: Sequelize.TEXT, + lastContent: Sequelize.TEXT, + content: Sequelize.TEXT, + length: Sequelize.INTEGER, + createdAt: Sequelize.DATE, + updatedAt: Sequelize.DATE + }); + return; + }, + + down: function (queryInterface, Sequelize) { + queryInterface.dropTable('Revisions'); + queryInterface.removeColumn('Notes', 'savedAt'); + return; + } +}; diff --git a/lib/models/note.js b/lib/models/note.js index 2b51c87..ace072a 100644 --- a/lib/models/note.js +++ b/lib/models/note.js @@ -52,6 +52,9 @@ module.exports = function (sequelize, DataTypes) { }, lastchangeAt: { type: DataTypes.DATE + }, + savedAt: { + type: DataTypes.DATE } }, { classMethods: { @@ -66,6 +69,10 @@ module.exports = function (sequelize, DataTypes) { as: "lastchangeuser", constraints: false }); + Note.hasMany(models.Revision, { + foreignKey: "noteId", + constraints: false + }); }, checkFileExist: function (filePath) { try { @@ -100,11 +107,15 @@ module.exports = function (sequelize, DataTypes) { var dbModifiedTime = moment(note.lastchangeAt || note.createdAt); if (fsModifiedTime.isAfter(dbModifiedTime)) { var body = fs.readFileSync(filePath, 'utf8'); - note.title = LZString.compressToBase64(Note.parseNoteTitle(body)); - note.content = LZString.compressToBase64(body); - note.lastchangeAt = fsModifiedTime; - note.save().then(function (note) { - return callback(null, note.id); + note.update({ + title: LZString.compressToBase64(Note.parseNoteTitle(body)), + content: LZString.compressToBase64(body), + lastchangeAt: fsModifiedTime + }).then(function (note) { + sequelize.models.Revision.saveNoteRevision(note, function (err, revision) { + if (err) return _callback(err, null); + return callback(null, note.id); + }); }).catch(function (err) { return _callback(err, null); }); @@ -224,6 +235,11 @@ module.exports = function (sequelize, DataTypes) { } } return callback(null, note); + }, + afterCreate: function (note, options, callback) { + sequelize.models.Revision.saveNoteRevision(note, function (err, revision) { + callback(err, note); + }); } } }); diff --git a/lib/models/revision.js b/lib/models/revision.js new file mode 100644 index 0000000..5300d72 --- /dev/null +++ b/lib/models/revision.js @@ -0,0 +1,276 @@ +"use strict"; + +// external modules +var Sequelize = require("sequelize"); +var LZString = require('lz-string'); +var async = require('async'); +var moment = require('moment'); +var DiffMatchPatch = require('diff-match-patch'); +var dmp = new DiffMatchPatch(); + +// core +var config = require("../config.js"); +var logger = require("../logger.js"); + +module.exports = function (sequelize, DataTypes) { + var Revision = sequelize.define("Revision", { + id: { + type: DataTypes.UUID, + primaryKey: true, + defaultValue: Sequelize.UUIDV4 + }, + patch: { + type: DataTypes.TEXT + }, + lastContent: { + type: DataTypes.TEXT + }, + content: { + type: DataTypes.TEXT + }, + length: { + type: DataTypes.INTEGER + } + }, { + classMethods: { + associate: function (models) { + Revision.belongsTo(models.User, { + foreignKey: "noteId", + as: "note", + constraints: false + }); + }, + createPatch: function (lastDoc, CurrDoc) { + var ms_start = (new Date()).getTime(); + var diff = dmp.diff_main(lastDoc, CurrDoc); + dmp.diff_cleanupSemantic(diff); + var patch = dmp.patch_make(lastDoc, diff); + patch = dmp.patch_toText(patch); + var ms_end = (new Date()).getTime(); + if (config.debug) { + logger.info(patch); + logger.info((ms_end - ms_start) + 'ms'); + } + return patch; + }, + getNoteRevisions: function (note, callback) { + Revision.findAll({ + where: { + noteId: note.id + }, + order: '"createdAt" DESC' + }).then(function (revisions) { + var data = []; + for (var i = 0, l = revisions.length; i < l; i++) { + var revision = revisions[i]; + data.push({ + time: moment(revision.createdAt).valueOf(), + length: revision.length + }); + } + callback(null, data); + }).catch(function (err) { + callback(err, null); + }); + }, + getPatchedNoteRevisionByTime: function (note, time, callback) { + // find all revisions to prepare for all possible calculation + Revision.findAll({ + where: { + noteId: note.id + }, + order: '"createdAt" DESC' + }).then(function (revisions) { + if (revisions.length <= 0) return callback(null, null); + // measure target revision position + Revision.count({ + where: { + noteId: note.id, + createdAt: { + $gte: time + } + }, + order: '"createdAt" DESC' + }).then(function (count) { + if (count <= 0) return callback(null, null); + var ms_start = (new Date()).getTime(); + var startContent = null; + var lastPatch = []; + var applyPatches = []; + if (count <= Math.round(revisions.length / 2)) { + // start from top to target + for (var i = 0; i < count; i++) { + var revision = revisions[i]; + if (i == 0) { + startContent = LZString.decompressFromBase64(revision.content || revision.lastContent); + } + if (i != count - 1) { + var patch = dmp.patch_fromText(LZString.decompressFromBase64(revision.patch)); + applyPatches = applyPatches.concat(patch); + } + lastPatch = revision.patch; + } + // swap DIFF_INSERT and DIFF_DELETE to achieve unpatching + for (var i = 0, l = applyPatches.length; i < l; i++) { + for (var j = 0, m = applyPatches[i].diffs.length; j < m; j++) { + var diff = applyPatches[i].diffs[j]; + if (diff[0] == DiffMatchPatch.DIFF_INSERT) + diff[0] = DiffMatchPatch.DIFF_DELETE; + else if (diff[0] == DiffMatchPatch.DIFF_DELETE) + diff[0] = DiffMatchPatch.DIFF_INSERT; + } + } + } else { + // start from bottom to target + var l = revisions.length - 1; + for (var i = l; i >= count - 1; i--) { + var revision = revisions[i]; + if (i == l) { + startContent = LZString.decompressFromBase64(revision.lastContent); + } + if (revision.patch) { + var patch = dmp.patch_fromText(LZString.decompressFromBase64(revision.patch)); + applyPatches = applyPatches.concat(patch); + } + lastPatch = revision.patch; + } + } + try { + var finalContent = dmp.patch_apply(applyPatches, startContent)[0]; + } catch (err) { + return callback(err, null); + } + var data = { + content: finalContent, + patch: dmp.patch_fromText(LZString.decompressFromBase64(lastPatch)) + }; + var ms_end = (new Date()).getTime(); + if (config.debug) { + logger.info((ms_end - ms_start) + 'ms'); + } + return callback(null, data); + }).catch(function (err) { + return callback(err, null); + }); + }).catch(function (err) { + return callback(err, null); + }); + }, + checkAllNotesRevision: function (callback) { + Revision.saveAllNotesRevision(function (err, notes) { + if (err) return callback(err, null); + if (notes.length <= 0) { + return callback(null, notes); + } else { + Revision.checkAllNotesRevision(callback); + } + }); + }, + saveAllNotesRevision: function (callback) { + sequelize.models.Note.findAll({ + where: { + $and: [ + { + lastchangeAt: { + $or: { + $eq: null, + $and: { + $ne: null, + $gt: sequelize.col('createdAt') + } + } + } + }, + { + savedAt: { + $or: { + $eq: null, + $lt: sequelize.col('lastchangeAt') + } + } + } + ] + } + }).then(function (notes) { + if (notes.length <= 0) return callback(null, notes); + async.each(notes, function (note, _callback) { + Revision.saveNoteRevision(note, _callback); + }, function (err) { + if (err) return callback(err, null); + return callback(null, notes); + }); + }).catch(function (err) { + return callback(err, null); + }); + }, + saveNoteRevision: function (note, callback) { + Revision.findAll({ + where: { + noteId: note.id + }, + order: '"createdAt" DESC' + }).then(function (revisions) { + if (revisions.length <= 0) { + // if no revision available + Revision.create({ + noteId: note.id, + lastContent: note.content, + length: LZString.decompressFromBase64(note.content).length + }).then(function (revision) { + Revision.finishSaveNoteRevision(note, revision, callback); + }).catch(function (err) { + return callback(err, null); + }); + } else { + var latestRevision = revisions[0]; + var lastContent = LZString.decompressFromBase64(latestRevision.content || latestRevision.lastContent); + var content = LZString.decompressFromBase64(note.content); + var patch = Revision.createPatch(lastContent, content); + if (!patch) { + // if patch is empty (means no difference) then just update the latest revision updated time + latestRevision.changed('updatedAt', true); + latestRevision.update({ + updatedAt: Date.now() + }).then(function (revision) { + Revision.finishSaveNoteRevision(note, revision, callback); + }).catch(function (err) { + return callback(err, null); + }); + } else { + Revision.create({ + noteId: note.id, + patch: LZString.compressToBase64(patch), + content: note.content, + length: LZString.decompressFromBase64(note.content).length + }).then(function (revision) { + // clear last revision content to reduce db size + latestRevision.update({ + content: null + }).then(function () { + Revision.finishSaveNoteRevision(note, revision, callback); + }).catch(function (err) { + return callback(err, null); + }); + }).catch(function (err) { + return callback(err, null); + }); + } + } + }).catch(function (err) { + return callback(err, null); + }); + }, + finishSaveNoteRevision: function (note, revision, callback) { + note.update({ + savedAt: revision.updatedAt + }).then(function () { + return callback(null, revision); + }).catch(function (err) { + return callback(err, null); + }); + } + } + }); + + return Revision; +}; \ No newline at end of file diff --git a/lib/realtime.js b/lib/realtime.js index 1d14270..0edf647 100644 --- a/lib/realtime.js +++ b/lib/realtime.js @@ -26,8 +26,7 @@ var realtime = { secure: secure, connection: connection, getStatus: getStatus, - users: users, - notes: notes + isReady: isReady }; function onAuthorizeSuccess(data, accept) { @@ -72,9 +71,8 @@ function emitCheck(note) { } //actions -var users, notes; -realtime.users = users = {}; -realtime.notes = notes = {}; +var users = {}; +var notes = {}; //update when the note is dirty var updater = setInterval(function () { async.each(Object.keys(notes), function (key, callback) { @@ -152,6 +150,7 @@ function finishUpdateNote(note, _note, callback) { lastchangeAt: Date.now() }; _note.update(values).then(function (_note) { + saverSleep = false; return callback(null, _note); }).catch(function (err) { logger.error(err); @@ -179,6 +178,18 @@ var cleaner = setInterval(function () { if (err) return logger.error('cleaner error', err); }); }, 60000); +var saverSleep = true; +// save note revision in interval +var saver = setInterval(function () { + if (saverSleep) return; + models.Revision.saveAllNotesRevision(function (err, notes) { + if (err) return logger.error('revision saver failed: ' + err); + if (notes.length <= 0) { + saverSleep = true; + return; + } + }); +}, 60000 * 5); function getStatus(callback) { models.Note.count().then(function (notecount) { @@ -233,6 +244,13 @@ function getStatus(callback) { }); } +function isReady() { + return realtime.io + && Object.keys(notes).length == 0 && Object.keys(users).length == 0 + && connectionSocketQueue.length == 0 && !isConnectionBusy + && disconnectSocketQueue.length == 0 && !isDisconnectBusy; +} + function extractNoteIdFromSocket(socket) { if (!socket || !socket.handshake || !socket.handshake.headers) { return false; diff --git a/package.json b/package.json index ca7c0d1..6b9e5ea 100644 --- a/package.json +++ b/package.json @@ -15,6 +15,7 @@ "cookie": "0.2.3", "cookie-parser": "1.4.1", "ejs": "^2.4.1", + "diff-match-patch": "^1.0.0", "emojify.js": "^1.1.0", "express": ">=4.13", "express-session": "^1.13.0",