Bug 1025464 - Refactor log sweeping code to use Task.jsm and move it to logger.js. r=aleth
authorNihanth Subramanya <nhnt11@gmail.com>
Fri, 27 Jun 2014 05:15:03 +0530
changeset 20432 de14c1c218bcb58ae98f448265da77f9ccf3c0fb
parent 20431 946e17f54a9516a77e23f2526eb300eb804e4949
child 20433 ae54459b97a507163e4c5f43bedab73cb0641620
push id1209
push usermbanner@mozilla.com
push dateTue, 02 Sep 2014 16:59:36 +0000
treeherdercomm-beta@842e0fd167ee [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersaleth
bugs1025464
Bug 1025464 - Refactor log sweeping code to use Task.jsm and move it to logger.js. r=aleth
chat/components/public/imILogger.idl
chat/components/src/logger.js
chat/components/src/test/test_logger.js
im/components/ibConvStatsService.js
--- a/chat/components/public/imILogger.idl
+++ b/chat/components/public/imILogger.idl
@@ -46,16 +46,24 @@ interface imILog: nsISupports {
   // Value in seconds.
   readonly attribute PRTime time;
   readonly attribute AUTF8String format;
   // Returns a promise that resolves to an imILogConversation instance, or null
   // if the log format isn't JSON.
   jsval getConversation();
 };
 
+[scriptable, function, uuid(2ab5f8ac-4b89-4954-9a4a-7c167f1e3b0d)]
+interface imIProcessLogsCallback: nsISupports {
+  // The callback can return a promise. If it does, then it will not be called
+  // on the next log until this promise resolves. If it throws (or rejects),
+  // iteration will stop.
+  jsval processLog(in AUTF8String aLogPath);
+};
+
 [scriptable, uuid(b9d5701a-df53-4e0e-99b7-706e0118e075)]
 interface imILogger: nsISupports {
   // Returns a promise that resolves to an imILog instance.
   jsval getLogFromFile(in AUTF8String aFilePath, [optional] in boolean aGroupByDay);
   // Returns a promise that resolves to the log file path if a log writer
   // exists for the conversation, or null otherwise. The promise resolves
   // after any pending I/O operations on the file complete.
   jsval getLogPathForConversation(in prplIConversation aConversation);
@@ -74,9 +82,16 @@ interface imILogger: nsISupports {
   jsval getLogsForContact(in imIContact aContact,
                           [optional] in boolean aGroupByDay);
 
   jsval getLogsForConversation(in prplIConversation aConversation,
                                [optional] in boolean aGroupByDay);
   jsval getSystemLogsForAccount(in imIAccount aAccount);
   jsval getSimilarLogs(in imILog aLog,
                        [optional] in boolean aGroupByDay);
+
+  // Asynchronously iterates through log folders for all prpls and accounts and
+  // invokes the callback on every log file. Returns a promise that resolves when
+  // iteration is complete. If the callback returns a promise, iteration pauses
+  // until the promise resolves. If the callback throws (or rejects), iteration
+  // will stop and the returned promise will reject with the same error.
+  jsval forEach(in imIProcessLogsCallback aCallback);
 };
--- a/chat/components/src/logger.js
+++ b/chat/components/src/logger.js
@@ -742,16 +742,60 @@ Logger.prototype = {
     } catch (aError) {
       Cu.reportError("Error getting similar logs for \"" +
                      aLog.path + "\":\n" + aError);
     }
     // If there was an error, this will return an EmptyEnumerator.
     return this._getEnumerator(entries, aGroupByDay);
   }),
 
+  forEach: Task.async(function* (aCallback) {
+    let getAllSubdirs = Task.async(function* (aPaths, aErrorMsg) {
+      let entries = [];
+      for (let path of aPaths) {
+        let iterator = new OS.File.DirectoryIterator(path);
+        try {
+          entries = entries.concat(yield iterator.nextBatch());
+        } catch (aError) {
+          if (aErrorMsg)
+            Cu.reportError(aErrorMsg + "\n" + aError);
+        } finally {
+          iterator.close();
+        }
+      }
+      entries = entries.filter(aEntry => aEntry.isDir)
+                       .map(aEntry => aEntry.path);
+      return entries;
+    });
+
+    let logsPath = OS.Path.join(OS.Constants.Path.profileDir, "logs");
+    let prpls = yield getAllSubdirs([logsPath]);
+    let accounts =
+      yield getAllSubdirs(prpls, "Error while sweeping prpl folder:");
+    let logFolders =
+      yield getAllSubdirs(accounts, "Error while sweeping account folder:");
+    for (let folder of logFolders) {
+      let iterator = new OS.File.DirectoryIterator(folder);
+      try {
+        yield iterator.forEach(aEntry => {
+          if (aEntry.isDir || !aEntry.name.endsWith(".json"))
+            return null;
+          return aCallback.processLog(aEntry.path);
+        });
+      } catch (aError) {
+        // If the callback threw, reject the promise and let the caller handle it.
+        if (!(aError instanceof OS.File.Error))
+          throw aError;
+        Cu.reportError("Error sweeping log folder:\n" + aError);
+      } finally {
+        iterator.close();
+      }
+    }
+  }),
+
   observe: function logger_observe(aSubject, aTopic, aData) {
     switch (aTopic) {
     case "profile-after-change":
       Services.obs.addObserver(this, "final-ui-startup", false);
       break;
     case "final-ui-startup":
       Services.obs.removeObserver(this, "final-ui-startup");
       ["new-text", "conversation-closed", "conversation-left-chat",
--- a/chat/components/src/test/test_logger.js
+++ b/chat/components/src/test/test_logger.js
@@ -398,16 +398,29 @@ let test_logging = function* () {
   logs = yield logger.getLogsForConversation(dummyConv, true);
   while (logs.hasMoreElements()) {
     let log = logs.getNext();
     let conv = yield log.getConversation();
     let date = reduceTimeToDate(log.time);
     // 3 session messages - for daily logs, bad files are included.
     testMsgs(conv.getMessages(), messagesByDay.get(date), 3);
   }
+
+  // Remove the created log files, testing forEach in the process.
+  yield logger.forEach({
+    processLog: Task.async(function* (aLog) {
+      let info = yield OS.File.stat(aLog);
+      ok(!info.isDir);
+      ok(aLog.endsWith(".json"));
+      yield OS.File.remove(aLog);
+    })
+  });
+  let logFolder = OS.Path.dirname(gLogger.getLogFilePathForConversation(dummyConv));
+  // The folder should now be empty - this will throw if it isn't.
+  yield OS.File.removeEmptyDir(logFolder, {ignoreAbsent: false});
 }
 
 function run_test() {
   // Test encodeName().
   for (let i = 0; i < encodeName_input.length; ++i)
     equal(gLogger.encodeName(encodeName_input[i]), encodeName_output[i]);
 
   // Test convIsRealMUC().
--- a/im/components/ibConvStatsService.js
+++ b/im/components/ibConvStatsService.js
@@ -37,18 +37,16 @@ var gStatsByConvId = {};
 // Initialized when gStatsByConvId is ready (i.e. all log files have been parsed
 // or it was loaded from the JSON cache file).
 var gStatsByContactId;
 
 // Recursively sweeps log folders and parses log files for conversation statistics.
 var gLogParser = {
   _statsService: null,
   _accountMap: null,
-  _accounts: [],
-  _logFolders: [],
   inProgress: false,
   error: false,
 
   // The general path of a log is logs/prpl/account/conv/date.json.
   // First, sweep the logs folder for prpl folders.
   sweep: function(aStatsService) {
     initLogModule("stats-service-log-sweeper", this);
     this.inProgress = true;
@@ -60,160 +58,76 @@ var gLogParser = {
 
     this._accountMap = new Map();
     let accounts = Services.accounts.getAccounts();
     while (accounts.hasMoreElements()) {
       let account = accounts.getNext();
       this._accountMap.set(account.normalizedName, account);
     }
 
-    let logsPath = OS.Path.join(OS.Constants.Path.profileDir, "logs");
-    let iterator = new OS.File.DirectoryIterator(logsPath);
-    iterator.nextBatch().then(
-      aEntries => {
-        iterator.close();
-        // Filter out any stray files (e.g. system files).
-        aEntries = aEntries.filter(function(e) e.isDir);
-        this._sweepPrpls(aEntries);
-      },
-      aError => {
-        iterator.close();
-        if (aError instanceof OS.File.Error && !aError.becauseNoSuchFile) {
-          Cu.reportError("Error while sweeping logs folder: " + logsPath + "\n" + aError);
+    let decoder = new TextDecoder();
+
+    Services.logs.forEach(aLog => {
+      return OS.File.read(aLog).then(
+        aArray => {
+          // Try to parse the log file. If anything goes wrong here, the log file
+          // has likely been tampered with so we ignore it.
+          try {
+            let lines = decoder.decode(aArray).split("\n");
+            // The first line is the header which identifies the conversation.
+            let header = JSON.parse(lines.shift());
+            let accountName = header.account;
+            let name = header.normalizedName;
+            if (!name) {
+              // normalizedName was added for IB 1.5, so we normalize
+              // manually if it is not found for backwards compatibility.
+              name = header.name;
+              let account = this._accountMap.get(accountName);
+              if (account)
+                name = account.normalize(name);
+            }
+            let id = getConversationId(header.protocol, accountName,
+                                       name, header.isChat);
+            if (!(id in gStatsByConvId))
+              gStatsByConvId[id] = new ConversationStats(id);
+            let stats = gStatsByConvId[id];
+            lines.pop(); // Ignore the final line break.
+            for (let line of lines) {
+              line = JSON.parse(line);
+              if (line.flags[0] == "system") // Ignore system messages.
+                continue;
+              line.flags[0] == "incoming" ?
+                ++stats.incomingCount : ++stats.outgoingCount;
+            }
+            let date = Date.parse(header.date);
+            if (date > stats.lastDate)
+              stats.lastDate = date;
+            delete stats._computedScore;
+          }
+          catch(e) {
+            this.WARN("Error parsing log file: " + aLog + "\n" + e);
+          }
+        },
+        aError => {
+          Cu.reportError("Error reading log file: " + aLog + "\n" + aError);
           this.error = true;
         }
-        this._sweepingComplete();
-      });
-  },
-
-  _sweepingComplete: function() {
-    delete this.inProgress;
-    delete this._accountMap;
-    let statsService = this._statsService;
-    statsService._cacheAllStats(); // Flush stats to JSON cache.
-    statsService._convs.sort(statsService._sortComparator);
-    statsService._notifyObservers("log-sweeping", "done");
-    gStatsByContactId = {}; // Initialize stats cache for contacts.
-  },
-
-  // Sweep each prpl folder for account folders, and add them to this._accounts.
-  _sweepPrpls: function(aPrpls) {
-    if (!aPrpls.length)
-      this._sweepAccounts();
-    let path = aPrpls.shift().path;
-    let iterator = new OS.File.DirectoryIterator(path);
-    iterator.nextBatch().then(
-      aEntries => {
-        iterator.close();
-        // Filter out any stray files (e.g. system files).
-        aEntries = aEntries.filter(function(e) e.isDir);
-        this._accounts = this._accounts.concat(aEntries);
-        this._sweepPrpls(aPrpls);
-      },
-      aError => {
-        iterator.close();
-        Cu.reportError("Error while sweeping prpl folder: " + path + "\n" + aError);
-        this.error = true;
-        this._sweepPrpls(aPrpls);
-      });
-  },
-
-  // Sweep each account folder for conversation log folders, add them to this._logFolders.
-  _sweepAccounts: function() {
-    if (!this._accounts.length)
-      this._sweepLogFolders();
-    let path = this._accounts.shift().path;
-    let iterator = new OS.File.DirectoryIterator(path);
-    iterator.nextBatch().then(
-      aEntries => {
-        iterator.close();
-        // Filter out any stray files (e.g. system files).
-        aEntries = aEntries.filter(function(e) e.isDir);
-        this._logFolders = this._logFolders.concat(aEntries);
-        this._sweepAccounts();
-      },
-      aError => {
-        iterator.close();
-        Cu.reportError("Error while sweeping account folder: " + path + "\n" + aError);
-        this.error = true;
-        this._sweepAccounts();
-      });
+      );
+    }).catch(aError => {
+      this.error = true;
+    }).then(() => {
+      delete this.inProgress;
+      delete this._accountMap;
+      let statsService = this._statsService;
+      statsService._cacheAllStats(); // Flush stats to JSON cache.
+      statsService._convs.sort(statsService._sortComparator);
+      statsService._notifyObservers("log-sweeping", "done");
+      gStatsByContactId = {}; // Initialize stats cache for contacts.
+    });
   },
-
-  // Sweep the log folders.
-  _sweepLogFolders: function() {
-    if (!this._logFolders.length)
-      this._sweepingComplete();
-    let path = this._logFolders.shift().path;
-    let iterator = new OS.File.DirectoryIterator(path);
-    let decoder = new TextDecoder();
-    iterator.forEach(aEntry => {
-      if (aEntry.name.endsWith(".json"))
-        return this._sweepJSONLog(aEntry.path, decoder);
-      return undefined;
-    }).then(
-      () => {
-        iterator.close();
-        this._sweepLogFolders();
-      },
-      aError => {
-        iterator.close();
-        Cu.reportError("Error while sweeping log folder: " + path + "\n" + aError);
-        this.error = true;
-        this._sweepLogFolders();
-      });
-  },
-
-  // Goes through a JSON log file and parses it for stats.
-  _sweepJSONLog: function(aLog, aDecoder) {
-    return OS.File.read(aLog).then(
-      aArray => {
-        // Try to parse the log file. If anything goes wrong here, the log file
-        // has likely been tampered with so we ignore it and move on.
-        try {
-          let lines = aDecoder.decode(aArray).split("\n");
-          // The first line is the header which identifies the conversation.
-          let header = JSON.parse(lines.shift());
-          let accountName = header.account;
-          let name = header.normalizedName;
-          if (!name) {
-            // normalizedName was added for IB 1.5, so we normalize
-            // manually if it is not found for backwards compatibility.
-            name = header.name;
-            let account = this._accountMap.get(accountName);
-            if (account)
-              name = account.normalize(name);
-          }
-          let id = getConversationId(header.protocol, accountName,
-                                     name, header.isChat);
-          if (!(id in gStatsByConvId))
-            gStatsByConvId[id] = new ConversationStats(id);
-          let stats = gStatsByConvId[id];
-          lines.pop(); // Ignore the final line break.
-          for (let line of lines) {
-            line = JSON.parse(line);
-            if (line.flags[0] == "system") // Ignore system messages.
-              continue;
-            line.flags[0] == "incoming" ?
-              ++stats.incomingCount : ++stats.outgoingCount;
-          }
-          let date = Date.parse(header.date);
-          if (date > stats.lastDate)
-            stats.lastDate = date;
-          delete stats._computedScore;
-        }
-        catch(e) {
-          this.WARN("Error parsing log file: " + aLog + "\n" + e);
-        }
-      },
-      aError => {
-        Cu.reportError("Error reading log file: " + aLog + "\n" + aError);
-        this.error = true;
-      });
-  }
 };
 
 function ConvStatsService() {
   this._observers = [];
 }
 ConvStatsService.prototype = {
   // Sorted list of conversations, stored as PossibleConversations.
   _convs: [],