Update to remove history cache to lower application coupling

This commit is contained in:
Wu Cheng-Han 2017-02-03 21:39:08 +08:00
parent a261c8e812
commit 92ad67b813
3 changed files with 64 additions and 114 deletions

6
app.js
View file

@ -26,7 +26,6 @@ var validator = require('validator');
var config = require("./lib/config.js"); var config = require("./lib/config.js");
var logger = require("./lib/logger.js"); var logger = require("./lib/logger.js");
var auth = require("./lib/auth.js"); var auth = require("./lib/auth.js");
var history = require("./lib/history.js");
var response = require("./lib/response.js"); var response = require("./lib/response.js");
var models = require("./lib/models"); var models = require("./lib/models");
@ -443,6 +442,7 @@ app.get('/logout', function (req, res) {
req.logout(); req.logout();
res.redirect(config.serverurl + '/'); res.redirect(config.serverurl + '/');
}); });
var history = require("./lib/history.js");
//get history //get history
app.get('/history', history.historyGet); app.get('/history', history.historyGet);
//post history //post history
@ -608,7 +608,7 @@ function startListen() {
// sync db then start listen // sync db then start listen
models.sequelize.sync().then(function () { models.sequelize.sync().then(function () {
// check if realtime is ready // check if realtime is ready
if (history.isReady() && realtime.isReady()) { if (realtime.isReady()) {
models.Revision.checkAllNotesRevision(function (err, notes) { models.Revision.checkAllNotesRevision(function (err, notes) {
if (err) throw new Error(err); if (err) throw new Error(err);
if (!notes || notes.length <= 0) return startListen(); if (!notes || notes.length <= 0) return startListen();
@ -639,7 +639,7 @@ function handleTermSignals() {
}, 0); }, 0);
}); });
var checkCleanTimer = setInterval(function () { var checkCleanTimer = setInterval(function () {
if (history.isReady() && realtime.isReady()) { if (realtime.isReady()) {
models.Revision.checkAllNotesRevision(function (err, notes) { models.Revision.checkAllNotesRevision(function (err, notes) {
if (err) return logger.error(err); if (err) return logger.error(err);
if (!notes || notes.length <= 0) { if (!notes || notes.length <= 0) {

View file

@ -1,7 +1,6 @@
//history //history
//external modules //external modules
var async = require('async'); var async = require('async');
var moment = require('moment');
//core //core
var config = require("./config.js"); var config = require("./config.js");
@ -14,45 +13,32 @@ var History = {
historyGet: historyGet, historyGet: historyGet,
historyPost: historyPost, historyPost: historyPost,
historyDelete: historyDelete, historyDelete: historyDelete,
isReady: isReady,
updateHistory: updateHistory updateHistory: updateHistory
}; };
var caches = {}; function getHistory(userid, callback) {
//update when the history is dirty models.User.findOne({
var updater = setInterval(function () { where: {
var deleted = []; id: userid
async.each(Object.keys(caches), function (key, callback) {
var cache = caches[key];
if (cache.isDirty) {
if (config.debug) logger.info("history updater found dirty history: " + key);
var history = parseHistoryToArray(cache.history);
cache.isDirty = false;
finishUpdateHistory(key, history, function (err, count) {
if (err) return callback(err, null);
if (!count) return callback(null, null);
cache.updateAt = Date.now();
return callback(null, null);
});
} else {
if (moment().isAfter(moment(cache.updateAt).add(5, 'minutes'))) {
deleted.push(key);
}
return callback(null, null);
} }
}, function (err) { }).then(function (user) {
if (err) return logger.error('history updater error', err); if (!user)
return callback(null, null);
var history = {};
if (user.history)
history = parseHistoryToObject(JSON.parse(user.history));
if (config.debug)
logger.info('read history success: ' + user.id);
return callback(null, history);
}).catch(function (err) {
logger.error('read history failed: ' + err);
return callback(err, null);
}); });
// delete specified caches }
for (var i = 0, l = deleted.length; i < l; i++) {
caches[deleted[i]].history = {};
delete caches[deleted[i]];
}
}, 1000);
function finishUpdateHistory(userid, history, callback) { function setHistory(userid, history, callback) {
models.User.update({ models.User.update({
history: JSON.stringify(history) history: JSON.stringify(parseHistoryToArray(history))
}, { }, {
where: { where: {
id: userid id: userid
@ -60,72 +46,27 @@ function finishUpdateHistory(userid, history, callback) {
}).then(function (count) { }).then(function (count) {
return callback(null, count); return callback(null, count);
}).catch(function (err) { }).catch(function (err) {
logger.error('set history failed: ' + err);
return callback(err, null); return callback(err, null);
}); });
} }
function isReady() { function updateHistory(userid, noteId, document, time) {
var dirtyCount = 0;
async.each(Object.keys(caches), function (key, callback) {
if (caches[key].isDirty) dirtyCount++;
return callback(null, null);
}, function (err) {
if (err) return logger.error('history ready check error', err);
});
return dirtyCount > 0 ? false : true;
}
function getHistory(userid, callback) {
if (caches[userid]) {
return callback(null, caches[userid].history);
} else {
models.User.findOne({
where: {
id: userid
}
}).then(function (user) {
if (!user)
return callback(null, null);
var history = [];
if (user.history)
history = JSON.parse(user.history);
if (config.debug)
logger.info('read history success: ' + user.id);
setHistory(userid, history);
return callback(null, history);
}).catch(function (err) {
logger.error('read history failed: ' + err);
return callback(err, null);
});
}
}
function setHistory(userid, history) {
if (Array.isArray(history)) history = parseHistoryToObject(history);
if (!caches[userid]) {
caches[userid] = {
history: {},
isDirty: false,
updateAt: Date.now()
};
}
caches[userid].history = history;
}
function updateHistory(userid, noteId, document) {
if (userid && noteId && typeof document !== 'undefined') { if (userid && noteId && typeof document !== 'undefined') {
getHistory(userid, function (err, history) { getHistory(userid, function (err, history) {
if (err || !history) return; if (err || !history) return;
if (!caches[userid].history[noteId]) { if (!history[noteId]) {
caches[userid].history[noteId] = {}; history[noteId] = {};
} }
var noteHistory = caches[userid].history[noteId]; var noteHistory = history[noteId];
var noteInfo = models.Note.parseNoteInfo(document); var noteInfo = models.Note.parseNoteInfo(document);
noteHistory.id = noteId; noteHistory.id = noteId;
noteHistory.text = noteInfo.title; noteHistory.text = noteInfo.title;
noteHistory.time = moment().valueOf(); noteHistory.time = time || Date.now();
noteHistory.tags = noteInfo.tags; noteHistory.tags = noteInfo.tags;
caches[userid].isDirty = true; setHistory(userid, history, function (err, count) {
return;
});
}); });
} }
} }
@ -175,9 +116,10 @@ function historyPost(req, res) {
return response.errorBadRequest(res); return response.errorBadRequest(res);
} }
if (Array.isArray(history)) { if (Array.isArray(history)) {
setHistory(req.user.id, history); setHistory(req.user.id, history, function (err, count) {
caches[req.user.id].isDirty = true; if (err) return response.errorInternalError(res);
res.end(); res.end();
});
} else { } else {
return response.errorBadRequest(res); return response.errorBadRequest(res);
} }
@ -186,11 +128,13 @@ function historyPost(req, res) {
getHistory(req.user.id, function (err, history) { getHistory(req.user.id, function (err, history) {
if (err) return response.errorInternalError(res); if (err) return response.errorInternalError(res);
if (!history) return response.errorNotFound(res); if (!history) return response.errorNotFound(res);
if (!caches[req.user.id].history[noteId]) return response.errorNotFound(res); if (!history[noteId]) return response.errorNotFound(res);
if (req.body.pinned === 'true' || req.body.pinned === 'false') { if (req.body.pinned === 'true' || req.body.pinned === 'false') {
caches[req.user.id].history[noteId].pinned = (req.body.pinned === 'true'); history[noteId].pinned = (req.body.pinned === 'true');
caches[req.user.id].isDirty = true; setHistory(req.user.id, history, function (err, count) {
res.end(); if (err) return response.errorInternalError(res);
res.end();
});
} else { } else {
return response.errorBadRequest(res); return response.errorBadRequest(res);
} }
@ -205,16 +149,19 @@ function historyDelete(req, res) {
if (req.isAuthenticated()) { if (req.isAuthenticated()) {
var noteId = req.params.noteId; var noteId = req.params.noteId;
if (!noteId) { if (!noteId) {
setHistory(req.user.id, []); setHistory(req.user.id, [], function (err, count) {
caches[req.user.id].isDirty = true; if (err) return response.errorInternalError(res);
res.end(); res.end();
});
} else { } else {
getHistory(req.user.id, function (err, history) { getHistory(req.user.id, function (err, history) {
if (err) return response.errorInternalError(res); if (err) return response.errorInternalError(res);
if (!history) return response.errorNotFound(res); if (!history) return response.errorNotFound(res);
delete caches[req.user.id].history[noteId]; delete history[noteId];
caches[req.user.id].isDirty = true; setHistory(req.user.id, history, function (err, count) {
res.end(); if (err) return response.errorInternalError(res);
res.end();
});
}); });
} }
} else { } else {

View file

@ -122,6 +122,12 @@ function updateNote(note, callback) {
} }
}).then(function (_note) { }).then(function (_note) {
if (!_note) return callback(null, null); if (!_note) return callback(null, null);
// update user note history
var tempUsers = Object.assign({}, note.tempUsers);
note.tempUsers = {};
Object.keys(tempUsers).forEach(function (key) {
updateHistory(key, note, tempUsers[key]);
});
if (note.lastchangeuser) { if (note.lastchangeuser) {
if (_note.lastchangeuserId != note.lastchangeuser) { if (_note.lastchangeuserId != note.lastchangeuser) {
models.User.findOne({ models.User.findOne({
@ -405,10 +411,7 @@ function finishConnection(socket, note, user) {
note.server.setColor(socket, user.color); note.server.setColor(socket, user.color);
// update user note history // update user note history
setTimeout(function () { updateHistory(user.userid, note);
var noteId = note.alias ? note.alias : LZString.compressToBase64(note.id);
if (note.server) history.updateHistory(user.userid, noteId, note.server.document);
}, 0);
emitOnlineUsers(socket); emitOnlineUsers(socket);
emitRefresh(socket); emitRefresh(socket);
@ -497,6 +500,7 @@ function startConnection(socket) {
lastchangeuserprofile: lastchangeuserprofile, lastchangeuserprofile: lastchangeuserprofile,
socks: [], socks: [],
users: {}, users: {},
tempUsers: {},
createtime: moment(createtime).valueOf(), createtime: moment(createtime).valueOf(),
updatetime: moment(updatetime).valueOf(), updatetime: moment(updatetime).valueOf(),
server: server, server: server,
@ -687,15 +691,14 @@ function operationCallback(socket, operation) {
return logger.error('operation callback failed: ' + err); return logger.error('operation callback failed: ' + err);
}); });
} }
// update user note history note.tempUsers[userId] = Date.now();
setTimeout(function() {
var noteId = note.alias ? note.alias : LZString.compressToBase64(note.id);
if (note.server) history.updateHistory(userId, noteId, note.server.document);
}, 0);
} }
// save authorship // save authorship
note.authorship = models.Note.updateAuthorshipByOperation(operation, userId, note.authorship); note.authorship = models.Note.updateAuthorshipByOperation(operation, userId, note.authorship);
function updateHistory(userId, note, time) {
var noteId = note.alias ? note.alias : LZString.compressToBase64(note.id);
if (note.server) history.updateHistory(userId, noteId, note.server.document, time);
} }
function connection(socket) { function connection(socket) {
@ -925,4 +928,4 @@ function connection(socket) {
}); });
} }
module.exports = realtime; module.exports = realtime;