Bug 1174674 - Clear out all pending pings when FHR is deactived. r=gfritzsche, a=lizzard
authorAlessio Placitelli <alessio.placitelli@gmail.com>
Fri, 02 Oct 2015 05:50:00 +0200
changeset 296691 e7f899e48fc6
parent 296690 9576cf65c2b5
child 296692 5ffeff840220
push id5292
push usercbook@mozilla.com
push date2015-11-11 10:02 +0000
treeherdermozilla-beta@5ffeff840220 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersgfritzsche, lizzard
bugs1174674
milestone43.0
Bug 1174674 - Clear out all pending pings when FHR is deactived. r=gfritzsche, a=lizzard
toolkit/components/telemetry/TelemetryController.jsm
toolkit/components/telemetry/TelemetrySend.jsm
toolkit/components/telemetry/TelemetryStorage.jsm
toolkit/components/telemetry/tests/unit/test_TelemetryController.js
--- a/toolkit/components/telemetry/TelemetryController.jsm
+++ b/toolkit/components/telemetry/TelemetryController.jsm
@@ -903,19 +903,35 @@ var Impl = {
    * the preferences panel), this triggers sending the deletion ping.
    */
   _onUploadPrefChange: function() {
     const uploadEnabled = Preferences.get(PREF_FHR_UPLOAD_ENABLED, false);
     if (uploadEnabled) {
       // There's nothing we should do if we are enabling upload.
       return;
     }
-    // Send the deletion ping.
-    this._log.trace("_onUploadPrefChange - Sending deletion ping.");
-    this.submitExternalPing(PING_TYPE_DELETION, {}, { addClientId: true });
+
+    let p = Task.spawn(function*() {
+      try {
+        // Clear the current pings.
+        yield TelemetrySend.clearCurrentPings();
+
+        // Remove all the pending pings, but not the deletion ping.
+        yield TelemetryStorage.runRemovePendingPingsTask();
+      } catch (e) {
+        this._log.error("_onUploadPrefChange - error clearing pending pings", e);
+      } finally {
+        // Always send the deletion ping.
+        this._log.trace("_onUploadPrefChange - Sending deletion ping.");
+        this.submitExternalPing(PING_TYPE_DELETION, {}, { addClientId: true });
+      }
+    }.bind(this));
+
+    this._shutdownBarrier.client.addBlocker(
+      "TelemetryController: removing pending pings after data upload was disabled", p);
   },
 
   _attachObservers: function() {
     if (IS_UNIFIED_TELEMETRY) {
       // Watch the FHR upload setting to trigger deletion pings.
       Preferences.observe(PREF_FHR_UPLOAD_ENABLED, this._onUploadPrefChange, this);
     }
   },
--- a/toolkit/components/telemetry/TelemetrySend.jsm
+++ b/toolkit/components/telemetry/TelemetrySend.jsm
@@ -230,16 +230,23 @@ this.TelemetrySend = {
   /**
    * Only used in tests.
    */
   setServer: function(server) {
     return TelemetrySendImpl.setServer(server);
   },
 
   /**
+   * Clear out unpersisted, yet to be sent, pings and cancel outgoing ping requests.
+   */
+  clearCurrentPings: function() {
+    return TelemetrySendImpl.clearCurrentPings();
+  },
+
+  /**
    * Only used in tests to wait on outgoing pending pings.
    */
   testWaitOnOutgoingPings: function() {
     return TelemetrySendImpl.promisePendingPingActivity();
   },
 
   /**
    * Test-only - this allows overriding behavior to enable ping sending in debug builds.
@@ -321,26 +328,29 @@ var SendScheduler = {
 
   shutdown: function() {
     this._log.trace("shutdown");
     this._shutdown = true;
     CancellableTimeout.cancelTimeout();
     return Promise.resolve(this._sendTask);
   },
 
+  start: function() {
+    this._log.trace("start");
+    this._sendsFailed = false;
+    this._backoffDelay = SEND_TICK_DELAY;
+    this._shutdown = false;
+  },
+
   /**
    * Only used for testing, resets the state to emulate a restart.
    */
   reset: function() {
     this._log.trace("reset");
-    return this.shutdown().then(() => {
-      this._sendsFailed = false;
-      this._backoffDelay = SEND_TICK_DELAY;
-      this._shutdown = false;
-    });
+    return this.shutdown().then(() => this.start());
   },
 
   /**
    * Notify the scheduler of a failure in sending out pings that warrants retrying.
    * This will trigger the exponential backoff timer behavior on the next tick.
    */
   notifySendsFailed: function() {
     this._log.trace("notifySendsFailed");
@@ -516,16 +526,18 @@ var SendScheduler = {
       sendTaskState: this._sendTaskState,
       backoffDelay: this._backoffDelay,
     };
   },
  };
 
 var TelemetrySendImpl = {
   _sendingEnabled: false,
+  // Tracks the shutdown state.
+  _shutdown: false,
   _logger: null,
   // This tracks all pending ping requests to the server.
   _pendingPingRequests: new Map(),
   // This tracks all the pending async ping activity.
   _pendingPingActivity: new Set(),
   // This is true when running in the test infrastructure.
   _testMode: false,
   // This holds pings that we currently try and haven't persisted yet.
@@ -611,16 +623,18 @@ var TelemetrySendImpl = {
     for (let pingInfo of infos) {
       const ageInDays =
         Utils.millisecondsToDays(Math.abs(now.getTime() - pingInfo.lastModificationDate));
       Telemetry.getHistogramById("TELEMETRY_PENDING_PINGS_AGE").add(ageInDays);
     }
    }),
 
   shutdown: Task.async(function*() {
+    this._shutdown = true;
+
     for (let topic of this.OBSERVER_TOPICS) {
       try {
         Services.obs.removeObserver(this, topic);
       } catch (ex) {
         this._log.error("shutdown - failed to remove observer for " + topic, ex);
       }
     }
 
@@ -638,18 +652,18 @@ var TelemetrySendImpl = {
 
     // Save any outstanding pending pings to disk.
     yield this._persistCurrentPings();
   }),
 
   reset: function() {
     this._log.trace("reset");
 
+    this._shutdown = false;
     this._currentPings = new Map();
-
     this._overduePingCount = 0;
 
     const histograms = [
       "TELEMETRY_SUCCESS",
       "TELEMETRY_SEND",
       "TELEMETRY_PING",
     ];
 
@@ -701,16 +715,48 @@ var TelemetrySendImpl = {
   /**
    * Only used in tests.
    */
   setServer: function (server) {
     this._log.trace("setServer", server);
     this._server = server;
   },
 
+  /**
+   * Clear out unpersisted, yet to be sent, pings and cancel outgoing ping requests.
+   */
+  clearCurrentPings: Task.async(function*() {
+    if (this._shutdown) {
+      this._log.trace("clearCurrentPings - in shutdown, bailing out");
+      return;
+    }
+
+    // Temporarily disable the scheduler. It must not try to reschedule ping sending
+    // while we're deleting them.
+    yield SendScheduler.shutdown();
+
+    // Now that the ping activity has settled, abort outstanding ping requests.
+    this._cancelOutgoingRequests();
+
+    // Also, purge current pings.
+    this._currentPings.clear();
+
+    // We might have been interrupted and shutdown could have been started.
+    // We need to bail out in that case to avoid triggering send activity etc.
+    // at unexpected times.
+    if (this._shutdown) {
+      this._log.trace("clearCurrentPings - in shutdown, not spinning SendScheduler up again");
+      return;
+    }
+
+    // Enable the scheduler again and spin the send task.
+    SendScheduler.start();
+    SendScheduler.triggerSendingPings(true);
+  }),
+
   _cancelOutgoingRequests: function() {
     // Abort any pending ping XHRs.
     for (let [id, request] of this._pendingPingRequests) {
       this._log.trace("_cancelOutgoingRequests - aborting ping request for id " + id);
       try {
         request.abort();
       } catch (e) {
         this._log.error("_cancelOutgoingRequests - failed to abort request for id " + id, e);
--- a/toolkit/components/telemetry/TelemetryStorage.jsm
+++ b/toolkit/components/telemetry/TelemetryStorage.jsm
@@ -111,32 +111,19 @@ var Policy = {
                                 : PENDING_PINGS_QUOTA_BYTES_DESKTOP,
 };
 
 /**
  * Wait for all promises in iterable to resolve or reject. This function
  * always resolves its promise with undefined, and never rejects.
  */
 function waitForAll(it) {
-  let list = Array.from(it);
-  let pending = list.length;
-  if (pending == 0) {
-    return Promise.resolve();
-  }
-  return new Promise(function(resolve, reject) {
-    let rfunc = () => {
-      --pending;
-      if (pending == 0) {
-        resolve();
-      }
-    };
-    for (let p of list) {
-      p.then(rfunc, rfunc);
-    }
-  });
+  let dummy = () => {};
+  let promises = [for (p of it) p.catch(dummy)];
+  return Promise.all(promises);
 }
 
 this.TelemetryStorage = {
   get pingDirectoryPath() {
     return OS.Path.join(OS.Constants.Path.profileDir, "saved-telemetry-pings");
   },
 
   /**
@@ -200,16 +187,25 @@ this.TelemetryStorage = {
    *
    * @return {Promise} Resolved when the cleanup task completes.
    */
   runEnforcePendingPingsQuotaTask: function() {
     return TelemetryStorageImpl.runEnforcePendingPingsQuotaTask();
   },
 
   /**
+   * Run the task to remove all the pending pings (except the deletion ping).
+   *
+   * @return {Promise} Resolved when the pings are removed.
+   */
+  runRemovePendingPingsTask: function() {
+    return TelemetryStorageImpl.runRemovePendingPingsTask();
+  },
+
+  /**
    * Reset the storage state in tests.
    */
   reset: function() {
     return TelemetryStorageImpl.reset();
   },
 
   /**
    * Test method that allows waiting on the archive clean task to finish.
@@ -558,16 +554,22 @@ var TelemetryStorageImpl = {
   _activelyArchiving: new Set(),
   // Track the archive loading task to prevent multiple tasks from being executed.
   _scanArchiveTask: null,
   // Track the archive cleanup task.
   _cleanArchiveTask: null,
   // Whether we already scanned the archived pings on disk.
   _scannedArchiveDirectory: false,
 
+  // Track the pending ping removal task.
+  _removePendingPingsTask: null,
+
+  // This tracks all the pending async ping save activity.
+  _activePendingPingSaves: new Set(),
+
   // Tracks the pending pings in a Map of (id -> {timestampCreated, type}).
   // We use this to cache info on pending pings to avoid scanning the disk more than once.
   _pendingPings: new Map(),
 
   // Track the pending pings enforce quota task.
   _enforcePendingPingsQuotaTask: null,
 
   // Track the shutdown process to bail out of the clean up task quickly.
@@ -583,22 +585,50 @@ var TelemetryStorageImpl = {
 
   /**
    * Shutdown & block on any outstanding async activity in this module.
    *
    * @return {Promise} Promise that is resolved when shutdown is complete.
    */
   shutdown: Task.async(function*() {
     this._shutdown = true;
-    yield this._abortedSessionSerializer.flushTasks();
-    yield this._deletionPingSerializer.flushTasks();
-    // If the tasks for archive cleaning or pending ping quota are still running, block on
-    // them. They will bail out as soon as possible.
-    yield this._cleanArchiveTask;
-    yield this._enforcePendingPingsQuotaTask;
+
+    // If the following tasks are still running, block on them. They will bail out as soon
+    // as possible.
+    yield this._abortedSessionSerializer.flushTasks().catch(ex => {
+      this._log.error("shutdown - failed to flush aborted-session writes", ex);
+    });
+
+    yield this._deletionPingSerializer.flushTasks().catch(ex => {
+      this._log.error("shutdown - failed to flush deletion ping writes", ex);
+    });
+
+    if (this._cleanArchiveTask) {
+      yield this._cleanArchiveTask.catch(ex => {
+        this._log.error("shutdown - the archive cleaning task failed", ex);
+      });
+    }
+
+    if (this._enforcePendingPingsQuotaTask) {
+      yield this._enforcePendingPingsQuotaTask.catch(ex => {
+        this._log.error("shutdown - the pending pings quota task failed", ex);
+      });
+    }
+
+    if (this._removePendingPingsTask) {
+      yield this._removePendingPingsTask.catch(ex => {
+        this._log.error("shutdown - the pending pings removal task failed", ex);
+      });
+    }
+
+    // Wait on pending pings still being saved. While OS.File should have shutdown
+    // blockers in place, we a) have seen weird errors being reported that might
+    // indicate a bad shutdown path and b) might have completion handlers hanging
+    // off the save operations that don't expect to be late in shutdown.
+    yield this.promisePendingPingSaves();
   }),
 
   /**
    * Save an archived ping to disk.
    *
    * @param {object} ping The ping data to archive.
    * @return {promise} Promise that is resolved when the ping is successfully archived.
    */
@@ -1213,23 +1243,25 @@ var TelemetryStorageImpl = {
    * @param {object} ping The ping.
    * @returns {promise}
    */
   cleanupPingFile: function(ping) {
     return OS.File.remove(pingFilePath(ping));
   },
 
   savePendingPing: function(ping) {
-    return this.savePing(ping, true).then((path) => {
+    let p = this.savePing(ping, true).then((path) => {
       this._pendingPings.set(ping.id, {
         path: path,
         lastModificationDate: Policy.now().getTime(),
       });
       this._log.trace("savePendingPing - saved ping with id " + ping.id);
     });
+    this._trackPendingPingSaveTask(p);
+    return p;
   },
 
   loadPendingPing: Task.async(function*(id) {
     this._log.trace("loadPendingPing - id: " + id);
     let info = this._pendingPings.get(id);
     if (!info) {
       this._log.trace("loadPendingPing - unknown id " + id);
       throw new Error("TelemetryStorage.loadPendingPing - no ping with id " + id);
@@ -1281,16 +1313,90 @@ var TelemetryStorageImpl = {
 
     this._log.trace("removePendingPing - deleting ping with id: " + id +
                     ", path: " + info.path);
     this._pendingPings.delete(id);
     return OS.File.remove(info.path).catch((ex) =>
       this._log.error("removePendingPing - failed to remove ping", ex));
   },
 
+  /**
+   * Track any pending ping save tasks through the promise passed here.
+   * This is needed to block on any outstanding ping save activity.
+   *
+   * @param {Object<Promise>} The save promise to track.
+   */
+  _trackPendingPingSaveTask: function (promise) {
+    let clear = () => this._activePendingPingSaves.delete(promise);
+    promise.then(clear, clear);
+    this._activePendingPingSaves.add(promise);
+  },
+
+  /**
+   * Return a promise that allows to wait on pending pings being saved.
+   * @return {Object<Promise>} A promise resolved when all the pending pings save promises
+   *         are resolved.
+   */
+  promisePendingPingSaves: function () {
+    // Make sure to wait for all the promises, even if they reject. We don't need to log
+    // the failures here, as they are already logged elsewhere.
+    return waitForAll(this._activePendingPingSaves);
+  },
+
+  /**
+   * Run the task to remove all the pending pings (except the deletion ping).
+   *
+   * @return {Promise} Resolved when the pings are removed.
+   */
+  runRemovePendingPingsTask: Task.async(function*() {
+    // If we already have a pending pings removal task active, return that.
+    if (this._removePendingPingsTask) {
+      return this._removePendingPingsTask;
+    }
+
+    // Start the task to remove all pending pings. Also make sure to clear the task once done.
+    try {
+      this._removePendingPingsTask = this.removePendingPings();
+      yield this._removePendingPingsTask;
+    } finally {
+      this._removePendingPingsTask = null;
+    }
+  }),
+
+  removePendingPings: Task.async(function*() {
+    this._log.trace("removePendingPings - removing all pending pings");
+
+    // Wait on pending pings still being saved, so so we don't miss removing them.
+    yield this.promisePendingPingSaves();
+
+    // Individually remove existing pings, so we don't interfere with operations expecting
+    // the pending pings directory to exist.
+    const directory = TelemetryStorage.pingDirectoryPath;
+    let iter = new OS.File.DirectoryIterator(directory);
+
+    try {
+      if (!(yield iter.exists())) {
+        this._log.trace("removePendingPings - the pending pings directory doesn't exist");
+        return;
+      }
+
+      let files = (yield iter.nextBatch()).filter(e => !e.isDir);
+      for (let file of files) {
+        try {
+          yield OS.File.remove(file.path);
+        } catch (ex) {
+          this._log.error("removePendingPings - failed to remove file " + file.path, ex);
+          continue;
+        }
+      }
+    } finally {
+      yield iter.close();
+    }
+  }),
+
   loadPendingPingList: function() {
     // If we already have a pending scanning task active, return that.
     if (this._scanPendingPingsTask) {
       return this._scanPendingPingsTask;
     }
 
     if (this._scannedPendingDirectory) {
       this._log.trace("loadPendingPingList - Pending already scanned, hitting cache.");
@@ -1559,18 +1665,20 @@ var TelemetryStorageImpl = {
    * Save the deletion ping.
    * @param ping The deletion ping.
    * @return {Promise} Resolved when the ping is saved.
    */
   saveDeletionPing: Task.async(function*(ping) {
     this._log.trace("saveDeletionPing - ping path: " + gDeletionPingFilePath);
     yield OS.File.makeDir(gDataReportingDir, { ignoreExisting: true });
 
-    return this._deletionPingSerializer.enqueueTask(() =>
+    let p = this._deletionPingSerializer.enqueueTask(() =>
       this.savePingToFile(ping, gDeletionPingFilePath, true));
+    this._trackPendingPingSaveTask(p);
+    return p;
   }),
 
   /**
    * Remove the deletion ping.
    * @return {Promise} Resolved when the ping is deleted from the disk.
    */
   removeDeletionPing: Task.async(function*() {
     return this._deletionPingSerializer.enqueueTask(Task.async(function*() {
--- a/toolkit/components/telemetry/tests/unit/test_TelemetryController.js
+++ b/toolkit/components/telemetry/tests/unit/test_TelemetryController.js
@@ -138,17 +138,17 @@ add_task(function* test_simplePing() {
   // Make sure the version in the query string matches the new ping format version.
   let params = request.queryString.split("&");
   Assert.ok(params.find(p => p == ("v=" + PING_FORMAT_VERSION)));
 
   let ping = decodeRequestPayload(request);
   checkPingFormat(ping, TEST_PING_TYPE, false, false);
 });
 
-add_task(function* test_deletionPing() {
+add_task(function* test_disableDataUpload() {
   const isUnified = Preferences.get(PREF_UNIFIED, false);
   if (!isUnified) {
     // Skipping the test if unified telemetry is off, as no deletion ping will
     // be generated.
     return;
   }
 
   const PREF_TELEMETRY_SERVER = "toolkit.telemetry.server";
@@ -161,22 +161,36 @@ add_task(function* test_deletionPing() {
   // Wait on ping activity to settle.
   yield TelemetrySend.testWaitOnOutgoingPings();
 
   // Restore FHR Upload.
   Preferences.set(PREF_FHR_UPLOAD_ENABLED, true);
 
   // Simulate a failure in sending the deletion ping by disabling the HTTP server.
   yield PingServer.stop();
+
+  // Try to send a ping. It will be saved as pending  and get deleted when disabling upload.
+  TelemetryController.submitExternalPing(TEST_PING_TYPE, {});
+
   // Disable FHR upload to send a deletion ping again.
   Preferences.set(PREF_FHR_UPLOAD_ENABLED, false);
-  // Wait for the send task to terminate, flagging it to do so at the next opportunity and
-  // cancelling any timeouts.
+
+  // Wait on sending activity to settle, as |TelemetryController.reset()| doesn't do that.
+  yield TelemetrySend.testWaitOnOutgoingPings();
+  // Wait for the pending pings to be deleted. Resetting TelemetryController doesn't
+  // trigger the shutdown, so we need to call it ourselves.
+  yield TelemetryStorage.shutdown();
+  // Simulate a restart, and spin the send task.
   yield TelemetryController.reset();
 
+  // Disabling Telemetry upload must clear out all the pending pings.
+  let pendingPings = yield TelemetryStorage.loadPendingPingList();
+  Assert.equal(pendingPings.length, 1,
+               "All the pending pings but the deletion ping should have been deleted");
+
   // Enable the ping server again.
   PingServer.start();
   // We set the new server using the pref, otherwise it would get reset with
   // |TelemetryController.reset|.
   Preferences.set(PREF_TELEMETRY_SERVER, "http://localhost:" + PingServer.port);
 
   // Reset the controller to spin the ping sending task.
   yield TelemetryController.reset();
@@ -264,17 +278,17 @@ add_task(function* test_archivePings() {
   let promise = TelemetryArchive.promiseArchivedPingById(pingId);
   Assert.ok((yield promiseRejects(promise)),
     "TelemetryController should not archive pings if the archive pref is disabled.");
 
   // Enable archiving and the upload so that pings get sent and archived again.
   Preferences.set(uploadPref, true);
   Preferences.set(PREF_ARCHIVE_ENABLED, true);
 
-  now = new Date(2014, 06, 18, 22, 0, 0);
+  now = new Date(2014, 6, 18, 22, 0, 0);
   fakeNow(now);
   // Restore the non asserting ping handler.
   PingServer.resetPingHandler();
   pingId = yield sendPing(true, true);
 
   // Check that we archive pings when successfully sending them.
   yield PingServer.promiseNextPing();
   ping = yield TelemetryArchive.promiseArchivedPingById(pingId);