Bug 863732 - Use RTC wakeup to monitor WebSocket connection. r=jlebar
☠☠ backed out by 9ec0ad6f7e09 ☠ ☠
author Nikhil Marathe <nsm.nikhil@gmail.com>
Sat, 11 May 2013 10:45:56 +0530
changeset 138308 d4f14f6dd4012aba4e03e29ff868c55a67baa8f8
parent 138307 8db9762409f411507b6f0953796d749b8c45f331
child 138309 2745b9feab4426217fdf800f0e7ddb543cdf3e8d
push id 3752
push user lsblakk@mozilla.com
push date Mon, 13 May 2013 17:21:10 +0000
treeherder mozilla-aurora@1580544aef0b [default view] [failures only]
perfherder [talos] [build metrics] [platform microbench] (compared to previous push)
reviewers jlebar
bugs 863732
milestone 23.0a1
Bug 863732 - Use RTC wakeup to monitor WebSocket connection. r=jlebar
b2g/app/b2g.js
dom/push/src/PushService.jsm
--- a/b2g/app/b2g.js
+++ b/b2g/app/b2g.js
@@ -389,22 +389,23 @@ pref("dom.mozContacts.enabled", true);
 // WebAlarms
 pref("dom.mozAlarms.enabled", true);
 
 // SimplePush
 pref("services.push.enabled", true);
 // serverURL to be assigned by services team
 pref("services.push.serverURL", "");
 pref("services.push.userAgentID", "");
-// exponential back-off start is 5 seconds like in HTTP/1.1
+// Exponential back-off start is 5 seconds like in HTTP/1.1.
+// Maximum back-off is pingInterval.
 pref("services.push.retryBaseInterval", 5000);
-// WebSocket level ping transmit interval in seconds.
-pref("services.push.websocketPingInterval", 55);
-// exponential back-off end is 20 minutes
-pref("services.push.maxRetryInterval", 1200000);
+// Interval at which to ping PushServer to check connection status. In
+// milliseconds. If no reply is received within requestTimeout, the connection
+// is considered closed.
+pref("services.push.pingInterval", 1800000); // 30 minutes
 // How long before a DOMRequest errors as timeout
 pref("services.push.requestTimeout", 10000);
 // enable udp wakeup support
 pref("services.push.udp.wakeupEnabled", true);
 // port on which UDP server socket is bound
 pref("services.push.udp.port", 2442);
 
 // NetworkStats
--- a/dom/push/src/PushService.jsm
+++ b/dom/push/src/PushService.jsm
@@ -14,16 +14,17 @@ const Cu = Components.utils;
 const Cr = Components.results;
 
 Cu.import("resource://gre/modules/XPCOMUtils.jsm");
 Cu.import("resource://gre/modules/Services.jsm");
 Cu.import("resource://gre/modules/IndexedDBHelper.jsm");
 Cu.import("resource://gre/modules/Timer.jsm");
 Cu.import("resource://gre/modules/Preferences.jsm");
 Cu.import("resource://gre/modules/commonjs/sdk/core/promise.js");
+Cu.import("resource://gre/modules/AlarmService.jsm");
 
 this.EXPORTED_SYMBOLS = ["PushService"];
 
 const prefs = new Preferences("services.push.");
 
 const kPUSHDB_DB_NAME = "push";
 const kPUSHDB_DB_VERSION = 1; // Change this if the IndexedDB format changes
 const kPUSHDB_STORE_NAME = "push";
@@ -82,17 +83,18 @@ this.PushDB.prototype = {
     debug("put()");
 
     this.newTxn(
       "readwrite",
       kPUSHDB_STORE_NAME,
       function txnCb(aTxn, aStore) {
         debug("Going to put " + aChannelRecord.channelID);
         aStore.put(aChannelRecord).onsuccess = function setTxnResult(aEvent) {
-          debug("Request successful. Updated record ID: " + aEvent.target.result);
+          debug("Request successful. Updated record ID: " +
+                aEvent.target.result);
         };
       },
       aSuccessCb,
       aErrorCb
     );
   },
 
   /*
@@ -262,22 +264,22 @@ this.PushWebSocketListener.prototype = {
         return;
     this._pushService._wsOnServerClose(context, aStatusCode, aReason);
   }
 }
 
 // websocket states
 // websocket is off
 const STATE_SHUT_DOWN = 0;
-// websocket has been opened on client side, waiting for successful open
+// Websocket has been opened on client side, waiting for successful open.
 // (_wsOnStart)
 const STATE_WAITING_FOR_WS_START = 1;
-// websocket opened, hello sent, waiting for server reply (_handleHelloReply)
+// Websocket opened, hello sent, waiting for server reply (_handleHelloReply).
 const STATE_WAITING_FOR_HELLO = 2;
-// websocket operational, handshake completed, begin protocol messaging
+// Websocket operational, handshake completed, begin protocol messaging.
 const STATE_READY = 3;
 
 /**
  * The implementation of the SimplePush system. This runs in the B2G parent
  * process and is started on boot. It uses WebSockets to communicate with the
  * server and PushDB (IndexedDB) for persistence.
  */
 this.PushService = {
@@ -328,35 +330,33 @@ this.PushService = {
 
               delete this._pendingRequests[channelID];
               for (var i = this._requestQueue.length - 1; i >= 0; --i)
                 if (this._requestQueue[i].channelID == channelID)
                   this._requestQueue.splice(i, 1);
             }
           }
         }
-        else if (aSubject == this._retryTimeoutTimer) {
-          this._beginWSSetup();
-        }
         break;
       case "webapps-uninstall":
         debug("webapps-uninstall");
         let appsService = Cc["@mozilla.org/AppsService;1"]
                             .getService(Ci.nsIAppsService);
         var app = appsService.getAppFromObserverMessage(aData);
         if (!app) {
           debug("webapps-uninstall: No app found " + aData.origin);
           return;
         }
 
         this._db.getAllByManifestURL(app.manifestURL, function(records) {
           debug("Got " + records.length);
           for (var i = 0; i < records.length; i++) {
             this._db.delete(records[i].channelID, null, function() {
-              debug("app uninstall: " + app.manifestURL + " Could not delete entry " + records[i].channelID);
+              debug("app uninstall: " + app.manifestURL +
+                    " Could not delete entry " + records[i].channelID);
             });
             // courtesy, but don't establish a connection
             // just for it
             if (this._ws) {
               debug("Had a connection, so telling the server");
               this._request("unregister", {channelID: records[i].channelID});
             }
           }
@@ -384,35 +384,16 @@ this.PushService = {
 
   // keeps requests buffered if the websocket disconnects or is not connected
   _requestQueue: [],
   _ws: null,
   _pendingRequests: {},
   _currentState: STATE_SHUT_DOWN,
   _requestTimeout: 0,
   _requestTimeoutTimer: null,
-
-  /**
-   * How retries work:  The goal is to ensure websocket is always up on
-   * networks not supporting UDP. So the websocket should only be shutdown if
-   * onServerClose indicates UDP wakeup.  If WS is closed due to socket error,
-   * _socketError() is called.  The retry timer is started and when it times
-   * out, beginWSSetup() is called again.
-   *
-   * On a successful connection, the timer is cancelled if it is running and
-   * the values are reset to defaults.
-   *
-   * If we are in the middle of a timeout (i.e. waiting), but
-   * a register/unregister is called, we don't want to wait around anymore.
-   * _sendRequest will automatically call beginWSSetup(), which will cancel the
-   * timer. In addition since the state will have changed, even if a pending
-   * timer event comes in (because the timer fired the event before it was
-   * cancelled), so the connection won't be reset.
-   */
-  _retryTimeoutTimer: null,
   _retryFailCount: 0,
 
   /**
    * According to the WS spec, servers should immediately close the underlying
    * TCP connection after they close a WebSocket. This causes wsOnStop to be
    * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the
    * WebSocket up, it should try to reconnect. But if the server closes the
    * WebSocket because it will wake up the client via UDP, then the client
@@ -435,16 +416,18 @@ this.PushService = {
 
     let ppmm = Cc["@mozilla.org/parentprocessmessagemanager;1"]
                  .getService(Ci.nsIMessageBroadcaster);
 
     kCHILD_PROCESS_MESSAGES.forEach(function addMessage(msgName) {
         ppmm.addMessageListener(msgName, this);
     }.bind(this));
 
+    this._alarmID = null;
+
     this._requestTimeout = prefs.get("requestTimeout");
 
     this._udpPort = prefs.get("udp.port");
 
     this._db.getAllChannelIDs(
       function(channelIDs) {
         if (channelIDs.length > 0) {
           debug("Found registered channelIDs. Starting WebSocket");
@@ -461,22 +444,26 @@ this.PushService = {
     // slightly different URLs.
     prefs.observe("serverURL", this);
   },
 
   _shutdownWS: function() {
     debug("shutdownWS()");
     this._currentState = STATE_SHUT_DOWN;
     this._willBeWokenUpByUDP = false;
+
     if (this._wsListener)
       this._wsListener._pushService = null;
     try {
         this._ws.close(0, null);
     } catch (e) {}
     this._ws = null;
+
+    this._waitingForPong = false;
+    this._stopAlarm();
   },
 
   _shutdown: function() {
     debug("_shutdown()");
 
     Services.obs.removeObserver(this, "network-interface-state-changed",
                                 false);
     Services.obs.removeObserver(this, "webapps-uninstall", false);
@@ -493,60 +480,66 @@ this.PushService = {
     // All pending requests (ideally none) are dropped at this point. We
     // shouldn't have any applications performing registration/unregistration
     // or receiving notifications.
     this._shutdownWS();
 
     // At this point, profile-change-net-teardown has already fired, so the
     // WebSocket has been closed with NS_ERROR_ABORT (if it was up) and will
     // try to reconnect. Stop the timer.
-    if (this._retryTimeoutTimer)
-      this._retryTimeoutTimer.cancel();
+    this._stopAlarm();
 
     if (this._requestTimeoutTimer)
       this._requestTimeoutTimer.cancel();
 
     debug("shutdown complete!");
   },
 
-  // aStatusCode is an NS error from Components.results
-  _socketError: function(aStatusCode) {
-    debug("socketError()");
+  /**
+   * How retries work:  The goal is to ensure websocket is always up on
+   * networks not supporting UDP. So the websocket should only be shutdown if
+   * onServerClose indicates UDP wakeup.  If WS is closed due to socket error,
+   * _reconnectAfterBackoff() is called.  The retry alarm is started and when
+   * it times out, beginWSSetup() is called again.
+   *
+   * On a successful connection, the alarm is cancelled in
+   * wsOnMessageAvailable() when the ping alarm is started.
+   *
+   * If we are in the middle of a timeout (i.e. waiting), but
+   * a register/unregister is called, we don't want to wait around anymore.
+   * _sendRequest will automatically call beginWSSetup(), which will cancel the
+   * alarm. In addition, since the state will have changed, even if a pending
+   * alarm event comes in (because the alarm fired before it was cancelled),
+   * the connection won't be reset.
+   */
+  _reconnectAfterBackoff: function() {
+    debug("reconnectAfterBackoff()");
 
-    // Calculate new timeout, but cap it to
+    // Calculate new timeout, but cap it to pingInterval.
     var retryTimeout = prefs.get("retryBaseInterval") *
                        Math.pow(2, this._retryFailCount);
-
-    // It is easier to express the max interval as a pref in milliseconds,
-    // rather than have it as a number and make people do the calculation of
-    // retryBaseInterval * 2^maxRetryFailCount.
-    retryTimeout = Math.min(retryTimeout, prefs.get("maxRetryInterval"));
+    retryTimeout = Math.min(retryTimeout, prefs.get("pingInterval"));
 
     this._retryFailCount++;
 
     debug("Retry in " + retryTimeout + " Try number " + this._retryFailCount);
-
-    if (!this._retryTimeoutTimer) {
-      this._retryTimeoutTimer = Cc["@mozilla.org/timer;1"]
-                                  .createInstance(Ci.nsITimer);
-    }
-
-    this._retryTimeoutTimer.init(this,
-                                 retryTimeout,
-                                 Ci.nsITimer.TYPE_ONE_SHOT);
+    this._setAlarm(retryTimeout);
   },
 
   _beginWSSetup: function() {
     debug("beginWSSetup()");
     if (this._currentState != STATE_SHUT_DOWN) {
       debug("_beginWSSetup: Not in shutdown state! Current state " +
             this._currentState);
       return;
     }
 
+    // Stop any pending reconnects scheduled for the near future.
+    this._stopAlarm();
+
     var serverURL = prefs.get("serverURL");
     if (!serverURL) {
       debug("No services.push.serverURL found!");
       return;
     }
 
     var uri;
     try {
@@ -565,24 +558,111 @@ this.PushService = {
       debug("Push over an insecure connection (ws://) is not allowed!");
       return;
     }
     else {
       debug("Unsupported websocket scheme " + uri.scheme);
       return;
     }
 
+
     debug("serverURL: " + uri.spec);
     this._wsListener = new PushWebSocketListener(this);
     this._ws.protocol = "push-notification";
-    this._ws.pingInterval = prefs.get("websocketPingInterval");
     this._ws.asyncOpen(uri, serverURL, this._wsListener, null);
     this._currentState = STATE_WAITING_FOR_WS_START;
   },
 
+  /** |delay| should be in milliseconds. */
+  _setAlarm: function(delay) {
+    // Stop any existing alarm.
+    this._stopAlarm();
+
+    AlarmService.add(
+      {
+        date: new Date(Date.now() + delay),
+        ignoreTimezone: true
+      },
+      this._onAlarmFired.bind(this),
+      function onSuccess(alarmID) {
+        this._alarmID = alarmID;
+        debug("Set alarm " + delay + " in the future " + this._alarmID);
+      }.bind(this)
+    )
+  },
+
+  _stopAlarm: function() {
+    if (this._alarmID !== null) {
+      debug("Stopped existing alarm " + this._alarmID);
+      AlarmService.remove(this._alarmID);
+      this._alarmID = null;
+    }
+  },
+
+  /**
+   * There is only one alarm active at any time. This alarm has 3 intervals
+   * corresponding to 3 tasks.
+   *
+   * 1) Reconnect on ping timeout.
+   *    If we haven't received any messages from the server by the time this
+   *    alarm fires, the connection is closed and PushService tries to
+   *    reconnect, repurposing the alarm for (3).
+   *
+   * 2) Send a ping.
+   *    The protocol sends a ping ({}) on the wire every pingInterval ms. Once
+   *    it sends the ping, the alarm goes to task (1) which is waiting for
+   *    a pong. If data is received after the ping is sent,
+   *    _wsOnMessageAvailable() will reset the ping alarm (which cancels
+   *    waiting for the pong). So as long as the connection is fine, the pong
+   *    alarm never fires.
+   *
+   * 3) Reconnect after backoff.
+   *    The alarm is set by _reconnectAfterBackoff() and increases in duration
+   *    every time we try and fail to connect.  When it triggers, websocket
+   *    setup begins again. On successful socket setup, the socket starts
+   *    receiving messages. The alarm now goes to (2) where it monitors the
+   *    WebSocket by sending a ping.  Since incoming data is a sign of the
+   *    connection being up, the ping alarm is reset every time data is
+   *    received.
+   */
+  _onAlarmFired: function() {
+    // Conditions are arranged in decreasing specificity.
+    // i.e. when _waitingForPong is true, other conditions are also true.
+    if (this._waitingForPong) {
+      debug("Did not receive pong in time. Reconnecting WebSocket.");
+      this._shutdownWS();
+      this._reconnectAfterBackoff();
+    }
+    else if (this._currentState == STATE_READY) {
+      // Send a ping.
+      // Bypass the queue; we don't want this to be kept pending.
+      this._ws.sendMsg('{}');
+      debug("Sent ping.");
+      this._waitingForPong = true;
+      this._setAlarm(prefs.get("requestTimeout"));
+    }
+    else if (this._alarmID !== null) {
+      debug("reconnect alarm fired.");
+      // Reconnect after back-off.
+      // The check for a non-null _alarmID prevents a situation where the alarm
+      // fires, but _shutdownWS() is called from another code-path (e.g.
+      // network state change) and we don't want to reconnect.
+      //
+      // It also handles the case where _beginWSSetup() is called from another
+      // code-path.
+      //
+      // alarmID will be non-null only when no shutdown/connect is
+      // called between _reconnectAfterBackoff() setting the alarm and the
+      // alarm firing.
+
+      // Websocket is shut down. Backoff interval expired, try to connect.
+      this._beginWSSetup();
+    }
+  },
+
   /**
    * Protocol handler invoked by server message.
    */
   _handleHelloReply: function(reply) {
     debug("handleHelloReply()");
     if (this._currentState != STATE_WAITING_FOR_HELLO) {
       debug("Unexpected state " + this._currentState +
             "(expected STATE_WAITING_FOR_HELLO)");
@@ -1123,19 +1203,16 @@ this.PushService = {
   _wsOnStart: function(context) {
     debug("wsOnStart()");
     if (this._currentState != STATE_WAITING_FOR_WS_START) {
       debug("NOT in STATE_WAITING_FOR_WS_START. Current state " +
             this._currentState + ". Skipping");
       return;
     }
 
-    if (this._retryTimeoutTimer)
-      this._retryTimeoutTimer.cancel();
-
     // Since we've had a successful connection reset the retry fail count.
     this._retryFailCount = 0;
 
     var data = {
       messageType: "hello",
     }
 
     if (this._UAID)
@@ -1172,27 +1249,35 @@ this.PushService = {
    * connection close status code.
    *
    * If we do not explicitly call ws.close() then statusCode is always
    * NS_BASE_STREAM_CLOSED, even on a successful close.
    */
   _wsOnStop: function(context, statusCode) {
     debug("wsOnStop()");
 
+    this._shutdownWS();
+
     if (statusCode != Cr.NS_OK &&
         !(statusCode == Cr.NS_BASE_STREAM_CLOSED && this._willBeWokenUpByUDP)) {
       debug("Socket error " + statusCode);
-      this._socketError(statusCode);
+      this._reconnectAfterBackoff();
     }
 
-    this._shutdownWS();
   },
 
   _wsOnMessageAvailable: function(context, message) {
     debug("wsOnMessageAvailable() " + message);
+
+    this._waitingForPong = false;
+
+    // Reset the ping alarm.  Note: This path is executed at every step of the
+    // handshake, so this alarm does not need to be set explicitly at startup.
+    this._setAlarm(prefs.get("pingInterval"));
+
     var reply = undefined;
     try {
       reply = JSON.parse(message);
     } catch(e) {
       debug("Parsing JSON failed. text : " + message);
       return;
     }
 
@@ -1224,17 +1309,17 @@ this.PushService = {
     }
 
     this[handler](reply);
   },
 
   /**
    * The websocket should never be closed. Since we don't call ws.close(),
    * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that
-   * function), which calls socketError and re-establishes the WebSocket
+   * function), which calls reconnect and re-establishes the WebSocket
    * connection.
    *
    * If the server said it'll use UDP for wakeup, we set _willBeWokenUpByUDP
    * and stop reconnecting in _wsOnStop().
    */
   _wsOnServerClose: function(context, aStatusCode, aReason) {
     debug("wsOnServerClose() " + aStatusCode + " " + aReason);
 
@@ -1288,19 +1373,19 @@ this.PushService = {
    * notifications.
    */
   onStopListening: function(aServ, aStatus) {
     debug("UDP Server socket was shutdown. Status: " + aStatus);
     this._beginWSSetup();
   },
 
   /**
-   * Get mobile network information to decide if the client is capable of being woken
-   * up by UDP (which currently just means having an mcc and mnc along with an
-   * IP).
+   * Get mobile network information to decide if the client is capable of being
+   * woken up by UDP (which currently just means having an mcc and mnc along
+   * with an IP).
    */
   _getNetworkState: function() {
     debug("getNetworkState()");
     try {
       var nm = Cc["@mozilla.org/network/manager;1"].getService(Ci.nsINetworkManager);
       if (nm.active && nm.active.type == Ci.nsINetworkInterface.NETWORK_TYPE_MOBILE) {
         var mcp = Cc["@mozilla.org/ril/content-helper;1"].getService(Ci.nsIMobileConnectionProvider);
         if (mcp.iccInfo) {