Bug 1246066 - Clear PushService timeout tasks on uninitialization. r=kitcambridge
authorWilliam Chen <wchen@mozilla.com>
Wed, 16 Mar 2016 10:03:02 -0700
changeset 289001 605b761af96ff00b8c23d5285ee1096e8424f700
parent 289000 824d6ff63d10b7cfae8e849d19e59ef99ffb160d
child 289002 480b307d65e6d1cd9bd0132884d8b66f3611ba81
push id73662
push userwchen@mozilla.com
push dateWed, 16 Mar 2016 17:03:27 +0000
treeherdermozilla-inbound@605b761af96f [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerskitcambridge
bugs1246066
milestone48.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1246066 - Clear PushService timeout tasks on uninitialization. r=kitcambridge
dom/push/PushService.jsm
dom/push/PushServiceHttp2.jsm
--- a/dom/push/PushService.jsm
+++ b/dom/push/PushService.jsm
@@ -101,16 +101,19 @@ this.PushService = {
   _db: null,
   _options: null,
   _visibleNotifications: new Map(),
 
   // Callback that is called after attempting to
   // reduce the quota for a record. Used for testing purposes.
   _updateQuotaTestCallback: null,
 
+  // Set of timeout ID of tasks to reduce quota.
+  _updateQuotaTimeouts: new Set(),
+
   // When serverURI changes (this is used for testing), db is cleaned up and a
   // a new db is started. This events must be sequential.
   _stateChangeProcessQueue: null,
   _stateChangeProcessEnqueue: function(op) {
     if (!this._stateChangeProcessQueue) {
       this._stateChangeProcessQueue = Promise.resolve();
     }
 
@@ -234,17 +237,17 @@ this.PushService = {
 
   observe: function observe(aSubject, aTopic, aData) {
     switch (aTopic) {
       /*
        * We need to call uninit() on shutdown to clean up things that modules
        * aren't very good at automatically cleaning up, so we don't get shutdown
        * leaks on browser shutdown.
        */
-      case "xpcom-shutdown":
+      case "quit-application":
         this.uninit();
         break;
       case "network-active-changed":         /* On B2G. */
       case "network:offline-status-changed": /* On desktop. */
         this._stateChangeProcessEnqueue(_ =>
           this._changeStateOfflineEvent(aData === "offline", false)
         );
         break;
@@ -418,17 +421,17 @@ this.PushService = {
             return this._stopService(STOPPING_SERVICE_EVENT);
           }
         }
     }
   },
 
   /**
    * PushService initialization is divided into 4 parts:
-   * init() - start listening for xpcom-shutdown and serverURL changes.
+   * init() - start listening for quit-application and serverURL changes.
    *          state is change to PUSH_SERVICE_INIT
    * startService() - if serverURL is present this function is called. It starts
    *                  listening for broadcasted messages, starts db and
    *                  PushService connection (WebSocket).
    *                  state is change to PUSH_SERVICE_ACTIVATING.
    * startObservers() - start other observers.
    * changeStateConnectionEnabledEvent  - checks prefs and offline state.
    *                                      It changes state to:
@@ -440,17 +443,17 @@ this.PushService = {
     console.debug("init()");
 
     if (this._state > PUSH_SERVICE_UNINIT) {
       return;
     }
 
     this._setState(PUSH_SERVICE_ACTIVATING);
 
-    Services.obs.addObserver(this, "xpcom-shutdown", false);
+    Services.obs.addObserver(this, "quit-application", false);
 
     if (options.serverURI) {
       // this is use for xpcshell test.
 
       let [service, uri] = this._findService(options.serverURI);
       if (!service) {
         this._setState(PUSH_SERVICE_INIT);
         return;
@@ -536,32 +539,35 @@ this.PushService = {
   },
 
   /**
    * PushService uninitialization is divided into 3 parts:
    * stopObservers() - stot observers started in startObservers.
    * stopService() - It stops listening for broadcasted messages, stops db and
    *                 PushService connection (WebSocket).
    *                 state is changed to PUSH_SERVICE_INIT.
-   * uninit() - stop listening for xpcom-shutdown and serverURL changes.
+   * uninit() - stop listening for quit-application and serverURL changes.
    *            state is change to PUSH_SERVICE_UNINIT
    */
   _stopService: function(event) {
     console.debug("stopService()");
 
     if (this._state < PUSH_SERVICE_ACTIVATING) {
       return;
     }
 
     this._stopObservers();
 
     this._service.disconnect();
     this._service.uninit();
     this._service = null;
 
+    this._updateQuotaTimeouts.forEach((timeoutID) => clearTimeout(timeoutID));
+    this._updateQuotaTimeouts.clear();
+
     if (!this._db) {
       return Promise.resolve();
     }
     if (event == UNINIT_EVENT) {
       // If it is uninitialized just close db.
       this._db.close();
       this._db = null;
       return Promise.resolve();
@@ -595,17 +601,17 @@ this.PushService = {
   uninit: function() {
     console.debug("uninit()");
 
     if (this._state == PUSH_SERVICE_UNINIT) {
       return;
     }
 
     prefs.ignore("serverURL", this);
-    Services.obs.removeObserver(this, "xpcom-shutdown");
+    Services.obs.removeObserver(this, "quit-application");
 
     this._stateChangeProcessEnqueue(_ =>
       {
         this._changeServerURL("", UNINIT_EVENT);
         this._setState(PUSH_SERVICE_UNINIT);
         console.debug("uninit: shutdown complete!");
       });
   },
@@ -796,18 +802,24 @@ this.PushService = {
         decodedPromise = Promise.resolve(null);
       }
       return decodedPromise.then(message => {
         if (shouldNotify) {
           notified = this._notifyApp(record, message);
         }
         // Update quota after the delay, at which point
         // we check for visible notifications.
-        setTimeout(() => this._updateQuota(keyID),
-          prefs.get("quotaUpdateDelay"));
+        let timeoutID = setTimeout(_ =>
+          {
+            this._updateQuota(keyID);
+            if (!this._updateQuotaTimeouts.delete(timeoutID)) {
+              console.debug("receivedPushMessage: quota update timeout missing?");
+            }
+          }, prefs.get("quotaUpdateDelay"));
+        this._updateQuotaTimeouts.add(timeoutID);
         return notified;
       }, error => {
         console.error("receivedPushMessage: Error decrypting message", error);
       });
     }).catch(error => {
       console.error("receivedPushMessage: Error notifying app", error);
     });
   },
--- a/dom/push/PushServiceHttp2.jsm
+++ b/dom/push/PushServiceHttp2.jsm
@@ -214,16 +214,17 @@ var SubscriptionListener = function(aSub
   console.debug("SubscriptionListener()");
   this._subInfo = aSubInfo;
   this._resolve = aResolve;
   this._reject = aReject;
   this._data = '';
   this._serverURI = aServerURI;
   this._service = aPushServiceHttp2;
   this._ctime = Date.now();
+  this._retryTimeoutID = null;
 };
 
 SubscriptionListener.prototype = {
 
   onStartRequest: function(aRequest, aContext) {},
 
   onDataAvailable: function(aRequest, aContext, aStream, aOffset, aCount) {
     console.debug("SubscriptionListener: onDataAvailable()");
@@ -256,22 +257,27 @@ SubscriptionListener.prototype = {
     }
 
     var statusCode = aRequest.QueryInterface(Ci.nsIHttpChannel).responseStatus;
 
     if (Math.floor(statusCode / 100) == 5) {
       if (this._subInfo.retries < prefs.get("http2.maxRetries")) {
         this._subInfo.retries++;
         var retryAfter = retryAfterParser(aRequest);
-        setTimeout(_ => this._reject(
+        this._retryTimeoutID = setTimeout(_ =>
           {
-            retry: true,
-            subInfo: this._subInfo
-          }),
-          retryAfter);
+            this._reject(
+              {
+                retry: true,
+                subInfo: this._subInfo
+              });
+            this._service.removeListenerPendingRetry(this);
+            this._retryTimeoutID = null;
+          }, retryAfter);
+        this._service.addListenerPendingRetry(this);
       } else {
         this._reject(new Error("Unexpected server response: " + statusCode));
       }
       return;
     } else if (statusCode != 201) {
       this._reject(new Error("Unexpected server response: " + statusCode));
       return;
     }
@@ -324,16 +330,25 @@ SubscriptionListener.prototype = {
       originAttributes: this._subInfo.record.originAttributes,
       systemRecord: this._subInfo.record.systemRecord,
       ctime: Date.now(),
     });
 
     Services.telemetry.getHistogramById("PUSH_API_SUBSCRIBE_HTTP2_TIME").add(Date.now() - this._ctime);
     this._resolve(reply);
   },
+
+  abortRetry: function() {
+    if (this._retryTimeoutID != null) {
+      clearTimeout(this._retryTimeoutID);
+      this._retryTimeoutID = null;
+    } else {
+      console.debug("SubscriptionListener.abortRetry: aborting non-existent retry?");
+    }
+  },
 };
 
 function retryAfterParser(aRequest) {
   var retryAfter = 0;
   try {
     var retryField = aRequest.getResponseHeader("retry-after");
     if (isNaN(retryField)) {
       retryAfter = Date.parse(retryField) - (new Date().getTime());
@@ -397,16 +412,19 @@ function linkParser(linkHeader, serverUR
 this.PushServiceHttp2 = {
   _mainPushService: null,
   _serverURI: null,
 
   // Keep information about all connections, e.g. the channel, listener...
   _conns: {},
   _started: false,
 
+  // Set of SubscriptionListeners that are pending a subscription retry attempt.
+  _listenersPendingRetry: new Set(),
+
   newPushDB: function() {
     return new PushDB(kPUSHHTTP2DB_DB_NAME,
                       kPUSHHTTP2DB_DB_VERSION,
                       kPUSHHTTP2DB_STORE_NAME,
                       "subscriptionUri",
                       PushRecordHttp2);
   },
 
@@ -673,20 +691,25 @@ this.PushServiceHttp2 = {
         } catch (e) {}
       }
       delete this._conns[aSubscriptionUri];
     }
   },
 
   uninit: function() {
     console.debug("uninit()");
+    this._abortPendingSubscriptionRetries();
     this._shutdownConnections(true);
     this._mainPushService = null;
   },
 
+  _abortPendingSubscriptionRetries: function() {
+    this._listenersPendingRetry.forEach((listener) => listener.abortRetry());
+    this._listenersPendingRetry.clear();
+  },
 
   request: function(action, aRecord) {
     switch (action) {
       case "register":
         return this._subscribeResource(aRecord);
      case "unregister":
         this._shutdownSubscription(aRecord.subscriptionUri);
         return this._unsubscribeResource(aRecord.subscriptionUri);
@@ -747,16 +770,26 @@ this.PushServiceHttp2 = {
       this._resubscribe(aSubscriptionUri);
     } else if (Math.floor(aRequest.responseStatus / 100) == 2) { // This should be 204
       setTimeout(_ => this._listenForMsgs(aSubscriptionUri), 0);
     } else {
       this._retryAfterBackoff(aSubscriptionUri, -1);
     }
   },
 
+  addListenerPendingRetry: function(aListener) {
+    this._listenersPendingRetry.add(aListener);
+  },
+
+  removeListenerPendingRetry: function(aListener) {
+    if (!this._listenersPendingRetry.remove(aListener)) {
+      console.debug("removeListenerPendingRetry: listener not in list?");
+    }
+  },
+
   _pushChannelOnStop: function(aUri, aAckUri, aMessage, cryptoParams) {
     console.debug("pushChannelOnStop()");
 
     this._mainPushService.receivedPushMessage(
       aUri, aMessage, cryptoParams, record => {
         // Always update the stored record.
         return record;
       }