author | William Chen <wchen@mozilla.com> |
Wed, 16 Mar 2016 10:03:02 -0700 | |
changeset 289001 | 605b761af96ff00b8c23d5285ee1096e8424f700 |
parent 289000 | 824d6ff63d10b7cfae8e849d19e59ef99ffb160d |
child 289002 | 480b307d65e6d1cd9bd0132884d8b66f3611ba81 |
push id | 73662 |
push user | wchen@mozilla.com |
push date | Wed, 16 Mar 2016 17:03:27 +0000 |
treeherder | mozilla-inbound@605b761af96f [default view] [failures only] |
perfherder | [talos] [build metrics] [platform microbench] (compared to previous push) |
reviewers | kitcambridge |
bugs | 1246066 |
milestone | 48.0a1 |
first release with | nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
|
last release without | nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
|
dom/push/PushService.jsm | file | annotate | diff | comparison | revisions | |
dom/push/PushServiceHttp2.jsm | file | annotate | diff | comparison | revisions |
--- a/dom/push/PushService.jsm +++ b/dom/push/PushService.jsm @@ -101,16 +101,19 @@ this.PushService = { _db: null, _options: null, _visibleNotifications: new Map(), // Callback that is called after attempting to // reduce the quota for a record. Used for testing purposes. _updateQuotaTestCallback: null, + // Set of timeout ID of tasks to reduce quota. + _updateQuotaTimeouts: new Set(), + // When serverURI changes (this is used for testing), db is cleaned up and a // a new db is started. This events must be sequential. _stateChangeProcessQueue: null, _stateChangeProcessEnqueue: function(op) { if (!this._stateChangeProcessQueue) { this._stateChangeProcessQueue = Promise.resolve(); } @@ -234,17 +237,17 @@ this.PushService = { observe: function observe(aSubject, aTopic, aData) { switch (aTopic) { /* * We need to call uninit() on shutdown to clean up things that modules * aren't very good at automatically cleaning up, so we don't get shutdown * leaks on browser shutdown. */ - case "xpcom-shutdown": + case "quit-application": this.uninit(); break; case "network-active-changed": /* On B2G. */ case "network:offline-status-changed": /* On desktop. */ this._stateChangeProcessEnqueue(_ => this._changeStateOfflineEvent(aData === "offline", false) ); break; @@ -418,17 +421,17 @@ this.PushService = { return this._stopService(STOPPING_SERVICE_EVENT); } } } }, /** * PushService initialization is divided into 4 parts: - * init() - start listening for xpcom-shutdown and serverURL changes. + * init() - start listening for quit-application and serverURL changes. * state is change to PUSH_SERVICE_INIT * startService() - if serverURL is present this function is called. It starts * listening for broadcasted messages, starts db and * PushService connection (WebSocket). * state is change to PUSH_SERVICE_ACTIVATING. * startObservers() - start other observers. * changeStateConnectionEnabledEvent - checks prefs and offline state. * It changes state to: @@ -440,17 +443,17 @@ this.PushService = { console.debug("init()"); if (this._state > PUSH_SERVICE_UNINIT) { return; } this._setState(PUSH_SERVICE_ACTIVATING); - Services.obs.addObserver(this, "xpcom-shutdown", false); + Services.obs.addObserver(this, "quit-application", false); if (options.serverURI) { // this is use for xpcshell test. let [service, uri] = this._findService(options.serverURI); if (!service) { this._setState(PUSH_SERVICE_INIT); return; @@ -536,32 +539,35 @@ this.PushService = { }, /** * PushService uninitialization is divided into 3 parts: * stopObservers() - stot observers started in startObservers. * stopService() - It stops listening for broadcasted messages, stops db and * PushService connection (WebSocket). * state is changed to PUSH_SERVICE_INIT. - * uninit() - stop listening for xpcom-shutdown and serverURL changes. + * uninit() - stop listening for quit-application and serverURL changes. * state is change to PUSH_SERVICE_UNINIT */ _stopService: function(event) { console.debug("stopService()"); if (this._state < PUSH_SERVICE_ACTIVATING) { return; } this._stopObservers(); this._service.disconnect(); this._service.uninit(); this._service = null; + this._updateQuotaTimeouts.forEach((timeoutID) => clearTimeout(timeoutID)); + this._updateQuotaTimeouts.clear(); + if (!this._db) { return Promise.resolve(); } if (event == UNINIT_EVENT) { // If it is uninitialized just close db. this._db.close(); this._db = null; return Promise.resolve(); @@ -595,17 +601,17 @@ this.PushService = { uninit: function() { console.debug("uninit()"); if (this._state == PUSH_SERVICE_UNINIT) { return; } prefs.ignore("serverURL", this); - Services.obs.removeObserver(this, "xpcom-shutdown"); + Services.obs.removeObserver(this, "quit-application"); this._stateChangeProcessEnqueue(_ => { this._changeServerURL("", UNINIT_EVENT); this._setState(PUSH_SERVICE_UNINIT); console.debug("uninit: shutdown complete!"); }); }, @@ -796,18 +802,24 @@ this.PushService = { decodedPromise = Promise.resolve(null); } return decodedPromise.then(message => { if (shouldNotify) { notified = this._notifyApp(record, message); } // Update quota after the delay, at which point // we check for visible notifications. - setTimeout(() => this._updateQuota(keyID), - prefs.get("quotaUpdateDelay")); + let timeoutID = setTimeout(_ => + { + this._updateQuota(keyID); + if (!this._updateQuotaTimeouts.delete(timeoutID)) { + console.debug("receivedPushMessage: quota update timeout missing?"); + } + }, prefs.get("quotaUpdateDelay")); + this._updateQuotaTimeouts.add(timeoutID); return notified; }, error => { console.error("receivedPushMessage: Error decrypting message", error); }); }).catch(error => { console.error("receivedPushMessage: Error notifying app", error); }); },
--- a/dom/push/PushServiceHttp2.jsm +++ b/dom/push/PushServiceHttp2.jsm @@ -214,16 +214,17 @@ var SubscriptionListener = function(aSub console.debug("SubscriptionListener()"); this._subInfo = aSubInfo; this._resolve = aResolve; this._reject = aReject; this._data = ''; this._serverURI = aServerURI; this._service = aPushServiceHttp2; this._ctime = Date.now(); + this._retryTimeoutID = null; }; SubscriptionListener.prototype = { onStartRequest: function(aRequest, aContext) {}, onDataAvailable: function(aRequest, aContext, aStream, aOffset, aCount) { console.debug("SubscriptionListener: onDataAvailable()"); @@ -256,22 +257,27 @@ SubscriptionListener.prototype = { } var statusCode = aRequest.QueryInterface(Ci.nsIHttpChannel).responseStatus; if (Math.floor(statusCode / 100) == 5) { if (this._subInfo.retries < prefs.get("http2.maxRetries")) { this._subInfo.retries++; var retryAfter = retryAfterParser(aRequest); - setTimeout(_ => this._reject( + this._retryTimeoutID = setTimeout(_ => { - retry: true, - subInfo: this._subInfo - }), - retryAfter); + this._reject( + { + retry: true, + subInfo: this._subInfo + }); + this._service.removeListenerPendingRetry(this); + this._retryTimeoutID = null; + }, retryAfter); + this._service.addListenerPendingRetry(this); } else { this._reject(new Error("Unexpected server response: " + statusCode)); } return; } else if (statusCode != 201) { this._reject(new Error("Unexpected server response: " + statusCode)); return; } @@ -324,16 +330,25 @@ SubscriptionListener.prototype = { originAttributes: this._subInfo.record.originAttributes, systemRecord: this._subInfo.record.systemRecord, ctime: Date.now(), }); Services.telemetry.getHistogramById("PUSH_API_SUBSCRIBE_HTTP2_TIME").add(Date.now() - this._ctime); this._resolve(reply); }, + + abortRetry: function() { + if (this._retryTimeoutID != null) { + clearTimeout(this._retryTimeoutID); + this._retryTimeoutID = null; + } else { + console.debug("SubscriptionListener.abortRetry: aborting non-existent retry?"); + } + }, }; function retryAfterParser(aRequest) { var retryAfter = 0; try { var retryField = aRequest.getResponseHeader("retry-after"); if (isNaN(retryField)) { retryAfter = Date.parse(retryField) - (new Date().getTime()); @@ -397,16 +412,19 @@ function linkParser(linkHeader, serverUR this.PushServiceHttp2 = { _mainPushService: null, _serverURI: null, // Keep information about all connections, e.g. the channel, listener... _conns: {}, _started: false, + // Set of SubscriptionListeners that are pending a subscription retry attempt. + _listenersPendingRetry: new Set(), + newPushDB: function() { return new PushDB(kPUSHHTTP2DB_DB_NAME, kPUSHHTTP2DB_DB_VERSION, kPUSHHTTP2DB_STORE_NAME, "subscriptionUri", PushRecordHttp2); }, @@ -673,20 +691,25 @@ this.PushServiceHttp2 = { } catch (e) {} } delete this._conns[aSubscriptionUri]; } }, uninit: function() { console.debug("uninit()"); + this._abortPendingSubscriptionRetries(); this._shutdownConnections(true); this._mainPushService = null; }, + _abortPendingSubscriptionRetries: function() { + this._listenersPendingRetry.forEach((listener) => listener.abortRetry()); + this._listenersPendingRetry.clear(); + }, request: function(action, aRecord) { switch (action) { case "register": return this._subscribeResource(aRecord); case "unregister": this._shutdownSubscription(aRecord.subscriptionUri); return this._unsubscribeResource(aRecord.subscriptionUri); @@ -747,16 +770,26 @@ this.PushServiceHttp2 = { this._resubscribe(aSubscriptionUri); } else if (Math.floor(aRequest.responseStatus / 100) == 2) { // This should be 204 setTimeout(_ => this._listenForMsgs(aSubscriptionUri), 0); } else { this._retryAfterBackoff(aSubscriptionUri, -1); } }, + addListenerPendingRetry: function(aListener) { + this._listenersPendingRetry.add(aListener); + }, + + removeListenerPendingRetry: function(aListener) { + if (!this._listenersPendingRetry.remove(aListener)) { + console.debug("removeListenerPendingRetry: listener not in list?"); + } + }, + _pushChannelOnStop: function(aUri, aAckUri, aMessage, cryptoParams) { console.debug("pushChannelOnStop()"); this._mainPushService.receivedPushMessage( aUri, aMessage, cryptoParams, record => { // Always update the stored record. return record; }