From b5920fbbd1ceb595456da18f7d458b63d1a960bf Mon Sep 17 00:00:00 2001 From: Wu Cheng-Han Date: Mon, 7 Nov 2016 21:30:53 +0800 Subject: [PATCH] Add workers for history to leverage CPU intensive work loading --- lib/history.js | 81 +++++++++++++++-------------------- lib/workers/historyUpdater.js | 66 ++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 47 deletions(-) create mode 100644 lib/workers/historyUpdater.js diff --git a/lib/history.js b/lib/history.js index 4a3bbe1..bdc922d 100644 --- a/lib/history.js +++ b/lib/history.js @@ -2,6 +2,7 @@ //external modules var async = require('async'); var moment = require('moment'); +var childProcess = require('child_process'); //core var config = require("./config.js"); @@ -9,6 +10,9 @@ var logger = require("./logger.js"); var response = require("./response.js"); var models = require("./models"); +// workers +var historyUpdater = require("./workers/historyUpdater"); + //public var History = { historyGet: historyGet, @@ -20,49 +24,50 @@ var History = { var caches = {}; //update when the history is dirty +var updaterIsBusy = false; var updater = setInterval(function () { + if (updaterIsBusy) return; var deleted = []; - async.each(Object.keys(caches), function (key, callback) { + var _caches = {}; + Object.keys(caches).forEach(function (key) { var cache = caches[key]; if (cache.isDirty) { - if (config.debug) logger.info("history updater found dirty history: " + key); - var history = parseHistoryToArray(cache.history); - finishUpdateHistory(key, history, function (err, count) { - if (err) return callback(err, null); - if (!count) return callback(null, null); - cache.isDirty = false; - cache.updateAt = Date.now(); - return callback(null, null); - }); + _caches[key] = cache.history; + cache.isDirty = false; } else { if (moment().isAfter(moment(cache.updateAt).add(5, 'minutes'))) { deleted.push(key); } - return callback(null, null); } - }, function (err) { - if (err) return logger.error('history updater error', err); }); // delete specified caches for (var i = 0, l = deleted.length; i < l; i++) { caches[deleted[i]].history = {}; delete caches[deleted[i]]; } -}, 1000); - -function finishUpdateHistory(userid, history, callback) { - models.User.update({ - history: JSON.stringify(history) - }, { - where: { - id: userid - } - }).then(function (count) { - return callback(null, count); - }).catch(function (err) { - return callback(err, null); + if (Object.keys(_caches).length <= 0) return; + updaterIsBusy = true; + var worker = childProcess.fork("./lib/workers/historyUpdater.js"); + if (config.debug) logger.info('history updater worker process started'); + worker.send({ + msg: 'update history', + caches: _caches }); -} + worker.on('message', function (data) { + if (!data || !data.msg || !data.userid) return; + var cache = caches[data.userid]; + if (!cache) return; + switch(data.msg) { + case 'check': + cache.updateAt = Date.now(); + break; + } + }); + worker.on('close', function (code) { + updaterIsBusy = false; + if (config.debug) logger.info('history updater worker process exited with code ' + code); + }); +}, 1000); function isReady() { var dirtyCount = 0; @@ -101,7 +106,7 @@ function getHistory(userid, callback) { } function setHistory(userid, history) { - if (Array.isArray(history)) history = parseHistoryToObject(history); + if (Array.isArray(history)) history = historyUpdater.parseHistoryToObject(history); if (!caches[userid]) { caches[userid] = { history: {}, @@ -130,31 +135,13 @@ function updateHistory(userid, noteId, document) { } } -function parseHistoryToArray(history) { - var _history = []; - Object.keys(history).forEach(function (key) { - var item = history[key]; - _history.push(item); - }); - return _history; -} - -function parseHistoryToObject(history) { - var _history = {}; - for (var i = 0, l = history.length; i < l; i++) { - var item = history[i]; - _history[item.id] = item; - } - return _history; -} - function historyGet(req, res) { if (req.isAuthenticated()) { getHistory(req.user.id, function (err, history) { if (err) return response.errorInternalError(res); if (!history) return response.errorNotFound(res); res.send({ - history: parseHistoryToArray(history) + history: historyUpdater.parseHistoryToArray(history) }); }); } else { diff --git a/lib/workers/historyUpdater.js b/lib/workers/historyUpdater.js new file mode 100644 index 0000000..df80e92 --- /dev/null +++ b/lib/workers/historyUpdater.js @@ -0,0 +1,66 @@ +// external modules +var async = require('async'); + +// core +var config = require("../config.js"); +var logger = require("../logger.js"); +var models = require("../models"); + +process.on('message', function (data) { + if (!data || !data.msg || data.msg !== 'update history' || !data.caches) return process.exit(); + var caches = data.caches; + async.each(Object.keys(caches), function (key, callback) { + var cache = caches[key]; + if (config.debug) logger.info("history updater found dirty history: " + key); + var history = parseHistoryToArray(cache); + finishUpdateHistory(key, history, function (err, count) { + if (err) return callback(err, null); + if (!count) return callback(null, null); + process.send({ + msg: 'check', + userid: key + }); + return callback(null, null); + }); + }, function (err) { + if (err) logger.error('history updater error', err); + process.exit(); + }); +}); + +function finishUpdateHistory(userid, history, callback) { + models.User.update({ + history: JSON.stringify(history) + }, { + where: { + id: userid + } + }).then(function (count) { + return callback(null, count); + }).catch(function (err) { + return callback(err, null); + }); +} + +function parseHistoryToArray(history) { + var _history = []; + Object.keys(history).forEach(function (key) { + var item = history[key]; + _history.push(item); + }); + return _history; +} + +function parseHistoryToObject(history) { + var _history = {}; + for (var i = 0, l = history.length; i < l; i++) { + var item = history[i]; + _history[item.id] = item; + } + return _history; +} + +module.exports = { + parseHistoryToArray: parseHistoryToArray, + parseHistoryToObject: parseHistoryToObject +}; \ No newline at end of file