Bug 1207744 - Track and re-send push unregister requests on reconnect. r=dragana
☠☠ backed out by f7fb88074a1f ☠ ☠
authorKit Cambridge <kcambridge@mozilla.com>
Mon, 06 Jun 2016 16:29:36 -0700
changeset 300999 f7e1ab1bd99c05c219fe75913f8f37ba39aec092
parent 300998 3943ac2a0fd15c79dec6fba028f7087560e8ccbe
child 301000 51bd83e16a49cf1c31114ba2f767295e5d50b0d2
push id19599
push usercbook@mozilla.com
push dateWed, 08 Jun 2016 10:16:21 +0000
treeherderfx-team@81f4cc3f6f4c [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersdragana
bugs1207744
milestone50.0a1
Bug 1207744 - Track and re-send push unregister requests on reconnect. r=dragana MozReview-Commit-ID: 2rFLm07n4EU
dom/push/PushDB.jsm
dom/push/PushService.jsm
dom/push/PushServiceAndroidGCM.jsm
dom/push/PushServiceHttp2.jsm
dom/push/PushServiceWebSocket.jsm
dom/push/test/test_utils.js
dom/push/test/xpcshell/test_permissions.js
dom/push/test/xpcshell/test_unregister_invalid_json.js
dom/push/test/xpcshell/test_unregister_success.js
--- a/dom/push/PushDB.jsm
+++ b/dom/push/PushDB.jsm
@@ -120,20 +120,20 @@ this.PushDB.prototype = {
    */
   delete: function(aKeyID) {
     console.debug("delete()");
 
     return new Promise((resolve, reject) =>
       this.newTxn(
         "readwrite",
         this._dbStoreName,
-        function txnCb(aTxn, aStore) {
+        (aTxn, aStore) => {
           console.debug("delete: Removing record", aKeyID);
           aStore.get(aKeyID).onsuccess = event => {
-            aTxn.result = event.target.result;
+            aTxn.result = this.toPushRecord(event.target.result);
             aStore.delete(aKeyID);
           };
         },
         resolve,
         reject
       )
     );
   },
--- a/dom/push/PushService.jsm
+++ b/dom/push/PushService.jsm
@@ -1016,20 +1016,18 @@ this.PushService = {
 
   getAllUnexpired: function() {
     return this._db.getAllUnexpired();
   },
 
   _sendRequest(action, ...params) {
     if (this._state == PUSH_SERVICE_CONNECTION_DISABLE) {
       return Promise.reject(new Error("Push service disabled"));
-    } else if (this._state == PUSH_SERVICE_ACTIVE_OFFLINE) {
-      if (this._service.serviceType() == "WebSocket" && action == "unregister") {
-        return Promise.resolve();
-      }
+    }
+    if (this._state == PUSH_SERVICE_ACTIVE_OFFLINE) {
       return Promise.reject(new Error("Push service offline"));
     }
     // Ensure the backend is ready. `getByPageRecord` already checks this, but
     // we need to check again here in case the service was restarted in the
     // meantime.
     return this._checkActivated().then(_ => {
       switch (action) {
         case "register":
@@ -1196,22 +1194,23 @@ this.PushService = {
       .then(record => {
         if (record === undefined) {
           return false;
         }
 
         let reason = Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL;
         return Promise.all([
           this._sendUnregister(record, reason),
-          this._db.delete(record.keyID),
-        ]).then(() => {
-          gPushNotifier.notifySubscriptionModified(record.scope,
-                                                   record.principal);
-          return true;
-        });
+          this._db.delete(record.keyID).then(record => {
+            if (record) {
+              gPushNotifier.notifySubscriptionModified(record.scope,
+                                                       record.principal);
+            }
+          }),
+        ]).then(([success]) => success);
       });
   },
 
   clear: function(info) {
     return this._checkActivated()
       .then(_ => {
         return this._dropRegistrationsIf(record =>
           info.domain == "*" ||
--- a/dom/push/PushServiceAndroidGCM.jsm
+++ b/dom/push/PushServiceAndroidGCM.jsm
@@ -54,20 +54,16 @@ this.PushServiceAndroidGCM = {
   newPushDB: function() {
     return new PushDB(kPUSHANDROIDGCMDB_DB_NAME,
                       kPUSHANDROIDGCMDB_DB_VERSION,
                       kPUSHANDROIDGCMDB_STORE_NAME,
                       "channelID",
                       PushRecordAndroidGCM);
   },
 
-  serviceType: function() {
-    return "AndroidGCM";
-  },
-
   validServerURI: function(serverURI) {
     if (!serverURI) {
       return false;
     }
 
     if (serverURI.scheme == "https") {
       return true;
     }
--- a/dom/push/PushServiceHttp2.jsm
+++ b/dom/push/PushServiceHttp2.jsm
@@ -425,20 +425,16 @@ this.PushServiceHttp2 = {
   newPushDB: function() {
     return new PushDB(kPUSHHTTP2DB_DB_NAME,
                       kPUSHHTTP2DB_DB_VERSION,
                       kPUSHHTTP2DB_STORE_NAME,
                       "subscriptionUri",
                       PushRecordHttp2);
   },
 
-  serviceType: function() {
-    return "http2";
-  },
-
   hasmainPushService: function() {
     return this._mainPushService !== null;
   },
 
   validServerURI: function(serverURI) {
     if (serverURI.scheme == "http") {
       return !!prefs.get("testing.allowInsecureServerURL");
     }
--- a/dom/push/PushServiceWebSocket.jsm
+++ b/dom/push/PushServiceWebSocket.jsm
@@ -138,20 +138,16 @@ this.PushServiceWebSocket = {
   newPushDB: function() {
     return new PushDB(kPUSHWSDB_DB_NAME,
                       kPUSHWSDB_DB_VERSION,
                       kPUSHWSDB_STORE_NAME,
                       "channelID",
                       PushRecordWebSocket);
   },
 
-  serviceType: function() {
-    return "WebSocket";
-  },
-
   disconnect: function() {
     this._shutdownWS();
   },
 
   observe: function(aSubject, aTopic, aData) {
     if (aTopic == "nsPref:changed" && aData == "dom.push.userAgentID") {
       this._onUAIDChanged();
     } else if (aTopic == "timer-callback") {
@@ -229,27 +225,25 @@ this.PushServiceWebSocket = {
 
     if (this._lastPingTime > 0 &&
         now - this._lastPingTime > this._requestTimeout) {
 
       console.debug("timeOutRequests: Did not receive pong in time");
       requestTimedOut = true;
 
     } else {
-      for (let [channelID, request] of this._registerRequests) {
+      for (let [key, request] of this._pendingRequests) {
         let duration = now - request.ctime;
         // If any of the registration requests time out, all the ones after it
         // also made to fail, since we are going to be disconnecting the
         // socket.
         requestTimedOut |= duration > this._requestTimeout;
         if (requestTimedOut) {
-          request.reject(new Error(
-            "Register request timed out for channel ID " + channelID));
-
-          this._registerRequests.delete(channelID);
+          request.reject(new Error("Request timed out"));
+          this._pendingRequests.delete(key);
         }
       }
     }
 
     // The most likely reason for a pong or registration request timing out is
     // that the socket has disconnected. Best to reconnect.
     if (requestTimedOut) {
       this._reconnect();
@@ -273,17 +267,17 @@ this.PushServiceWebSocket = {
         "Not updating userAgentID");
       return;
     }
     console.debug("New _UAID", newID);
     prefs.set("userAgentID", newID);
   },
 
   _ws: null,
-  _registerRequests: new Map(),
+  _pendingRequests: new Map(),
   _currentState: STATE_SHUT_DOWN,
   _requestTimeout: 0,
   _requestTimeoutTimer: null,
   _retryFailCount: 0,
 
   /**
    * According to the WS spec, servers should immediately close the underlying
    * TCP connection after they close a WebSocket. This causes wsOnStop to be
@@ -371,17 +365,17 @@ this.PushServiceWebSocket = {
 
     this._lastPingTime = 0;
 
     if (this._pingTimer) {
       this._pingTimer.cancel();
     }
 
     if (shouldCancelPending) {
-      this._cancelRegisterRequests();
+      this._cancelPendingRequests();
     }
 
     if (this._notifyRequestQueue) {
       this._notifyRequestQueue();
       this._notifyRequestQueue = null;
     }
   },
 
@@ -432,17 +426,17 @@ this.PushServiceWebSocket = {
       this._backoffTimer = Cc["@mozilla.org/timer;1"]
                                .createInstance(Ci.nsITimer);
     }
     this._backoffTimer.init(this, retryTimeout, Ci.nsITimer.TYPE_ONE_SHOT);
   },
 
   /** Indicates whether we're waiting for pongs or requests. */
   _hasPendingRequests() {
-    return this._lastPingTime > 0 || this._registerRequests.size > 0;
+    return this._lastPingTime > 0 || this._pendingRequests.size > 0;
   },
 
   /**
    * Starts the request timeout timer unless we're already waiting for a pong
    * or register request.
    */
   _startRequestTimeoutTimer() {
     if (this._hasPendingRequests()) {
@@ -617,17 +611,17 @@ this.PushServiceWebSocket = {
       return;
     }
 
     let sendRequests = () => {
       if (this._notifyRequestQueue) {
         this._notifyRequestQueue();
         this._notifyRequestQueue = null;
       }
-      this._sendRegisterRequests();
+      this._sendingPendingRequests();
     };
 
     function finishHandshake() {
       this._UAID = reply.uaid;
       this._currentState = STATE_READY;
       prefs.observe("userAgentID", this);
 
       this._dataEnabled = !!reply.use_webpush;
@@ -664,27 +658,22 @@ this.PushServiceWebSocket = {
     finishHandshake.bind(this)();
   },
 
   /**
    * Protocol handler invoked by server message.
    */
   _handleRegisterReply: function(reply) {
     console.debug("handleRegisterReply()");
-    if (typeof reply.channelID !== "string" ||
-        !this._registerRequests.has(reply.channelID)) {
+
+    let tmp = this._takeRequestForReply(reply);
+    if (!tmp) {
       return;
     }
 
-    let tmp = this._registerRequests.get(reply.channelID);
-    this._registerRequests.delete(reply.channelID);
-    if (!this._hasPendingRequests()) {
-      this._requestTimeoutTimer.cancel();
-    }
-
     if (reply.status == 200) {
       try {
         Services.io.newURI(reply.pushEndpoint, null, null);
       }
       catch (e) {
         tmp.reject(new Error("Invalid push endpoint: " + reply.pushEndpoint));
         return;
       }
@@ -703,16 +692,28 @@ this.PushServiceWebSocket = {
       tmp.resolve(record);
     } else {
       console.error("handleRegisterReply: Unexpected server response", reply);
       tmp.reject(new Error("Wrong status code for register reply: " +
         reply.status));
     }
   },
 
+  _handleUnregisterReply(reply) {
+    console.debug("handleUnregisterReply()");
+
+    let request = this._takeRequestForReply(reply);
+    if (!request) {
+      return;
+    }
+
+    let success = reply.status === 200;
+    request.resolve(success);
+  },
+
   _handleDataUpdate: function(update) {
     let promise;
     if (typeof update.channelID != "string") {
       console.warn("handleDataUpdate: Discarding update without channel ID",
         update);
       return;
     }
     function updateRecord(record) {
@@ -840,63 +841,61 @@ this.PushServiceWebSocket = {
                           .getService(Ci.nsIUUIDGenerator);
     // generateUUID() gives a UUID surrounded by {...}, slice them off.
     return uuidGenerator.generateUUID().toString().slice(1, -1);
   },
 
   register(record) {
     console.debug("register() ", record);
 
-    // start the timer since we now have at least one request
-    this._startRequestTimeoutTimer();
-
     let data = {channelID: this._generateID(),
                 messageType: "register"};
 
     if (record.appServerKey) {
       data.key = ChromeUtils.base64URLEncode(record.appServerKey, {
         // The Push server requires padding.
         pad: true,
       });
     }
 
-    return new Promise((resolve, reject) => {
-      this._registerRequests.set(data.channelID, {
-        record: record,
-        resolve: resolve,
-        reject: reject,
-        ctime: Date.now(),
-      });
-      this._queueRequest(data);
-    }).then(record => {
+    return this._sendRequest(record, data).then(record => {
       if (!this._dataEnabled) {
         return record;
       }
       return PushCrypto.generateKeys()
         .then(([publicKey, privateKey]) => {
           record.p256dhPublicKey = publicKey;
           record.p256dhPrivateKey = privateKey;
           record.authenticationSecret = PushCrypto.generateAuthenticationSecret();
           return record;
         });
     });
   },
 
   unregister(record, reason) {
     console.debug("unregister() ", record, reason);
 
-    let code = kUNREGISTER_REASON_TO_CODE[reason];
-    if (!code) {
-      return Promise.reject(new Error('Invalid unregister reason'));
-    }
-    let data = {channelID: record.channelID,
-                messageType: "unregister",
-                code: code};
-    this._queueRequest(data);
-    return Promise.resolve();
+    return Promise.resolve().then(_ => {
+      let code = kUNREGISTER_REASON_TO_CODE[reason];
+      if (!code) {
+        throw new Error('Invalid unregister reason');
+      }
+      let data = {channelID: record.channelID,
+                  messageType: "unregister",
+                  code: code};
+
+      // If we're connected to a Web Push server, wait for an unregister
+      // response. Simple Push servers aren't required to support
+      // unregistration, so we return immediately.
+      if (this._dataEnabled) {
+        return this._sendRequest(record, data);
+      }
+      this._queueRequest(data);
+      return true;
+    });
   },
 
   _queueStart: Promise.resolve(),
   _notifyRequestQueue: null,
   _queue: null,
   _enqueue: function(op) {
     console.debug("enqueue()");
     if (!this._queue) {
@@ -904,32 +903,25 @@ this.PushServiceWebSocket = {
     }
     this._queue = this._queue
                     .then(op)
                     .catch(_ => {});
   },
 
   _send(data) {
     if (this._currentState == STATE_READY) {
-      if (data.messageType != "register" ||
-          this._registerRequests.has(data.channelID)) {
-
-        // check if request has not been cancelled
-        this._wsSendMessage(data);
-      }
+      // check if request has not been cancelled
+      this._wsSendMessage(data);
     }
   },
 
-  _sendRegisterRequests() {
+  _sendingPendingRequests() {
     this._enqueue(_ => {
-      for (let channelID of this._registerRequests.keys()) {
-        this._send({
-          messageType: "register",
-          channelID: channelID,
-        });
+      for (let request of this._pendingRequests.values()) {
+        this._send(request.data);
       }
     });
   },
 
   _queueRequest(data) {
     if (data.messageType != "register") {
       if (this._currentState != STATE_READY && !this._notifyRequestQueue) {
         let promise = new Promise((resolve, reject) => {
@@ -1054,17 +1046,17 @@ this.PushServiceWebSocket = {
 
     // If it is a ping, do not handle the message.
     if (doNotHandle) {
       return;
     }
 
     // A whitelist of protocol handlers. Add to these if new messages are added
     // in the protocol.
-    let handlers = ["Hello", "Register", "Notification"];
+    let handlers = ["Hello", "Register", "Unregister", "Notification"];
 
     // Build up the handler name to call from messageType.
     // e.g. messageType == "register" -> _handleRegisterReply.
     let handlerName = reply.messageType[0].toUpperCase() +
                       reply.messageType.slice(1).toLowerCase();
 
     if (handlers.indexOf(handlerName) == -1) {
       console.warn("wsOnMessageAvailable: No whitelisted handler", handlerName,
@@ -1100,21 +1092,63 @@ this.PushServiceWebSocket = {
       console.debug("wsOnServerClose: Skipping automatic reconnect");
       this._skipReconnect = true;
     }
   },
 
   /**
    * Rejects all pending register requests with errors.
    */
-  _cancelRegisterRequests: function() {
-    for (let request of this._registerRequests.values()) {
-      request.reject(new Error("Register request aborted"));
+  _cancelPendingRequests() {
+    for (let request of this._pendingRequests.values()) {
+      request.reject(new Error("Request aborted"));
     }
-    this._registerRequests.clear();
+    this._pendingRequests.clear();
+  },
+
+  _makePendingRequestKey(request) {
+    return request.messageType.toLowerCase() + "|" + request.channelID;
+  },
+
+  _sendRequest(record, data) {
+    // start the timer since we now have at least one request
+    this._startRequestTimeoutTimer();
+
+    let key = this._makePendingRequestKey(data);
+    if (!this._pendingRequests.has(key)) {
+      let request = {
+        data: data,
+        record: record,
+        ctime: Date.now(),
+      };
+      request.promise = new Promise((resolve, reject) => {
+        request.resolve = resolve;
+        request.reject = reject;
+        this._queueRequest(data);
+      });
+      this._pendingRequests.set(key, request);
+    }
+
+    return this._pendingRequests.get(key).promise;
+  },
+
+  _takeRequestForReply(reply) {
+    if (typeof reply.channelID !== "string") {
+      return null;
+    }
+    let key = this._makePendingRequestKey(reply);
+    let request = this._pendingRequests.get(key);
+    if (!request) {
+      return null;
+    }
+    this._pendingRequests.delete(key);
+    if (!this._hasPendingRequests()) {
+      this._requestTimeoutTimer.cancel();
+    }
+    return request;
   },
 };
 
 function PushRecordWebSocket(record) {
   PushRecord.call(this, record);
   this.channelID = record.channelID;
   this.version = record.version;
 }
--- a/dom/push/test/test_utils.js
+++ b/dom/push/test/test_utils.js
@@ -101,17 +101,21 @@
         uaid: userAgentID,
         channelID: request.channelID,
         status: 200,
         pushEndpoint: "https://example.com/endpoint/" + registerCount++
       }));
     },
 
     onUnregister(request) {
-      // Do nothing.
+      this.serverSendMsg(JSON.stringify({
+        messageType: "unregister",
+        channelID: request.channelID,
+        status: 200,
+      }));
     },
 
     onAck(request) {
       // Do nothing.
     },
 
     handleMessage(msg) {
       let request = JSON.parse(msg);
--- a/dom/push/test/xpcshell/test_permissions.js
+++ b/dom/push/test/xpcshell/test_permissions.js
@@ -115,16 +115,21 @@ add_task(function* setUp() {
         onUnregister(request) {
           let resolve = unregisterDefers[request.channelID];
           equal(typeof resolve, 'function',
             'Dropped unexpected channel ID ' + request.channelID);
           delete unregisterDefers[request.channelID];
           equal(request.code, 202,
             'Expected permission revoked unregister reason');
           resolve();
+          this.serverSendMsg(JSON.stringify({
+            messageType: 'unregister',
+            status: 200,
+            channelID: request.channelID,
+          }));
         },
         onACK(request) {},
       });
     }
   });
   yield handshakePromise;
 });
 
--- a/dom/push/test/xpcshell/test_unregister_invalid_json.js
+++ b/dom/push/test/xpcshell/test_unregister_invalid_json.js
@@ -45,41 +45,48 @@ add_task(function* test_unregister_inval
     serverURI: "wss://push.example.org/",
     db,
     makeWebSocket(uri) {
       return new MockWebSocket(uri, {
         onHello(request) {
           this.serverSendMsg(JSON.stringify({
             messageType: 'hello',
             status: 200,
-            uaid: userAgentID
+            uaid: userAgentID,
+            use_webpush: true,
           }));
         },
         onUnregister(request) {
           this.serverSendMsg(');alert(1);(');
           unregisterDone();
         }
       });
     }
   });
 
-  // "unregister" is fire-and-forget: it's sent via _send(), not
-  // _sendRequest().
-  yield PushService.unregister({
-    scope: 'https://example.edu/page/1',
-    originAttributes: '',
-  });
+  yield rejects(
+    PushService.unregister({
+      scope: 'https://example.edu/page/1',
+      originAttributes: '',
+    }),
+    'Expected error for first invalid JSON response'
+  );
+
   let record = yield db.getByKeyID(
     '87902e90-c57e-4d18-8354-013f4a556559');
   ok(!record, 'Failed to delete unregistered record');
 
-  yield PushService.unregister({
-    scope: 'https://example.net/page/1',
-    originAttributes: ChromeUtils.originAttributesToSuffix(
-      { appId: Ci.nsIScriptSecurityManager.NO_APP_ID, inIsolatedMozBrowser: false }),
-  });
+  yield rejects(
+    PushService.unregister({
+      scope: 'https://example.net/page/1',
+      originAttributes: ChromeUtils.originAttributesToSuffix(
+        { appId: Ci.nsIScriptSecurityManager.NO_APP_ID, inIsolatedMozBrowser: false }),
+    }),
+    'Expected error for second invalid JSON response'
+  );
+
   record = yield db.getByKeyID(
     '057caa8f-9b99-47ff-891c-adad18ce603e');
   ok(!record,
     'Failed to delete unregistered record after receiving invalid JSON');
 
   yield unregisterPromise;
 });
--- a/dom/push/test/xpcshell/test_unregister_success.js
+++ b/dom/push/test/xpcshell/test_unregister_success.js
@@ -1,20 +1,23 @@
 /* Any copyright is dedicated to the Public Domain.
    http://creativecommons.org/publicdomain/zero/1.0/ */
 
 'use strict';
 
 const {PushDB, PushService, PushServiceWebSocket} = serviceExports;
 
+const userAgentID = 'fbe865a6-aeb8-446f-873c-aeebdb8d493c';
 const channelID = 'db0a7021-ec2d-4bd3-8802-7a6966f10ed8';
 
 function run_test() {
   do_get_profile();
-  setPrefs();
+  setPrefs({
+    userAgentID: userAgentID,
+  });
   run_next_test();
 }
 
 add_task(function* test_unregister_success() {
   let db = PushServiceWebSocket.newPushDB();
   do_register_cleanup(() => {return db.drop().then(_ => db.close());});
   yield db.put({
     channelID,
@@ -31,17 +34,18 @@ add_task(function* test_unregister_succe
     serverURI: "wss://push.example.org/",
     db,
     makeWebSocket(uri) {
       return new MockWebSocket(uri, {
         onHello(request) {
           this.serverSendMsg(JSON.stringify({
             messageType: 'hello',
             status: 200,
-            uaid: 'fbe865a6-aeb8-446f-873c-aeebdb8d493c'
+            uaid: userAgentID,
+            use_webpush: true,
           }));
         },
         onUnregister(request) {
           equal(request.channelID, channelID, 'Should include the channel ID');
           equal(request.code, 200, 'Expected manual unregister reason');
           this.serverSendMsg(JSON.stringify({
             messageType: 'unregister',
             status: 200,