Bug 1139460 - Part 3 - Make TelemetryPing shutdown block on pending submissions. r=yoric, a=lmandel
authorGeorg Fritzsche <georg.fritzsche@googlemail.com>
Mon, 23 Mar 2015 12:45:29 +0100
changeset 265747 ec115d86ddc8287469810a4b88ed2e9e539c6b22
parent 265746 c5aacab55dde7ee0a201dc6c6fb17c70a1297844
child 265748 4ae61b09eaefc93c30a0cf1cee60ba533a3bdcf3
push id4718
push userraliiev@mozilla.com
push dateMon, 11 May 2015 18:39:53 +0000
treeherdermozilla-beta@c20c4ef55f08 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersyoric, lmandel
bugs1139460
milestone39.0a2
Bug 1139460 - Part 3 - Make TelemetryPing shutdown block on pending submissions. r=yoric, a=lmandel
toolkit/components/telemetry/TelemetryPing.jsm
--- a/toolkit/components/telemetry/TelemetryPing.jsm
+++ b/toolkit/components/telemetry/TelemetryPing.jsm
@@ -13,16 +13,17 @@ const myScope = this;
 
 Cu.import("resource://gre/modules/Log.jsm");
 Cu.import("resource://gre/modules/debug.js", this);
 Cu.import("resource://gre/modules/Services.jsm", this);
 Cu.import("resource://gre/modules/XPCOMUtils.jsm", this);
 Cu.import("resource://gre/modules/osfile.jsm", this);
 Cu.import("resource://gre/modules/Promise.jsm", this);
 Cu.import("resource://gre/modules/PromiseUtils.jsm", this);
+Cu.import("resource://gre/modules/Task.jsm", this);
 Cu.import("resource://gre/modules/DeferredTask.jsm", this);
 Cu.import("resource://gre/modules/Preferences.jsm");
 
 const LOGGER_NAME = "Toolkit.Telemetry";
 const LOGGER_PREFIX = "TelemetryPing::";
 
 const PREF_BRANCH = "toolkit.telemetry.";
 const PREF_BRANCH_LOG = PREF_BRANCH + "log.";
@@ -265,17 +266,22 @@ let Impl = {
   // Undefined if this is not the first run, or the previous build ID is unknown.
   _previousBuildID: undefined,
   _clientID: null,
   // A task performing delayed initialization
   _delayedInitTask: null,
   // The deferred promise resolved when the initialization task completes.
   _delayedInitTaskDeferred: null,
 
+  // This is a public barrier Telemetry clients can use to add blockers to the shutdown
+  // of TelemetryPing.
+  // After this barrier, clients can not submit Telemetry pings anymore.
   _shutdownBarrier: new AsyncShutdown.Barrier("TelemetryPing: Waiting for clients."),
+  // This is a private barrier blocked by pending async ping activity (sending & saving).
+  _connectionsBarrier: new AsyncShutdown.Barrier("TelemetryPing: Waiting for pending ping activity"),
 
   /**
    * Get the data for the "application" section of the ping.
    */
   _getApplicationSection: function() {
     // Querying architecture and update channel can throw. Make sure to recover and null
     // those fields.
     let arch = null;
@@ -369,16 +375,24 @@ let Impl = {
   /**
    * Only used in tests.
    */
   setServer: function (aServer) {
     this._server = aServer;
   },
 
   /**
+   * Track any pending ping send and save tasks through the promise passed here.
+   * This is needed to block shutdown on any outstanding ping activity.
+   */
+  _trackPendingPingTask: function (aPromise) {
+    this._connectionsBarrier.client.addBlocker("Waiting for ping task", aPromise);
+  },
+
+  /**
    * Adds a ping to the pending ping list by moving it to the saved pings directory
    * and adding it to the pending ping list.
    *
    * @param {String} aPingPath The path of the ping to add to the pending ping list.
    * @param {Boolean} [aRemoveOriginal] If true, deletes the ping at aPingPath after adding
    *                  it to the saved pings directory.
    * @return {Promise} Resolved when the ping is correctly moved to the saved pings directory.
    */
@@ -405,38 +419,49 @@ let Impl = {
    *                  environment data.
    *
    * @returns {Promise} A promise that resolves when the ping is sent.
    */
   send: function send(aType, aPayload, aOptions) {
     this._log.trace("send - Type " + aType + ", Server " + this._server +
                     ", aOptions " + JSON.stringify(aOptions));
 
-    return this.assemblePing(aType, aPayload, aOptions)
+    let promise = this.assemblePing(aType, aPayload, aOptions)
         .then(pingData => {
           // Once ping is assembled, send it along with the persisted ping in the backlog.
           let p = [
             // Persist the ping if sending it fails.
             this.doPing(pingData, false)
                 .catch(() => TelemetryFile.savePing(pingData, true)),
             this.sendPersistedPings(),
           ];
           return Promise.all(p);
         },
         error => this._log.error("send - Rejection", error));
+
+    this._trackPendingPingTask(promise);
+    return promise;
   },
 
   /**
    * Send the persisted pings to the server.
+   *
+   * @return Promise A promise that is resolved when all pings finished sending or failed.
    */
   sendPersistedPings: function sendPersistedPings() {
     this._log.trace("sendPersistedPings");
+
     let pingsIterator = Iterator(this.popPayloads());
-    let p = [data for (data in pingsIterator)].map(data => this.doPing(data, true));
-    return Promise.all(p);
+    let p = [for (data of pingsIterator) this.doPing(data, true).catch((e) => {
+      this._log.error("sendPersistedPings - doPing rejected", e);
+    })];
+
+    let promise = Promise.all(p);
+    this._trackPendingPingTask(promise);
+    return promise;
   },
 
   /**
    * Saves all the pending pings, plus the passed one, to disk.
    *
    * @param {String} aType The type of the ping.
    * @param {Object} aPayload The actual data payload for the ping.
    * @param {Object} aOptions Options object.
@@ -783,51 +808,64 @@ let Impl = {
     AsyncShutdown.sendTelemetry.addBlocker("TelemetryPing: shutting down",
                                            () => this.shutdown(),
                                            () => this._getState());
 
     this._delayedInitTask.arm();
     return this._delayedInitTaskDeferred.promise;
   },
 
+  // Do proper shutdown waiting and cleanup.
+  _cleanupOnShutdown: Task.async(function*() {
+    if (!this._initialized) {
+      return;
+    }
+
+    try {
+      // First wait for clients processing shutdown.
+      yield this._shutdownBarrier.wait();
+      // Then wait for any outstanding async ping activity.
+      yield this._connectionsBarrier.wait();
+
+      // Should down dependent components.
+      try {
+        yield TelemetryEnvironment.shutdown();
+      } catch (e) {
+        this._log.error("shutdown - environment shutdown failure", e);
+      }
+    } finally {
+      // Reset state.
+      this._initialized = false;
+      this._initStarted = false;
+    }
+  }),
+
   shutdown: function() {
     this._log.trace("shutdown");
 
-    let cleanup = () => {
-      if (!this._initialized) {
-        return;
-      }
-      let reset = () => {
-        this._initialized = false;
-        this._initStarted = false;
-      };
-      return this._shutdownBarrier.wait().then(
-               () => TelemetryEnvironment.shutdown().then(reset, reset));
-    };
-
     // We can be in one the following states here:
     // 1) setupTelemetry was never called
     // or it was called and
     //   2) _delayedInitTask was scheduled, but didn't run yet.
     //   3) _delayedInitTask is running now.
     //   4) _delayedInitTask finished running already.
 
     // This handles 1).
     if (!this._initStarted) {
       return Promise.resolve();
     }
 
     // This handles 4).
     if (!this._delayedInitTask) {
       // We already ran the delayed initialization.
-      return cleanup();
+      return this._cleanupOnShutdown();
     }
 
     // This handles 2) and 3).
-    return this._delayedInitTask.finalize().then(cleanup);
+    return this._delayedInitTask.finalize().then(() => this._cleanupOnShutdown());
   },
 
   /**
    * This observer drives telemetry.
    */
   observe: function (aSubject, aTopic, aData) {
     // The logger might still be not available at this point.
     if (!this._log) {
@@ -862,11 +900,13 @@ let Impl = {
   /**
    * Get an object describing the current state of this module for AsyncShutdown diagnostics.
    */
   _getState: function() {
     return {
       initialized: this._initialized,
       initStarted: this._initStarted,
       haveDelayedInitTask: !!this._delayedInitTask,
+      shutdownBarrier: this._shutdownBarrier.state,
+      connectionsBarrier: this._connectionsBarrier.state,
     };
   },
 };