Bug 863732 - Use RTC wakeup to monitor WebSocket connection. r=jlebar
☠☠ backed out by 9ec0ad6f7e09 ☠ ☠
authorNikhil Marathe <nsm.nikhil@gmail.com>
Sat, 11 May 2013 10:45:56 +0530
changeset 142480 d4f14f6dd4012aba4e03e29ff868c55a67baa8f8
parent 142479 8db9762409f411507b6f0953796d749b8c45f331
child 142481 2745b9feab4426217fdf800f0e7ddb543cdf3e8d
push id2579
push userakeybl@mozilla.com
push dateMon, 24 Jun 2013 18:52:47 +0000
treeherdermozilla-beta@b69b7de8a05a [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersjlebar
bugs863732
milestone23.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 863732 - Use RTC wakeup to monitor WebSocket connection. r=jlebar
b2g/app/b2g.js
dom/push/src/PushService.jsm
--- a/b2g/app/b2g.js
+++ b/b2g/app/b2g.js
@@ -389,22 +389,23 @@ pref("dom.mozContacts.enabled", true);
 // WebAlarms
 pref("dom.mozAlarms.enabled", true);
 
 // SimplePush
 pref("services.push.enabled", true);
 // serverURL to be assigned by services team
 pref("services.push.serverURL", "");
 pref("services.push.userAgentID", "");
-// exponential back-off start is 5 seconds like in HTTP/1.1
+// Exponential back-off start is 5 seconds like in HTTP/1.1.
+// Maximum back-off is pingInterval.
 pref("services.push.retryBaseInterval", 5000);
-// WebSocket level ping transmit interval in seconds.
-pref("services.push.websocketPingInterval", 55);
-// exponential back-off end is 20 minutes
-pref("services.push.maxRetryInterval", 1200000);
+// Interval at which to ping PushServer to check connection status. In
+// milliseconds. If no reply is received within requestTimeout, the connection
+// is considered closed.
+pref("services.push.pingInterval", 1800000); // 30 minutes
 // How long before a DOMRequest errors as timeout
 pref("services.push.requestTimeout", 10000);
 // enable udp wakeup support
 pref("services.push.udp.wakeupEnabled", true);
 // port on which UDP server socket is bound
 pref("services.push.udp.port", 2442);
 
 // NetworkStats
--- a/dom/push/src/PushService.jsm
+++ b/dom/push/src/PushService.jsm
@@ -14,16 +14,17 @@ const Cu = Components.utils;
 const Cr = Components.results;
 
 Cu.import("resource://gre/modules/XPCOMUtils.jsm");
 Cu.import("resource://gre/modules/Services.jsm");
 Cu.import("resource://gre/modules/IndexedDBHelper.jsm");
 Cu.import("resource://gre/modules/Timer.jsm");
 Cu.import("resource://gre/modules/Preferences.jsm");
 Cu.import("resource://gre/modules/commonjs/sdk/core/promise.js");
+Cu.import("resource://gre/modules/AlarmService.jsm");
 
 this.EXPORTED_SYMBOLS = ["PushService"];
 
 const prefs = new Preferences("services.push.");
 
 const kPUSHDB_DB_NAME = "push";
 const kPUSHDB_DB_VERSION = 1; // Change this if the IndexedDB format changes
 const kPUSHDB_STORE_NAME = "push";
@@ -82,17 +83,18 @@ this.PushDB.prototype = {
     debug("put()");
 
     this.newTxn(
       "readwrite",
       kPUSHDB_STORE_NAME,
       function txnCb(aTxn, aStore) {
         debug("Going to put " + aChannelRecord.channelID);
         aStore.put(aChannelRecord).onsuccess = function setTxnResult(aEvent) {
-          debug("Request successful. Updated record ID: " + aEvent.target.result);
+          debug("Request successful. Updated record ID: " +
+                aEvent.target.result);
         };
       },
       aSuccessCb,
       aErrorCb
     );
   },
 
   /*
@@ -262,22 +264,22 @@ this.PushWebSocketListener.prototype = {
         return;
     this._pushService._wsOnServerClose(context, aStatusCode, aReason);
   }
 }
 
 // websocket states
 // websocket is off
 const STATE_SHUT_DOWN = 0;
-// websocket has been opened on client side, waiting for successful open
+// Websocket has been opened on client side, waiting for successful open.
 // (_wsOnStart)
 const STATE_WAITING_FOR_WS_START = 1;
-// websocket opened, hello sent, waiting for server reply (_handleHelloReply)
+// Websocket opened, hello sent, waiting for server reply (_handleHelloReply).
 const STATE_WAITING_FOR_HELLO = 2;
-// websocket operational, handshake completed, begin protocol messaging
+// Websocket operational, handshake completed, begin protocol messaging.
 const STATE_READY = 3;
 
 /**
  * The implementation of the SimplePush system. This runs in the B2G parent
  * process and is started on boot. It uses WebSockets to communicate with the
  * server and PushDB (IndexedDB) for persistence.
  */
 this.PushService = {
@@ -328,35 +330,33 @@ this.PushService = {
 
               delete this._pendingRequests[channelID];
               for (var i = this._requestQueue.length - 1; i >= 0; --i)
                 if (this._requestQueue[i].channelID == channelID)
                   this._requestQueue.splice(i, 1);
             }
           }
         }
-        else if (aSubject == this._retryTimeoutTimer) {
-          this._beginWSSetup();
-        }
         break;
       case "webapps-uninstall":
         debug("webapps-uninstall");
         let appsService = Cc["@mozilla.org/AppsService;1"]
                             .getService(Ci.nsIAppsService);
         var app = appsService.getAppFromObserverMessage(aData);
         if (!app) {
           debug("webapps-uninstall: No app found " + aData.origin);
           return;
         }
 
         this._db.getAllByManifestURL(app.manifestURL, function(records) {
           debug("Got " + records.length);
           for (var i = 0; i < records.length; i++) {
             this._db.delete(records[i].channelID, null, function() {
-              debug("app uninstall: " + app.manifestURL + " Could not delete entry " + records[i].channelID);
+              debug("app uninstall: " + app.manifestURL +
+                    " Could not delete entry " + records[i].channelID);
             });
             // courtesy, but don't establish a connection
             // just for it
             if (this._ws) {
               debug("Had a connection, so telling the server");
               this._request("unregister", {channelID: records[i].channelID});
             }
           }
@@ -384,35 +384,16 @@ this.PushService = {
 
   // keeps requests buffered if the websocket disconnects or is not connected
   _requestQueue: [],
   _ws: null,
   _pendingRequests: {},
   _currentState: STATE_SHUT_DOWN,
   _requestTimeout: 0,
   _requestTimeoutTimer: null,
-
-  /**
-   * How retries work:  The goal is to ensure websocket is always up on
-   * networks not supporting UDP. So the websocket should only be shutdown if
-   * onServerClose indicates UDP wakeup.  If WS is closed due to socket error,
-   * _socketError() is called.  The retry timer is started and when it times
-   * out, beginWSSetup() is called again.
-   *
-   * On a successful connection, the timer is cancelled if it is running and
-   * the values are reset to defaults.
-   *
-   * If we are in the middle of a timeout (i.e. waiting), but
-   * a register/unregister is called, we don't want to wait around anymore.
-   * _sendRequest will automatically call beginWSSetup(), which will cancel the
-   * timer. In addition since the state will have changed, even if a pending
-   * timer event comes in (because the timer fired the event before it was
-   * cancelled), so the connection won't be reset.
-   */
-  _retryTimeoutTimer: null,
   _retryFailCount: 0,
 
   /**
    * According to the WS spec, servers should immediately close the underlying
    * TCP connection after they close a WebSocket. This causes wsOnStop to be
    * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the
    * WebSocket up, it should try to reconnect. But if the server closes the
    * WebSocket because it will wake up the client via UDP, then the client
@@ -435,16 +416,18 @@ this.PushService = {
 
     let ppmm = Cc["@mozilla.org/parentprocessmessagemanager;1"]
                  .getService(Ci.nsIMessageBroadcaster);
 
     kCHILD_PROCESS_MESSAGES.forEach(function addMessage(msgName) {
         ppmm.addMessageListener(msgName, this);
     }.bind(this));
 
+    this._alarmID = null;
+
     this._requestTimeout = prefs.get("requestTimeout");
 
     this._udpPort = prefs.get("udp.port");
 
     this._db.getAllChannelIDs(
       function(channelIDs) {
         if (channelIDs.length > 0) {
           debug("Found registered channelIDs. Starting WebSocket");
@@ -461,22 +444,26 @@ this.PushService = {
     // slightly different URLs.
     prefs.observe("serverURL", this);
   },
 
   _shutdownWS: function() {
     debug("shutdownWS()");
     this._currentState = STATE_SHUT_DOWN;
     this._willBeWokenUpByUDP = false;
+
     if (this._wsListener)
       this._wsListener._pushService = null;
     try {
         this._ws.close(0, null);
     } catch (e) {}
     this._ws = null;
+
+    this._waitingForPong = false;
+    this._stopAlarm();
   },
 
   _shutdown: function() {
     debug("_shutdown()");
 
     Services.obs.removeObserver(this, "network-interface-state-changed",
                                 false);
     Services.obs.removeObserver(this, "webapps-uninstall", false);
@@ -493,60 +480,66 @@ this.PushService = {
     // All pending requests (ideally none) are dropped at this point. We
     // shouldn't have any applications performing registration/unregistration
     // or receiving notifications.
     this._shutdownWS();
 
     // At this point, profile-change-net-teardown has already fired, so the
     // WebSocket has been closed with NS_ERROR_ABORT (if it was up) and will
     // try to reconnect. Stop the timer.
-    if (this._retryTimeoutTimer)
-      this._retryTimeoutTimer.cancel();
+    this._stopAlarm();
 
     if (this._requestTimeoutTimer)
       this._requestTimeoutTimer.cancel();
 
     debug("shutdown complete!");
   },
 
-  // aStatusCode is an NS error from Components.results
-  _socketError: function(aStatusCode) {
-    debug("socketError()");
+  /**
+   * How retries work:  The goal is to ensure websocket is always up on
+   * networks not supporting UDP. So the websocket should only be shutdown if
+   * onServerClose indicates UDP wakeup.  If WS is closed due to socket error,
+   * _reconnectAfterBackoff() is called.  The retry alarm is started and when
+   * it times out, beginWSSetup() is called again.
+   *
+   * On a successful connection, the alarm is cancelled in
+   * wsOnMessageAvailable() when the ping alarm is started.
+   *
+   * If we are in the middle of a timeout (i.e. waiting), but
+   * a register/unregister is called, we don't want to wait around anymore.
+   * _sendRequest will automatically call beginWSSetup(), which will cancel the
+   * timer. In addition since the state will have changed, even if a pending
+   * timer event comes in (because the timer fired the event before it was
+   * cancelled), so the connection won't be reset.
+   */
+  _reconnectAfterBackoff: function() {
+    debug("reconnectAfterBackoff()");
 
-    // Calculate new timeout, but cap it to
+    // Calculate new timeout, but cap it to pingInterval.
     var retryTimeout = prefs.get("retryBaseInterval") *
                        Math.pow(2, this._retryFailCount);
-
-    // It is easier to express the max interval as a pref in milliseconds,
-    // rather than have it as a number and make people do the calculation of
-    // retryBaseInterval * 2^maxRetryFailCount.
-    retryTimeout = Math.min(retryTimeout, prefs.get("maxRetryInterval"));
+    retryTimeout = Math.min(retryTimeout, prefs.get("pingInterval"));
 
     this._retryFailCount++;
 
     debug("Retry in " + retryTimeout + " Try number " + this._retryFailCount);
-
-    if (!this._retryTimeoutTimer) {
-      this._retryTimeoutTimer = Cc["@mozilla.org/timer;1"]
-                                  .createInstance(Ci.nsITimer);
-    }
-
-    this._retryTimeoutTimer.init(this,
-                                 retryTimeout,
-                                 Ci.nsITimer.TYPE_ONE_SHOT);
+    this._setAlarm(retryTimeout);
   },
 
   _beginWSSetup: function() {
     debug("beginWSSetup()");
     if (this._currentState != STATE_SHUT_DOWN) {
       debug("_beginWSSetup: Not in shutdown state! Current state " +
             this._currentState);
       return;
     }
 
+    // Stop any pending reconnects scheduled for the near future.
+    this._stopAlarm();
+
     var serverURL = prefs.get("serverURL");
     if (!serverURL) {
       debug("No services.push.serverURL found!");
       return;
     }
 
     var uri;
     try {
@@ -565,24 +558,111 @@ this.PushService = {
       debug("Push over an insecure connection (ws://) is not allowed!");
       return;
     }
     else {
       debug("Unsupported websocket scheme " + uri.scheme);
       return;
     }
 
+
     debug("serverURL: " + uri.spec);
     this._wsListener = new PushWebSocketListener(this);
     this._ws.protocol = "push-notification";
-    this._ws.pingInterval = prefs.get("websocketPingInterval");
     this._ws.asyncOpen(uri, serverURL, this._wsListener, null);
     this._currentState = STATE_WAITING_FOR_WS_START;
   },
 
+  /** |delay| should be in milliseconds. */
+  _setAlarm: function(delay) {
+    // Stop any existing alarm.
+    this._stopAlarm();
+
+    AlarmService.add(
+      {
+        date: new Date(Date.now() + delay),
+        ignoreTimezone: true
+      },
+      this._onAlarmFired.bind(this),
+      function onSuccess(alarmID) {
+        this._alarmID = alarmID;
+        debug("Set alarm " + delay + " in the future " + this._alarmID);
+      }.bind(this)
+    )
+  },
+
+  _stopAlarm: function() {
+    if (this._alarmID !== null) {
+      debug("Stopped existing alarm " + this._alarmID);
+      AlarmService.remove(this._alarmID);
+      this._alarmID = null;
+    }
+  },
+
+  /**
+   * There is only one alarm active at any time. This alarm has 3 intervals
+   * corresponding to 3 tasks.
+   *
+   * 1) Reconnect on ping timeout.
+   *    If we haven't received any messages from the server by the time this
+   *    alarm fires, the connection is closed and PushService tries to
+   *    reconnect, repurposing the alarm for (3).
+   *
+   * 2) Send a ping.
+   *    The protocol sends a ping ({}) on the wire every pingInterval ms. Once
+   *    it sends the ping, the alarm goes to task (1) which is waiting for
+   *    a pong. If data is received after the ping is sent,
+   *    _wsOnMessageAvailable() will reset the ping alarm (which cancels
+   *    waiting for the pong). So as long as the connection is fine, pong alarm
+   *    never fires.
+   *
+   * 3) Reconnect after backoff.
+   *    The alarm is set by _reconnectAfterBackoff() and increases in duration
+   *    every time we try and fail to connect.  When it triggers, websocket
+   *    setup begins again. On successful socket setup, the socket starts
+   *    receiving messages. The alarm now goes to (2) where it monitors the
+   *    WebSocket by sending a ping.  Since incoming data is a sign of the
+   *    connection being up, the ping alarm is reset every time data is
+   *    received.
+   */
+  _onAlarmFired: function() {
+    // Conditions are arranged in decreasing specificity.
+    // i.e. when _waitingForPong is true, other conditions are also true.
+    if (this._waitingForPong) {
+      debug("Did not receive pong in time. Reconnecting WebSocket.");
+      this._shutdownWS();
+      this._reconnectAfterBackoff();
+    }
+    else if (this._currentState == STATE_READY) {
+      // Send a ping.
+      // Bypass the queue; we don't want this to be kept pending.
+      this._ws.sendMsg('{}');
+      debug("Sent ping.");
+      this._waitingForPong = true;
+      this._setAlarm(prefs.get("requestTimeout"));
+    }
+    else if (this._alarmID !== null) {
+      debug("reconnect alarm fired.");
+      // Reconnect after back-off.
+      // The check for a non-null _alarmID prevents a situation where the alarm
+      // fires, but _shutdownWS() is called from another code-path (e.g.
+      // network state change) and we don't want to reconnect.
+      //
+      // It also handles the case where _beginWSSetup() is called from another
+      // code-path.
+      //
+      // alarmID will be non-null only when no shutdown/connect is
+      // called between _reconnectAfterBackoff() setting the alarm and the
+      // alarm firing.
+
+      // Websocket is shut down. Backoff interval expired, try to connect.
+      this._beginWSSetup();
+    }
+  },
+
   /**
    * Protocol handler invoked by server message.
    */
   _handleHelloReply: function(reply) {
     debug("handleHelloReply()");
     if (this._currentState != STATE_WAITING_FOR_HELLO) {
       debug("Unexpected state " + this._currentState +
             "(expected STATE_WAITING_FOR_HELLO)");
@@ -1123,19 +1203,16 @@ this.PushService = {
   _wsOnStart: function(context) {
     debug("wsOnStart()");
     if (this._currentState != STATE_WAITING_FOR_WS_START) {
       debug("NOT in STATE_WAITING_FOR_WS_START. Current state " +
             this._currentState + ". Skipping");
       return;
     }
 
-    if (this._retryTimeoutTimer)
-      this._retryTimeoutTimer.cancel();
-
     // Since we've had a successful connection reset the retry fail count.
     this._retryFailCount = 0;
 
     var data = {
       messageType: "hello",
     }
 
     if (this._UAID)
@@ -1172,27 +1249,35 @@ this.PushService = {
    * connection close status code.
    *
    * If we do not explicitly call ws.close() then statusCode is always
    * NS_BASE_STREAM_CLOSED, even on a successful close.
    */
   _wsOnStop: function(context, statusCode) {
     debug("wsOnStop()");
 
+    this._shutdownWS();
+
     if (statusCode != Cr.NS_OK &&
         !(statusCode == Cr.NS_BASE_STREAM_CLOSED && this._willBeWokenUpByUDP)) {
       debug("Socket error " + statusCode);
-      this._socketError(statusCode);
+      this._reconnectAfterBackoff();
     }
 
-    this._shutdownWS();
   },
 
   _wsOnMessageAvailable: function(context, message) {
     debug("wsOnMessageAvailable() " + message);
+
+    this._waitingForPong = false;
+
+    // Reset the ping timer.  Note: This path is executed at every step of the
+    // handshake, so this alarm does not need to be set explicitly at startup.
+    this._setAlarm(prefs.get("pingInterval"));
+
     var reply = undefined;
     try {
       reply = JSON.parse(message);
     } catch(e) {
       debug("Parsing JSON failed. text : " + message);
       return;
     }
 
@@ -1224,17 +1309,17 @@ this.PushService = {
     }
 
     this[handler](reply);
   },
 
   /**
    * The websocket should never be closed. Since we don't call ws.close(),
    * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that
-   * function), which calls socketError and re-establishes the WebSocket
+   * function), which calls reconnect and re-establishes the WebSocket
    * connection.
    *
    * If the server said it'll use UDP for wakeup, we set _willBeWokenUpByUDP
    * and stop reconnecting in _wsOnStop().
    */
   _wsOnServerClose: function(context, aStatusCode, aReason) {
     debug("wsOnServerClose() " + aStatusCode + " " + aReason);
 
@@ -1288,19 +1373,19 @@ this.PushService = {
    * notifications.
    */
   onStopListening: function(aServ, aStatus) {
     debug("UDP Server socket was shutdown. Status: " + aStatus);
     this._beginWSSetup();
   },
 
   /**
-   * Get mobile network information to decide if the client is capable of being woken
-   * up by UDP (which currently just means having an mcc and mnc along with an
-   * IP).
+   * Get mobile network information to decide if the client is capable of being
+   * woken up by UDP (which currently just means having an mcc and mnc along
+   * with an IP).
    */
   _getNetworkState: function() {
     debug("getNetworkState()");
     try {
       var nm = Cc["@mozilla.org/network/manager;1"].getService(Ci.nsINetworkManager);
       if (nm.active && nm.active.type == Ci.nsINetworkInterface.NETWORK_TYPE_MOBILE) {
         var mcp = Cc["@mozilla.org/ril/content-helper;1"].getService(Ci.nsIMobileConnectionProvider);
         if (mcp.iccInfo) {