Bug 1169691 - Make TelemetryStorage wait for pings that have been submitted for archiving but haven't finished submitting to disk, so that tests can reliably fetch submitted ping lists, r=gfritzsche a=sledru
authorBenjamin Smedberg <benjamin@smedbergs.us>
Fri, 29 May 2015 12:55:04 -0400
changeset 275143 ea083e8a04a22e582ec3bb93eba50739f04e8e9e
parent 275142 a40529984065c032e915ff74608991114f81f610
child 275144 5f544f1f1c63b5447a1c701847edcb61a96b0487
push id863
push userraliiev@mozilla.com
push dateMon, 03 Aug 2015 13:22:43 +0000
treeherdermozilla-release@f6321b14228d [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersgfritzsche, sledru
bugs1169691
milestone40.0a2
Bug 1169691 - Make TelemetryStorage wait for pings that have been submitted for archiving but haven't finished submitting to disk, so that tests can reliably fetch submitted ping lists, r=gfritzsche a=sledru
toolkit/components/telemetry/TelemetryStorage.jsm
--- a/toolkit/components/telemetry/TelemetryStorage.jsm
+++ b/toolkit/components/telemetry/TelemetryStorage.jsm
@@ -83,16 +83,39 @@ let isPingDirectoryCreated = false;
 /**
  * This is a policy object used to override behavior for testing.
  */
 let Policy = {
   now: () => new Date(),
   getArchiveQuota: () => ARCHIVE_QUOTA_BYTES,
 };
 
+/**
+ * Wait for all promises in iterable to resolve or reject. This function
+ * always resolves its promise with undefined, and never rejects.
+ */
+function waitForAll(it) {
+  let list = Array.from(it);
+  let pending = list.length;
+  if (pending == 0) {
+    return Promise.resolve();
+  }
+  return new Promise(function(resolve, reject) {
+    let rfunc = () => {
+      --pending;
+      if (pending == 0) {
+        resolve();
+      }
+    };
+    for (let p of list) {
+      p.then(rfunc, rfunc);
+    }
+  });
+}
+
 this.TelemetryStorage = {
   get MAX_PING_FILE_AGE() {
     return MAX_PING_FILE_AGE;
   },
 
   get OVERDUE_PING_FILE_AGE() {
     return OVERDUE_PING_FILE_AGE;
   },
@@ -452,16 +475,18 @@ SaveSerializer.prototype = {
 let TelemetryStorageImpl = {
   _logger: null,
   // Used to serialize aborted session ping writes to disk.
   _abortedSessionSerializer: new SaveSerializer(),
 
   // Tracks the archived pings in a Map of (id -> {timestampCreated, type}).
   // We use this to cache info on archived pings to avoid scanning the disk more than once.
   _archivedPings: new Map(),
+  // A set of promises for pings currently being archived
+  _activelyArchiving: new Set(),
   // Track the archive loading task to prevent multiple tasks from being executed.
   _scanArchiveTask: null,
   // Track the archive cleanup task.
   _cleanArchiveTask: null,
   // Whether we already scanned the archived pings on disk.
   _scannedArchiveDirectory: false,
 
   // Track the shutdown process to bail out of the clean up task quickly.
@@ -490,17 +515,25 @@ let TelemetryStorageImpl = {
   }),
 
   /**
    * Save an archived ping to disk.
    *
    * @param {object} ping The ping data to archive.
    * @return {promise} Promise that is resolved when the ping is successfully archived.
    */
-  saveArchivedPing: Task.async(function*(ping) {
+  saveArchivedPing: function(ping) {
+    let promise = this._saveArchivedPingTask(ping);
+    this._activelyArchiving.add(promise);
+    promise.then((r) => { this._activelyArchiving.delete(promise); },
+                 (e) => { this._activelyArchiving.delete(promise); });
+    return promise;
+  },
+
+  _saveArchivedPingTask: Task.async(function*(ping) {
     const creationDate = new Date(ping.creationDate);
     if (this._archivedPings.has(ping.id)) {
       const data = this._archivedPings.get(ping.id);
       if (data.timestampCreated > creationDate.getTime()) {
         this._log.error("saveArchivedPing - trying to overwrite newer ping with the same id");
         return Promise.reject(new Error("trying to overwrite newer ping with the same id"));
       } else {
         this._log.warn("saveArchivedPing - overwriting older ping with the same id");
@@ -790,36 +823,39 @@ let TelemetryStorageImpl = {
 
   /**
    * Get a list of info on the archived pings.
    * This will scan the archive directory and grab basic data about the existing
    * pings out of their filename.
    *
    * @return {promise<sequence<object>>}
    */
-  loadArchivedPingList: function() {
+  loadArchivedPingList: Task.async(function*() {
     // If there's an archive loading task already running, return it.
     if (this._scanArchiveTask) {
       return this._scanArchiveTask;
     }
 
+    yield waitForAll(this._activelyArchiving);
+
     if (this._scannedArchiveDirectory) {
       this._log.trace("loadArchivedPingList - Archive already scanned, hitting cache.");
-      return Promise.resolve(this._archivedPings);
+      return this._archivedPings;
     }
 
-    // Make sure to clear |_scanArchiveTask| once done.
-    let clear = pingList => {
+    // Since there's no archive loading task running, start it.
+    let result;
+    try {
+      this._scanArchiveTask = this._scanArchive();
+      result = yield this._scanArchiveTask;
+    } finally {
       this._scanArchiveTask = null;
-      return pingList;
-    };
-    // Since there's no archive loading task running, start it.
-    this._scanArchiveTask = this._scanArchive().then(clear, clear);
-    return this._scanArchiveTask;
-  },
+    }
+    return result;
+  }),
 
   _scanArchive: Task.async(function*() {
     this._log.trace("_scanArchive");
 
     let submitProbes = (pingCount, dirCount) => {
       Telemetry.getHistogramById("TELEMETRY_ARCHIVE_SCAN_PING_COUNT")
                .add(pingCount);
       Telemetry.getHistogramById("TELEMETRY_ARCHIVE_DIRECTORIES_COUNT")