Bug 1135588 - Ensure gloda IM cache saves are correct and happen when they should. r=clokep, a=rkent
author: aleth <aleth@instantbird.org>
Sat, 21 Mar 2015 20:50:02 +0100
changeset 25817 08f97de69ea305a7c635b3869f13b98cbdc0dfb6
parent 25816 091960adda435c9d5f07449d42febe73164c6de9
child 25818 f7ff3d2c4596a771df03bf7704c3142aed0f5518
push id1850
push user: clokep@gmail.com
push date: Wed, 08 Mar 2017 19:29:12 +0000
treeherder: comm-esr52@028df196b2d9 [default view] [failures only]
perfherder: [talos] [build metrics] [platform microbench] (compared to previous push)
reviewers: clokep, rkent
bugs: 1135588
Bug 1135588 - Ensure gloda IM cache saves are correct and happen when they should. r=clokep, a=rkent
mail/components/im/modules/index_im.js
--- a/mail/components/im/modules/index_im.js
+++ b/mail/components/im/modules/index_im.js
@@ -13,16 +13,18 @@ Cu.import("resource:///modules/imService
 Cu.import("resource:///modules/imXPCOMUtils.jsm");
 Cu.import("resource:///modules/iteratorUtils.jsm");
 Cu.import("resource:///modules/mailServices.js");
 Cu.import("resource://gre/modules/FileUtils.jsm");
 Cu.import("resource://gre/modules/NetUtil.jsm");
 
 XPCOMUtils.defineLazyModuleGetter(this, "OS", "resource://gre/modules/osfile.jsm");
 XPCOMUtils.defineLazyModuleGetter(this, "Task", "resource://gre/modules/Task.jsm");
+XPCOMUtils.defineLazyModuleGetter(this, "AsyncShutdown",
+                                  "resource://gre/modules/AsyncShutdown.jsm");
 
 const kCacheFileName = "indexedFiles.json";
 
 const FileInputStream = CC("@mozilla.org/network/file-input-stream;1",
                            "nsIFileInputStream",
                            "init");
 const ScriptableInputStream = CC("@mozilla.org/scriptableinputstream;1",
                                  "nsIScriptableInputStream",
@@ -228,23 +230,65 @@ Gloda.defineAttribute({
     type: "date"
   },
   canQuery: true
 });
 
 var GlodaIMIndexer = {
   name: "index_im",
   enable: function() {
-    Services.obs.addObserver(this, "new-text", false);
     Services.obs.addObserver(this, "conversation-closed", false);
     Services.obs.addObserver(this, "new-ui-conversation", false);
     Services.obs.addObserver(this, "ui-conversation-closed", false);
+
+    // The shutdown blocker ensures pending saves happen even if the app
+    // gets shut down before the timer fires.
+    if (this._shutdownBlockerAdded)
+      return;
+    this._shutdownBlockerAdded = true;
+    AsyncShutdown.profileBeforeChange.addBlocker("GlodaIMIndexer cache save",
+      () => {
+        if (!this._cacheSaveTimer)
+          return;
+        clearTimeout(this._cacheSaveTimer);
+        return this._saveCacheNow();
+      });
+
+    this._knownFiles = {};
+
+    let dir = FileUtils.getFile("ProfD", ["logs"]);
+    if (!dir.exists() || !dir.isDirectory())
+      return;
+    let cacheFile = dir.clone();
+    cacheFile.append(kCacheFileName);
+    if (!cacheFile.exists())
+      return;
+
+    const PR_RDONLY = 0x01;
+    let fis = new FileInputStream(cacheFile, PR_RDONLY, parseInt("0444", 8),
+                                  Ci.nsIFileInputStream.CLOSE_ON_EOF);
+    let sis = new ScriptableInputStream(fis);
+    let text = sis.read(sis.available());
+    sis.close();
+
+    let data = JSON.parse(text);
+
+    // Check to see if the Gloda datastore ID matches the one that we saved
+    // in the cache. If so, we can trust it. If not, that means that the
+    // cache is likely invalid now, so we ignore it (and eventually
+    // overwrite it).
+    if ("datastoreID" in data &&
+        Gloda.datastoreID &&
+        data.datastoreID === Gloda.datastoreID) {
+      // Ok, the cache's datastoreID matches the one we expected, so it's
+      // still valid.
+      this._knownFiles = data.knownFiles;
+    }
   },
   disable: function() {
-    Services.obs.removeObserver(this, "new-text");
     Services.obs.removeObserver(this, "conversation-closed");
     Services.obs.removeObserver(this, "new-ui-conversation");
     Services.obs.removeObserver(this, "ui-conversation-closed");
   },
 
   /* _knownFiles is a tree whose leaves are the last modified times of
    * log files when they were last indexed.
    * Each level of the tree is stored as an object. The root node is an
@@ -254,41 +298,35 @@ var GlodaIMIndexer = {
    * The corresponding keys of the above objects are:
    * protocol names -> account names -> conv names  -> file names -> last modified time
    * convObj maps ALL previously indexed log files of a chat buddy or MUC to
    * their last modified times. Note that gloda knows nothing about log grouping
    * done by logger.js.
    */
   _knownFiles: {},
   _cacheSaveTimer: null,
+  _shutdownBlockerAdded: false,
   _scheduleCacheSave: function() {
     if (this._cacheSaveTimer)
       return;
     this._cacheSaveTimer = setTimeout(this._saveCacheNow, 5000);
   },
   _saveCacheNow: function() {
+    GlodaIMIndexer._cacheSaveTimer = null;
+
     let data = {
       knownFiles: GlodaIMIndexer._knownFiles,
       datastoreID: Gloda.datastoreID,
     };
 
-    let file = FileUtils.getFile("ProfD", ["logs", kCacheFileName]);
-    let ostream = FileUtils.openSafeFileOutputStream(file);
-
-    // Obtain a converter to convert our data to a UTF-8 encoded input stream.
-    let converter = Cc["@mozilla.org/intl/scriptableunicodeconverter"].createInstance(Ci.nsIScriptableUnicodeConverter);
-    converter.charset = "UTF-8";
-
     // Asynchronously copy the data to the file.
-    let istream = converter.convertToInputStream(JSON.stringify(data));
-    NetUtil.asyncCopy(istream, ostream, function(rc) {
-      if (!Components.isSuccessCode(rc)) {
-        Cu.reportError("Failed to write cache file");
-      }
-    });
+    let path = OS.Path.join(OS.Constants.Path.profileDir, "logs", kCacheFileName);
+    return OS.File.writeAtomic(path, JSON.stringify(data),
+                               {encoding: "utf-8", tmpPath: path + ".tmp"})
+             .catch(aError => Cu.reportError("Failed to write cache file: " + aError));
   },
 
   _knownConversations: {},
   // Promise queue for indexing jobs. The next indexing job is queued using this
   // promise's then() to ensure we only load logs for one conv at a time.
   _indexingJobPromise: null,
   // Maps a conv id to the function that resolves the promise representing the
   // ongoing indexing job on it. This is called from indexIMConversation when it
@@ -304,36 +342,41 @@ var GlodaIMIndexer = {
       this._knownConversations[convId] = {
         id: convId,
         scheduledIndex: null,
         logFile: null,
         convObj: {}
       };
     }
 
-    if (this._knownConversations[convId].scheduledIndex == null) {
+    if (!this._knownConversations[convId].scheduledIndex) {
       // Ok, let's schedule the job.
       this._knownConversations[convId].scheduledIndex = setTimeout(
         this._beginIndexingJob.bind(this, aConversation),
         kIndexingDelay);
     }
   },
 
   _beginIndexingJob: function(aConversation) {
     let convId = aConversation.id;
 
     // In the event that we're triggering this indexing job manually, without
     // bothering to schedule it (for example, when a conversation is closed),
     // we give the conversation an entry in _knownConversations, which would
     // normally have been done in _scheduleIndexingJob.
-    if (!(convId in this._knownConversations))
-      this._knownConversations[convId] = {id: convId};
+    if (!(convId in this._knownConversations)) {
+      this._knownConversations[convId] = {
+        id: convId,
+        scheduledIndex: null,
+        logFile: null,
+        convObj: {}
+      };
+    }
 
     let conv = this._knownConversations[convId];
-
     Task.spawn(function* () {
       if (!conv.logFile) {
         let logFile =
           yield Services.logs.getLogPathForConversation(aConversation);
         if (!logFile) {
           // Log file doesn't exist yet, nothing to do!
           return;
         }
@@ -365,24 +408,27 @@ var GlodaIMIndexer = {
       let fileName = OS.Path.basename(logPath);
       let lastModifiedTime =
         (yield OS.File.stat(logPath)).lastModificationDate.valueOf();
       if (Object.prototype.hasOwnProperty.call(conv.convObj, fileName) &&
           conv.convObj[fileName] == lastModifiedTime) {
         // The file hasn't changed since we last indexed it, so we're done.
         return;
       }
+
       if (this._indexingJobPromise)
         yield this._indexingJobPromise;
       this._indexingJobPromise = new Promise(aResolve => {
         this._indexingJobCallbacks.set(convId, aResolve);
       });
+
       let job = new IndexingJob("indexIMConversation", null);
       job.conversation = conv;
       job.path = logPath;
+      job.lastModifiedTime = lastModifiedTime;
       GlodaIndexer.indexJob(job);
     }.bind(this)).catch(Components.utils.reportError);
 
     // Now clear the job, so we can index in the future.
     this._knownConversations[convId].scheduledIndex = null;
   },
 
   observe: function logger_observe(aSubject, aTopic, aData) {
@@ -391,16 +437,17 @@ var GlodaIMIndexer = {
       // unread-message-count-changed notification.
       // For this notification, aSubject is the ui-conversation that is opened.
       aSubject.addObserver(this);
       return;
     }
 
     if (aTopic == "ui-conversation-closed") {
       aSubject.removeObserver(this);
+      return;
     }
 
     if (aTopic == "unread-message-count-changed") {
       // We get this notification by attaching observers to conversations
       // directly (see the new-ui-conversation handler for when we attach).
       if (aSubject.unreadIncomingMessageCount == 0) {
         // The unread message count changed to 0, meaning that a conversation
         // that had been in the background and receiving messages was suddenly
@@ -461,17 +508,16 @@ var GlodaIMIndexer = {
     let log = yield Services.logs.getLogFromFile(aLogPath);
     let logConv = yield log.getConversation();
 
     // Ignore corrupted log files.
     if (!logConv)
       return Gloda.kWorkDone;
 
     let fileName = OS.Path.basename(aLogPath);
-
     let content = logConv.getMessages()
                          // Some messages returned, e.g. sessionstart messages,
                          // may have the noLog flag set. Ignore these.
                          .filter(m => !m.noLog)
                          .map(m => {
                            let who = m.alias || m.who;
                            // Some messages like topic change notifications may
                            // not have a source.
@@ -488,75 +534,57 @@ var GlodaIMIndexer = {
       // Get the path of the file relative to the logs directory - the last 4
       // components of the path.
       let relativePath = OS.Path.split(aLogPath).components.slice(-4).join("/");
       glodaConv = new GlodaIMConversation(logConv.title, log.time, relativePath, content);
       if (aGlodaConv)
         aGlodaConv.value = glodaConv;
     }
 
+    if (!aCache)
+      throw("indexIMConversation called without aCache parameter.");
     let isNew = !Object.prototype.hasOwnProperty.call(aCache, fileName);
     let rv = aCallbackHandle.pushAndGo(
       Gloda.grokNounItem(glodaConv, {}, true, isNew, aCallbackHandle));
-    aCache[fileName] = aLastModifiedTime;
+
+    if (!aLastModifiedTime)
+      Cu.reportError("indexIMConversation called without lastModifiedTime parameter.");
+    aCache[fileName] = aLastModifiedTime || 1;
     this._scheduleCacheSave();
+
     return rv;
   }),
 
   _worker_indexIMConversation: function(aJob, aCallbackHandle) {
     let glodaConv = {};
     if (aJob.conversation.glodaConv)
       glodaConv.value = aJob.conversation.glodaConv;
+
     // indexIMConversation may initiate an async grokNounItem sub-job.
     this.indexIMConversation(aCallbackHandle, aJob.path, aJob.lastModifiedTime,
                              aJob.conversation.convObj, glodaConv)
         .then(() => GlodaIndexer.callbackDriver());
     // Tell the Indexer that we're doing async indexing. We'll be left alone
     // until callbackDriver() is called above.
     yield Gloda.kWorkAsync;
+
     // Resolve the promise for this job.
     this._indexingJobCallbacks.get(aJob.conversation.id)();
     this._indexingJobCallbacks.delete(aJob.conversation.id);
     this._indexingJobPromise = null;
     aJob.conversation.indexPending = false;
     aJob.conversation.glodaConv = glodaConv.value;
-
     yield Gloda.kWorkDone;
   },
 
   _worker_logsFolderSweep: function(aJob) {
     let dir = FileUtils.getFile("ProfD", ["logs"]);
     if (!dir.exists() || !dir.isDirectory())
       return;
 
-    let cacheFile = dir.clone();
-    cacheFile.append(kCacheFileName);
-    if (cacheFile.exists()) {
-      const PR_RDONLY = 0x01;
-      let fis = new FileInputStream(cacheFile, PR_RDONLY, parseInt("0444", 8),
-                                    Ci.nsIFileInputStream.CLOSE_ON_EOF);
-      let sis = new ScriptableInputStream(fis);
-      let text = sis.read(sis.available());
-      sis.close();
-
-      let data = JSON.parse(text);
-
-      // Check to see if the Gloda datastore ID matches the one that we saved
-      // in the cache. If so, we can trust it. If not, that means that the
-      // cache is likely invalid now, so we ignore it (and eventually
-      // overwrite it).
-      if ("datastoreID" in data &&
-          Gloda.datastoreID &&
-          data.datastoreID === Gloda.datastoreID) {
-        // Ok, the cache's datastoreID matches the one we expected, so it's
-        // still valid.
-        this._knownFiles = data.knownFiles;
-      }
-    }
-
     // Sweep the logs directory for log files, adding any new entries to the
     // _knownFiles tree as we traverse.
     let children = dir.directoryEntries;
     while (children.hasMoreElements()) {
       let proto = children.getNext().QueryInterface(Ci.nsIFile);
       if (!proto.isDirectory())
         continue;
       let protoName = proto.leafName;