Bug 1206163 - Retry failed register requests on reconnect. r=dragana
authorKit Cambridge <kcambridge@mozilla.com>
Thu, 22 Oct 2015 10:14:43 -0600
changeset 304152 32c3f77b40671d80e6fcbcedf7f6b69c6e5c7121
parent 304151 b859d76b04516eac20835c742573c3e53b5e26c0
child 304153 0c702af0b7328dbeb050812f64df016efd375925
push id1001
push userraliiev@mozilla.com
push dateMon, 18 Jan 2016 19:06:03 +0000
treeherdermozilla-release@8b89261f3ac4 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersdragana
bugs1206163
milestone44.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1206163 - Retry failed register requests on reconnect. r=dragana
dom/push/PushServiceWebSocket.jsm
dom/push/test/xpcshell/test_reconnect_retry.js
dom/push/test/xpcshell/xpcshell.ini
--- a/dom/push/PushServiceWebSocket.jsm
+++ b/dom/push/PushServiceWebSocket.jsm
@@ -168,42 +168,41 @@ this.PushServiceWebSocket = {
         gDebuggingEnabled = prefs.get("debug");
       } else if (aData == "dom.push.userAgentID") {
         this._shutdownWS();
         this._reconnectAfterBackoff();
       }
       break;
     case "timer-callback":
       if (aSubject == this._requestTimeoutTimer) {
-        if (Object.keys(this._pendingRequests).length === 0) {
+        if (Object.keys(this._registerRequests).length === 0) {
           this._requestTimeoutTimer.cancel();
         }
 
         // Set to true if at least one request timed out.
         let requestTimedOut = false;
-        for (let channelID in this._pendingRequests) {
-          let duration = Date.now() - this._pendingRequests[channelID].ctime;
+        for (let channelID in this._registerRequests) {
+          let duration = Date.now() - this._registerRequests[channelID].ctime;
           // If any of the registration requests time out, all the ones after it
           // also made to fail, since we are going to be disconnecting the
           // socket.
           if (requestTimedOut || duration > this._requestTimeout) {
             debug("Request timeout: Removing " + channelID);
             requestTimedOut = true;
-            this._pendingRequests[channelID]
+            this._registerRequests[channelID]
               .reject({status: 0, error: "TimeoutError"});
 
-            delete this._pendingRequests[channelID];
+            delete this._registerRequests[channelID];
           }
         }
 
         // The most likely reason for a registration request timing out is
         // that the socket has disconnected. Best to reconnect.
         if (requestTimedOut) {
-          this._shutdownWS();
-          this._reconnectAfterBackoff();
+          this._reconnect();
         }
       }
       break;
     }
   },
 
   checkServerURI: function(serverURL) {
     if (!serverURL) {
@@ -237,17 +236,17 @@ this.PushServiceWebSocket = {
             ". Not updating userAgentID");
       return;
     }
     debug("New _UAID: " + newID);
     prefs.set("userAgentID", newID);
   },
 
   _ws: null,
-  _pendingRequests: {},
+  _registerRequests: {},
   _currentState: STATE_SHUT_DOWN,
   _requestTimeout: 0,
   _requestTimeoutTimer: null,
   _retryFailCount: 0,
 
   /**
    * According to the WS spec, servers should immediately close the underlying
    * TCP connection after they close a WebSocket. This causes wsOnStop to be
@@ -339,17 +338,23 @@ this.PushServiceWebSocket = {
 
     this._requestTimeout = prefs.get("requestTimeout");
     this._adaptiveEnabled = prefs.get('adaptive.enabled');
     this._upperLimit = prefs.get('adaptive.upperLimit');
     gDebuggingEnabled = prefs.get("debug");
     prefs.observe("debug", this);
   },
 
-  _shutdownWS: function() {
+  _reconnect: function () {
+    debug("reconnect()");
+    this._shutdownWS(false);
+    this._reconnectAfterBackoff();
+  },
+
+  _shutdownWS: function(shouldCancelPending = true) {
     debug("shutdownWS()");
     this._currentState = STATE_SHUT_DOWN;
     this._willBeWokenUpByUDP = false;
 
     prefs.ignore("userAgentID", this);
 
     if (this._wsListener) {
       this._wsListener._pushService = null;
@@ -361,17 +366,19 @@ this.PushServiceWebSocket = {
 
     this._waitingForPong = false;
     if (this._mainPushService) {
       this._mainPushService.stopAlarm();
     } else {
       dump("This should not happend");
     }
 
-    this._cancelPendingRequests();
+    if (shouldCancelPending) {
+      this._cancelRegisterRequests();
+    }
 
     if (this._notifyRequestQueue) {
       this._notifyRequestQueue();
       this._notifyRequestQueue = null;
     }
   },
 
   uninit: function() {
@@ -631,18 +638,17 @@ this.PushServiceWebSocket = {
     try {
       // Grab a wakelock before we open the socket to ensure we don't go to
       // sleep before connection the is opened.
       this._ws.asyncOpen(uri, uri.spec, this._wsListener, null);
       this._acquireWakeLock();
       this._currentState = STATE_WAITING_FOR_WS_START;
     } catch(e) {
       debug("Error opening websocket. asyncOpen failed!");
-      this._shutdownWS();
-      this._reconnectAfterBackoff();
+      this._reconnect();
     }
   },
 
   connect: function(records) {
     debug("connect");
     // Check to see if we need to do anything.
     if (records.length > 0) {
       this._beginWSSetup();
@@ -679,18 +685,17 @@ this.PushServiceWebSocket = {
    *    connection being up, the ping alarm is reset every time data is
    *    received.
    */
   onAlarmFired: function() {
     // Conditions are arranged in decreasing specificity.
     // i.e. when _waitingForPong is true, other conditions are also true.
     if (this._waitingForPong) {
       debug("Did not receive pong in time. Reconnecting WebSocket.");
-      this._shutdownWS();
-      this._reconnectAfterBackoff();
+      this._reconnect();
     }
     else if (this._currentState == STATE_READY) {
       // Send a ping.
       // Bypass the queue; we don't want this to be kept pending.
       // Watch out for exception in case the socket has disconnected.
       // When this happens, we pretend the ping was sent and don't specially
       // handle the exception, as the lack of a pong will lead to the socket
       // being reset.
@@ -786,39 +791,40 @@ this.PushServiceWebSocket = {
     // To avoid sticking extra large values sent by an evil server into prefs.
     if (reply.uaid.length > 128) {
       debug("UAID received from server was too long: " +
             reply.uaid);
       this._shutdownWS();
       return;
     }
 
-    let notifyRequestQueue = () => {
+    let sendRequests = () => {
       if (this._notifyRequestQueue) {
         this._notifyRequestQueue();
         this._notifyRequestQueue = null;
       }
+      this._sendRegisterRequests();
     };
 
     function finishHandshake() {
       this._UAID = reply.uaid;
       this._currentState = STATE_READY;
       prefs.observe("userAgentID", this);
 
       this._dataEnabled = !!reply.use_webpush;
       if (this._dataEnabled) {
         this._mainPushService.getAllUnexpired().then(records =>
           Promise.all(records.map(record =>
             this._mainPushService.ensureP256dhKey(record).catch(error => {
               debug("finishHandshake: Error updating record " + record.keyID);
             })
           ))
-        ).then(notifyRequestQueue);
+        ).then(sendRequests);
       } else {
-        notifyRequestQueue();
+        sendRequests();
       }
     }
 
     // By this point we've got a UAID from the server that we are ready to
     // accept.
     //
     // We unconditionally drop all existing registrations and notify service
     // workers if we receive a new UAID. This ensures we expunge all stale
@@ -837,23 +843,23 @@ this.PushServiceWebSocket = {
   },
 
   /**
    * Protocol handler invoked by server message.
    */
   _handleRegisterReply: function(reply) {
     debug("handleRegisterReply()");
     if (typeof reply.channelID !== "string" ||
-        typeof this._pendingRequests[reply.channelID] !== "object") {
+        typeof this._registerRequests[reply.channelID] !== "object") {
       return;
     }
 
-    let tmp = this._pendingRequests[reply.channelID];
-    delete this._pendingRequests[reply.channelID];
-    if (Object.keys(this._pendingRequests).length === 0 &&
+    let tmp = this._registerRequests[reply.channelID];
+    delete this._registerRequests[reply.channelID];
+    if (Object.keys(this._registerRequests).length === 0 &&
         this._requestTimeoutTimer) {
       this._requestTimeoutTimer.cancel();
     }
 
     if (reply.status == 200) {
       try {
         Services.io.newURI(reply.pushEndpoint, null, null);
       }
@@ -972,33 +978,33 @@ this.PushServiceWebSocket = {
                           .getService(Ci.nsIUUIDGenerator);
     // generateUUID() gives a UUID surrounded by {...}, slice them off.
     return uuidGenerator.generateUUID().toString().slice(1, -1);
   },
 
   request: function(action, record) {
     debug("request() " + action);
 
-    if (Object.keys(this._pendingRequests).length === 0) {
+    if (Object.keys(this._registerRequests).length === 0) {
       // start the timer since we now have at least one request
       if (!this._requestTimeoutTimer) {
         this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"]
                                       .createInstance(Ci.nsITimer);
       }
       this._requestTimeoutTimer.init(this,
                                      this._requestTimeout,
                                      Ci.nsITimer.TYPE_REPEATING_SLACK);
     }
 
     if (action == "register") {
       let data = {channelID: this._generateID(),
                   messageType: action};
 
       return new Promise((resolve, reject) => {
-        this._pendingRequests[data.channelID] = {record: record,
+        this._registerRequests[data.channelID] = {record: record,
                                                  resolve: resolve,
                                                  reject: reject,
                                                  ctime: Date.now()
                                                 };
         this._queueRequest(data);
       }).then(record => {
         if (!this._dataEnabled) {
           return record;
@@ -1015,49 +1021,60 @@ this.PushServiceWebSocket = {
     this._queueRequest({channelID: record.channelID,
                         messageType: action});
     return Promise.resolve();
   },
 
   _queueStart: Promise.resolve(),
   _notifyRequestQueue: null,
   _queue: null,
-  _enqueue: function(op, errop) {
+  _enqueue: function(op) {
     debug("enqueue");
     if (!this._queue) {
       this._queue = this._queueStart;
     }
     this._queue = this._queue
                     .then(op)
                     .catch(_ => {});
   },
 
   _send(data) {
     if (this._currentState == STATE_READY) {
       if (data.messageType != "register" ||
-        typeof this._pendingRequests[data.channelID] == "object") {
+        typeof this._registerRequests[data.channelID] == "object") {
 
         // check if request has not been cancelled
         this._wsSendMessage(data);
       }
     }
   },
 
+  _sendRegisterRequests() {
+    this._enqueue(_ => Promise.all(Object.keys(this._registerRequests).map(channelID =>
+      this._send({
+        messageType: "register",
+        channelID: channelID,
+      })
+    )));
+  },
+
   _queueRequest(data) {
-    if (this._currentState != STATE_READY) {
-      if (!this._notifyRequestQueue) {
+    if (data.messageType != "register") {
+      if (this._currentState != STATE_READY && !this._notifyRequestQueue) {
         let promise = new Promise((resolve, reject) => {
           this._notifyRequestQueue = resolve;
         });
         this._enqueue(_ => promise);
       }
 
+      this._enqueue(_ => this._send(data));
+    } else if (this._currentState == STATE_READY) {
+      this._send(data);
     }
 
-    this._enqueue(_ => this._send(data));
     if (!this._ws) {
       // This will end up calling notifyRequestQueue().
       this._beginWSSetup();
       // If beginWSSetup does not succeed to make ws, notifyRequestQueue will
       // not be call.
       if (!this._ws && this._notifyRequestQueue) {
         this._notifyRequestQueue();
         this._notifyRequestQueue = null;
@@ -1145,23 +1162,20 @@ this.PushServiceWebSocket = {
    */
   _wsOnStop: function(context, statusCode) {
     debug("wsOnStop()");
     this._releaseWakeLock();
 
     if (statusCode != Cr.NS_OK &&
         !(statusCode == Cr.NS_BASE_STREAM_CLOSED && this._willBeWokenUpByUDP)) {
       debug("Socket error " + statusCode);
-      this._reconnectAfterBackoff();
+      this._reconnect();
+      return;
     }
 
-    // Bug 896919. We always shutdown the WebSocket, even if we need to
-    // reconnect. This works because _reconnectAfterBackoff() is "async"
-    // (there is a minimum delay of the pref retryBaseInterval, which by default
-    // is 5000ms), so that function will open the WebSocket again.
     this._shutdownWS();
   },
 
   _wsOnMessageAvailable: function(context, message) {
     debug("wsOnMessageAvailable() " + message);
 
     this._waitingForPong = false;
 
@@ -1240,22 +1254,22 @@ this.PushServiceWebSocket = {
     if (aStatusCode == kUDP_WAKEUP_WS_STATUS_CODE) {
       debug("Server closed with promise to wake up");
       this._willBeWokenUpByUDP = true;
       // TODO: there should be no pending requests
     }
   },
 
   /**
-   * Rejects all pending requests with errors.
+   * Rejects all pending register requests with errors.
    */
-  _cancelPendingRequests: function() {
-    for (let channelID in this._pendingRequests) {
-      let request = this._pendingRequests[channelID];
-      delete this._pendingRequests[channelID];
+  _cancelRegisterRequests: function() {
+    for (let channelID in this._registerRequests) {
+      let request = this._registerRequests[channelID];
+      delete this._registerRequests[channelID];
       request.reject({status: 0, error: "AbortError"});
     }
   },
 
   _makeUDPSocket: function() {
     return Cc["@mozilla.org/network/udp-socket;1"]
              .createInstance(Ci.nsIUDPSocket);
   },
new file mode 100644
--- /dev/null
+++ b/dom/push/test/xpcshell/test_reconnect_retry.js
@@ -0,0 +1,71 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+'use strict';
+
+const {PushDB, PushService, PushServiceWebSocket} = serviceExports;
+
+function run_test() {
+  do_get_profile();
+  setPrefs({
+    requestTimeout: 10000,
+    retryBaseInterval: 150
+  });
+  run_next_test();
+}
+
+add_task(function* test_reconnect_retry() {
+  let db = PushServiceWebSocket.newPushDB();
+  do_register_cleanup(() => {return db.drop().then(_ => db.close());});
+
+  let registers = 0;
+  let channelID;
+  PushService.init({
+    serverURI: "wss://push.example.org/",
+    networkInfo: new MockDesktopNetworkInfo(),
+    db,
+    makeWebSocket(uri) {
+      return new MockWebSocket(uri, {
+        onHello(request) {
+          this.serverSendMsg(JSON.stringify({
+            messageType: 'hello',
+            status: 200,
+            uaid: '083e6c17-1063-4677-8638-ab705aebebc2'
+          }));
+        },
+        onRegister(request) {
+          registers++;
+          if (registers == 1) {
+            channelID = request.channelID;
+            this.serverClose();
+            return;
+          }
+          if (registers == 2) {
+            equal(request.channelID, channelID,
+              'Should retry registers after reconnect');
+          }
+          this.serverSendMsg(JSON.stringify({
+            messageType: 'register',
+            channelID: request.channelID,
+            pushEndpoint: 'https://example.org/push/' + registers,
+            status: 200,
+          }));
+        }
+      });
+    }
+  });
+
+  let registration = yield PushNotificationService.register(
+    'https://example.com/page/1',
+    ChromeUtils.originAttributesToSuffix({ appId: Ci.nsIScriptSecurityManager.NO_APP_ID, inBrowser: false })
+  );
+  equal(registration.channelID, channelID, 'Wrong channel ID for retried request');
+
+  registration = yield PushNotificationService.register(
+    'https://example.com/page/2',
+    ChromeUtils.originAttributesToSuffix({ appId: Ci.nsIScriptSecurityManager.NO_APP_ID, inBrowser: false })
+  );
+  notEqual(registration.channelID, channelID, 'Wrong channel ID for new request');
+
+  equal(registers, 3, 'Wrong registration count');
+});
--- a/dom/push/test/xpcshell/xpcshell.ini
+++ b/dom/push/test/xpcshell/xpcshell.ini
@@ -33,16 +33,17 @@ run-sequentially = This will delete all 
 [test_registration_success.js]
 [test_unregister_empty_scope.js]
 [test_unregister_error.js]
 [test_unregister_invalid_json.js]
 [test_unregister_not_found.js]
 [test_unregister_success.js]
 [test_webapps_cleardata.js]
 [test_updateRecordNoEncryptionKeys_ws.js]
+[test_reconnect_retry.js]
 #http2 test
 [test_resubscribe_4xxCode_http2.js]
 [test_resubscribe_5xxCode_http2.js]
 [test_resubscribe_listening_for_msg_error_http2.js]
 [test_register_5xxCode_http2.js]
 [test_updateRecordNoEncryptionKeys_http2.js]
 [test_register_success_http2.js]
 skip-if = !hasNode