Bug 1150812 - Split PushService - separate webSocket part. r=nsm, r=mt
authorDragana Damjanovic <dd.mozilla@gmail.com>
Wed, 03 Jun 2015 08:04:00 -0400
changeset 279378 d9707252997de48b577ba539c9027574c3bb37ba
parent 279377 479de45e9179eb441d067e6054dc11b9b08641c2
child 279379 4d17715232f56f8c44afc560c06f98883f25918c
push id897
push userjlund@mozilla.com
push dateMon, 14 Sep 2015 18:56:12 +0000
treeherdermozilla-release@9411e2d2b214 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersnsm, mt
bugs1150812
milestone41.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1150812 - Split PushService - separate webSocket part. r=nsm, r=mt
dom/push/PushDB.jsm
dom/push/PushService.jsm
dom/push/PushServiceWebSocket.jsm
dom/push/moz.build
new file mode 100644
--- /dev/null
+++ b/dom/push/PushDB.jsm
@@ -0,0 +1,206 @@
+/* jshint moz: true, esnext: true */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+// Don't modify this, instead set dom.push.debug.
+let gDebuggingEnabled = false;
+
+function debug(s) {
+  if (gDebuggingEnabled) {
+    dump("-*- PushDB.jsm: " + s + "\n");
+  }
+}
+
+const Cu = Components.utils;
+Cu.import("resource://gre/modules/IndexedDBHelper.jsm");
+Cu.importGlobalProperties(["indexedDB"]);
+
+this.EXPORTED_SYMBOLS = ["PushDB"];
+
+this.PushDB = function PushDB(dbName, dbVersion, dbStoreName, schemaFunction) {
+  debug("PushDB()");
+  this._dbStoreName = dbStoreName;
+  this._schemaFunction = schemaFunction;
+
+  // set the indexeddb database
+  this.initDBHelper(dbName, dbVersion,
+                    [dbStoreName]);
+};
+
+this.PushDB.prototype = {
+  __proto__: IndexedDBHelper.prototype,
+
+  upgradeSchema: function(aTransaction, aDb, aOldVersion, aNewVersion) {
+    if (this._schemaFunction) {
+      this._schemaFunction(aTransaction, aDb, aOldVersion, aNewVersion, this);
+    }
+  },
+
+  /*
+   * @param aChannelRecord
+   *        The record to be added.
+   */
+
+  put: function(aChannelRecord) {
+    debug("put()" + JSON.stringify(aChannelRecord));
+
+    return new Promise((resolve, reject) =>
+      this.newTxn(
+        "readwrite",
+        this._dbStoreName,
+        function txnCb(aTxn, aStore) {
+          debug("Going to put " + aChannelRecord.channelID);
+          aStore.put(aChannelRecord).onsuccess = function setTxnResult(aEvent) {
+            debug("Request successful. Updated record ID: " +
+                  aEvent.target.result);
+          };
+        },
+        resolve,
+        reject
+      )
+    );
+  },
+
+  /*
+   * @param aChannelID
+   *        The ID of record to be deleted.
+   */
+  delete: function(aChannelID) {
+    debug("delete()");
+
+    return new Promise((resolve, reject) =>
+      this.newTxn(
+        "readwrite",
+        this._dbStoreName,
+        function txnCb(aTxn, aStore) {
+          debug("Going to delete " + aChannelID);
+          aStore.delete(aChannelID);
+        },
+        resolve,
+        reject
+      )
+    );
+  },
+
+  clearAll: function clear() {
+    return new Promise((resolve, reject) =>
+      this.newTxn(
+        "readwrite",
+        this._dbStoreName,
+        function (aTxn, aStore) {
+          debug("Going to clear all!");
+          aStore.clear();
+        },
+        resolve,
+        reject
+      )
+    );
+  },
+
+  getByPushEndpoint: function(aPushEndpoint) {
+    debug("getByPushEndpoint()");
+
+    return new Promise((resolve, reject) =>
+      this.newTxn(
+        "readonly",
+        this._dbStoreName,
+        function txnCb(aTxn, aStore) {
+          aTxn.result = undefined;
+
+          let index = aStore.index("pushEndpoint");
+          index.get(aPushEndpoint).onsuccess = function setTxnResult(aEvent) {
+            aTxn.result = aEvent.target.result;
+            debug("Fetch successful " + aEvent.target.result);
+          }
+        },
+        resolve,
+        reject
+      )
+    );
+  },
+
+  getByChannelID: function(aChannelID) {
+    debug("getByChannelID()");
+
+    return new Promise((resolve, reject) =>
+      this.newTxn(
+        "readonly",
+        this._dbStoreName,
+        function txnCb(aTxn, aStore) {
+          aTxn.result = undefined;
+
+          aStore.get(aChannelID).onsuccess = function setTxnResult(aEvent) {
+            aTxn.result = aEvent.target.result;
+            debug("Fetch successful " + aEvent.target.result);
+          }
+        },
+        resolve,
+        reject
+      )
+    );
+  },
+
+
+  getByScope: function(aScope) {
+    debug("getByScope() " + aScope);
+
+    return new Promise((resolve, reject) =>
+      this.newTxn(
+        "readonly",
+        this._dbStoreName,
+        function txnCb(aTxn, aStore) {
+          aTxn.result = undefined;
+
+          let index = aStore.index("scope");
+          index.get(aScope).onsuccess = function setTxnResult(aEvent) {
+            aTxn.result = aEvent.target.result;
+            debug("Fetch successful " + aEvent.target.result);
+          }
+        },
+        resolve,
+        reject
+      )
+    );
+  },
+
+  getAllChannelIDs: function() {
+    debug("getAllChannelIDs()");
+
+    return new Promise((resolve, reject) =>
+      this.newTxn(
+        "readonly",
+        this._dbStoreName,
+        function txnCb(aTxn, aStore) {
+          aStore.mozGetAll().onsuccess = function(event) {
+            aTxn.result = event.target.result;
+          }
+        },
+        resolve,
+        reject
+      )
+    );
+  },
+
+  drop: function() {
+    debug("drop()");
+
+    return new Promise((resolve, reject) =>
+      this.newTxn(
+        "readwrite",
+        this._dbStoreName,
+        function txnCb(aTxn, aStore) {
+          aStore.clear();
+        },
+        resolve,
+        reject
+      )
+    );
+  },
+
+  observeDebug: function(enabled) {
+    gDebuggingEnabled = enabled;
+  }
+};
--- a/dom/push/PushService.jsm
+++ b/dom/push/PushService.jsm
@@ -13,391 +13,91 @@ function debug(s) {
     dump("-*- PushService.jsm: " + s + "\n");
 }
 
 const Cc = Components.classes;
 const Ci = Components.interfaces;
 const Cu = Components.utils;
 const Cr = Components.results;
 
+const {PushDB} = Cu.import("resource://gre/modules/PushDB.jsm");
 Cu.import("resource://gre/modules/XPCOMUtils.jsm");
 Cu.import("resource://gre/modules/Services.jsm");
-Cu.import("resource://gre/modules/IndexedDBHelper.jsm");
 Cu.import("resource://gre/modules/Timer.jsm");
 Cu.import("resource://gre/modules/Preferences.jsm");
 Cu.import("resource://gre/modules/Promise.jsm");
-Cu.importGlobalProperties(["indexedDB"]);
 
-XPCOMUtils.defineLazyServiceGetter(this, "gDNSService",
-                                   "@mozilla.org/network/dns-service;1",
-                                   "nsIDNSService");
+const {PushServiceWebSocket} = Cu.import("resource://gre/modules/PushServiceWebSocket.jsm");
 
 XPCOMUtils.defineLazyModuleGetter(this, "AlarmService",
                                   "resource://gre/modules/AlarmService.jsm");
 
-#ifdef MOZ_B2G
-XPCOMUtils.defineLazyServiceGetter(this, "gPowerManagerService",
-                                   "@mozilla.org/power/powermanagerservice;1",
-                                   "nsIPowerManagerService");
-#endif
-
-var threadManager = Cc["@mozilla.org/thread-manager;1"].getService(Ci.nsIThreadManager);
-
 this.EXPORTED_SYMBOLS = ["PushService"];
 
 const prefs = new Preferences("dom.push.");
 // Set debug first so that all debugging actually works.
 gDebuggingEnabled = prefs.get("debug");
 
-const kPUSHDB_DB_NAME = "pushapi";
-const kPUSHDB_DB_VERSION = 1; // Change this if the IndexedDB format changes
-const kPUSHDB_STORE_NAME = "pushapi";
-
-const kUDP_WAKEUP_WS_STATUS_CODE = 4774;  // WebSocket Close status code sent
-                                          // by server to signal that it can
-                                          // wake client up using UDP.
+const kPUSHWSDB_DB_NAME = "pushapi";
+const kPUSHWSDB_DB_VERSION = 1; // Change this if the IndexedDB format changes
+const kPUSHWSDB_STORE_NAME = "pushapi";
 
 const kCHILD_PROCESS_MESSAGES = ["Push:Register", "Push:Unregister",
                                  "Push:Registration"];
 
-const kWS_MAX_WENTDOWN = 2;
-
-// 1 minute is the least allowed ping interval
-const kWS_MIN_PING_INTERVAL = 60000;
-
-// This is a singleton
-this.PushDB = function PushDB() {
-  debug("PushDB()");
-
-  // set the indexeddb database
-  this.initDBHelper(kPUSHDB_DB_NAME, kPUSHDB_DB_VERSION,
-                    [kPUSHDB_STORE_NAME]);
-};
-
-this.PushDB.prototype = {
-  __proto__: IndexedDBHelper.prototype,
-
-  upgradeSchema: function(aTransaction, aDb, aOldVersion, aNewVersion) {
-    debug("PushDB.upgradeSchema()");
-
-    let objectStore = aDb.createObjectStore(kPUSHDB_STORE_NAME,
-                                            { keyPath: "channelID" });
-
-    // index to fetch records based on endpoints. used by unregister
-    objectStore.createIndex("pushEndpoint", "pushEndpoint", { unique: true });
-
-    // index to fetch records per scope, so we can identify endpoints
-    // associated with an app.
-    objectStore.createIndex("scope", "scope", { unique: true });
-  },
-
-  /*
-   * @param aChannelRecord
-   *        The record to be added.
-   */
-  put: function(aChannelRecord) {
-    debug("put()" + JSON.stringify(aChannelRecord));
-
-    return new Promise((resolve, reject) =>
-      this.newTxn(
-        "readwrite",
-        kPUSHDB_STORE_NAME,
-        function txnCb(aTxn, aStore) {
-          debug("Going to put " + aChannelRecord.channelID);
-          aStore.put(aChannelRecord).onsuccess = function setTxnResult(aEvent) {
-            debug("Request successful. Updated record ID: " +
-                  aEvent.target.result);
-          };
-        },
-        resolve,
-        reject
-      )
-    );
-  },
-
-  /*
-   * @param aChannelID
-   *        The ID of record to be deleted.
+var upgradeSchemaWS = function(aTransaction, aDb, aOldVersion, aNewVersion, aDbInstance) {
+  debug("upgradeSchemaWS()");
 
-   */
-  delete: function(aChannelID) {
-    debug("delete()");
-
-    return new Promise((resolve, reject) =>
-      this.newTxn(
-        "readwrite",
-        kPUSHDB_STORE_NAME,
-        function txnCb(aTxn, aStore) {
-          debug("Going to delete " + aChannelID);
-          aStore.delete(aChannelID);
-        },
-        resolve,
-        reject
-      )
-    );
-  },
-
-  clearAll: function clear() {
-    return new Promise((resolve, reject) =>
-      this.newTxn(
-        "readwrite",
-        kPUSHDB_STORE_NAME,
-        function (aTxn, aStore) {
-          debug("Going to clear all!");
-          aStore.clear();
-        },
-        resolve,
-        reject
-      )
-    );
-  },
-
-  getByPushEndpoint: function(aPushEndpoint) {
-    debug("getByPushEndpoint()");
-
-    return new Promise((resolve, reject) =>
-      this.newTxn(
-        "readonly",
-        kPUSHDB_STORE_NAME,
-        function txnCb(aTxn, aStore) {
-          aTxn.result = undefined;
-
-          let index = aStore.index("pushEndpoint");
-          index.get(aPushEndpoint).onsuccess = function setTxnResult(aEvent) {
-            aTxn.result = aEvent.target.result;
-            debug("Fetch successful " + aEvent.target.result);
-          }
-        },
-        resolve,
-        reject
-      )
-    );
-  },
-
-  getByChannelID: function(aChannelID) {
-    debug("getByChannelID()");
-
-    return new Promise((resolve, reject) =>
-      this.newTxn(
-        "readonly",
-        kPUSHDB_STORE_NAME,
-        function txnCb(aTxn, aStore) {
-          aTxn.result = undefined;
+  let objectStore = aDb.createObjectStore(aDbInstance._dbStoreName,
+                                          { keyPath: "channelID" });
 
-          aStore.get(aChannelID).onsuccess = function setTxnResult(aEvent) {
-            aTxn.result = aEvent.target.result;
-            debug("Fetch successful " + aEvent.target.result);
-          }
-        },
-        resolve,
-        reject
-      )
-    );
-  },
-
-
-  getByScope: function(aScope) {
-    debug("getByScope() " + aScope);
-
-    return new Promise((resolve, reject) =>
-      this.newTxn(
-        "readonly",
-        kPUSHDB_STORE_NAME,
-        function txnCb(aTxn, aStore) {
-          aTxn.result = undefined;
-
-          let index = aStore.index("scope");
-          index.get(aScope).onsuccess = function setTxnResult(aEvent) {
-            aTxn.result = aEvent.target.result;
-            debug("Fetch successful " + aEvent.target.result);
-          }
-        },
-        resolve,
-        reject
-      )
-    );
-  },
-
-  getAllChannelIDs: function() {
-    debug("getAllChannelIDs()");
-
-    return new Promise((resolve, reject) =>
-      this.newTxn(
-        "readonly",
-        kPUSHDB_STORE_NAME,
-        function txnCb(aTxn, aStore) {
-          aStore.mozGetAll().onsuccess = function(event) {
-            aTxn.result = event.target.result;
-          }
-        },
-        resolve,
-        reject
-      )
-    );
-  },
-
-  drop: function() {
-    debug("drop()");
+  // index to fetch records based on endpoints. used by unregister
+  objectStore.createIndex("pushEndpoint", "pushEndpoint", { unique: true });
 
-    return new Promise((resolve, reject) =>
-      this.newTxn(
-        "readwrite",
-        kPUSHDB_STORE_NAME,
-        function txnCb(aTxn, aStore) {
-          aStore.clear();
-        },
-        resolve,
-        reject
-      )
-    );
-  }
+  // index to fetch records per scope, so we can identify endpoints
+  // associated with an app.
+  objectStore.createIndex("scope", "scope", { unique: true });
 };
 
 /**
- * A proxy between the PushService and the WebSocket. The listener is used so
- * that the PushService can silence messages from the WebSocket by setting
- * PushWebSocketListener._pushService to null. This is required because
- * a WebSocket can continue to send messages or errors after it has been
- * closed but the PushService may not be interested in these. It's easier to
- * stop listening than to have checks at specific points.
- */
-this.PushWebSocketListener = function(pushService) {
-  this._pushService = pushService;
-}
-
-this.PushWebSocketListener.prototype = {
-  onStart: function(context) {
-    if (!this._pushService)
-        return;
-    this._pushService._wsOnStart(context);
-  },
-
-  onStop: function(context, statusCode) {
-    if (!this._pushService)
-        return;
-    this._pushService._wsOnStop(context, statusCode);
-  },
-
-  onAcknowledge: function(context, size) {
-    // EMPTY
-  },
-
-  onBinaryMessageAvailable: function(context, message) {
-    // EMPTY
-  },
-
-  onMessageAvailable: function(context, message) {
-    if (!this._pushService)
-        return;
-    this._pushService._wsOnMessageAvailable(context, message);
-  },
-
-  onServerClose: function(context, aStatusCode, aReason) {
-    if (!this._pushService)
-        return;
-    this._pushService._wsOnServerClose(context, aStatusCode, aReason);
-  }
-}
-
-// websocket states
-// websocket is off
-const STATE_SHUT_DOWN = 0;
-// Websocket has been opened on client side, waiting for successful open.
-// (_wsOnStart)
-const STATE_WAITING_FOR_WS_START = 1;
-// Websocket opened, hello sent, waiting for server reply (_handleHelloReply).
-const STATE_WAITING_FOR_HELLO = 2;
-// Websocket operational, handshake completed, begin protocol messaging.
-const STATE_READY = 3;
-
-/**
  * The implementation of the SimplePush system. This runs in the B2G parent
  * process and is started on boot. It uses WebSockets to communicate with the
  * server and PushDB (IndexedDB) for persistence.
  */
 this.PushService = {
+  _service: null,
+
   observe: function observe(aSubject, aTopic, aData) {
     switch (aTopic) {
       /*
        * We need to call uninit() on shutdown to clean up things that modules aren't very good
        * at automatically cleaning up, so we don't get shutdown leaks on browser shutdown.
        */
       case "xpcom-shutdown":
         this.uninit();
         break;
       case "network-active-changed":         /* On B2G. */
       case "network:offline-status-changed": /* On desktop. */
-        // In case of network-active-changed, always disconnect existing
-        // connections. In case of offline-status changing from offline to
-        // online, it is likely that these statements will be no-ops.
-        if (this._udpServer) {
-          this._udpServer.close();
-          // Set to null since this is checked in _listenForUDPWakeup()
-          this._udpServer = null;
-        }
-
-        this._shutdownWS();
-
-        // Try to connect if network-active-changed or the offline-status
-        // changed to online.
-        if (aTopic === "network-active-changed" || aData === "online") {
-          this._startListeningIfChannelsPresent();
-        }
+        this._service.observeNetworkChange(aSubject, aTopic, aData);
         break;
       case "nsPref:changed":
         if (aData == "dom.push.serverURL") {
           debug("dom.push.serverURL changed! websocket. new value " +
                 prefs.get("serverURL"));
-          this._shutdownWS();
+          this._service.shutdownService();
         } else if (aData == "dom.push.connection.enabled") {
-          if (prefs.get("connection.enabled")) {
-            this._startListeningIfChannelsPresent();
-          } else {
-            this._shutdownWS();
-          }
+          this._service.observePushConnectionPref(prefs.get("connection.enabled"));
         } else if (aData == "dom.push.debug") {
           gDebuggingEnabled = prefs.get("debug");
+          this._service.observeDebug(gDebuggingEnabled);
+          this._db.observeDebug(gDebuggingEnabled);
         }
         break;
       case "timer-callback":
-        if (aSubject == this._requestTimeoutTimer) {
-          if (Object.keys(this._pendingRequests).length == 0) {
-            this._requestTimeoutTimer.cancel();
-          }
-
-          // Set to true if at least one request timed out.
-          let requestTimedOut = false;
-          for (let channelID in this._pendingRequests) {
-            let duration = Date.now() - this._pendingRequests[channelID].ctime;
-
-            // If any of the registration requests time out, all the ones after it
-            // also made to fail, since we are going to be disconnecting the socket.
-            if (requestTimedOut || duration > this._requestTimeout) {
-              debug("Request timeout: Removing " + channelID);
-              requestTimedOut = true;
-              this._pendingRequests[channelID]
-                .deferred.reject({status: 0, error: "TimeoutError"});
-
-              delete this._pendingRequests[channelID];
-              for (let i = this._requestQueue.length - 1; i >= 0; --i) {
-                let [, data] = this._requestQueue[i];
-                if (data && data.channelID == channelID) {
-                  this._requestQueue.splice(i, 1);
-                }
-              }
-            }
-          }
-
-          // The most likely reason for a registration request timing out is
-          // that the socket has disconnected. Best to reconnect.
-          if (requestTimedOut) {
-            this._shutdownWS();
-            this._reconnectAfterBackoff();
-          }
-        }
+        this._service.observeTimer(aSubject, aTopic, aData);
         break;
       case "webapps-clear-data":
         debug("webapps-clear-data");
 
         let data = aSubject.QueryInterface(Ci.mozIApplicationClearPrivateDataParams);
         if (!data) {
           debug("webapps-clear-data: Failed to get information about application");
           return;
@@ -409,186 +109,65 @@ this.PushService = {
                             .getService(Ci.nsIAppsService);
         let scope = appsService.getScopeByLocalId(data.appId);
         if (!scope) {
           debug("webapps-clear-data: No scope found for " + data.appId);
           return;
         }
 
         this._db.getByScope(scope)
-          .then(record => {
+          .then(record =>
             this._db.delete(records.channelID)
-              .then(_ => {
-                // courtesy, but don't establish a connection
-                // just for it
-                if (this._ws) {
-                  debug("Had a connection, so telling the server");
-                  this._send("unregister", {channelID: records.channelID});
-                }
-              }, err => {
-                debug("webapps-clear-data: " + scope +
-                      " Could not delete entry " + records.channelID);
+              .then(_ =>
+                this._service.unregister(records),
+                err => {
+                  debug("webapps-clear-data: " + scope +
+                        " Could not delete entry " + records.channelID);
 
-                // courtesy, but don't establish a connection
-                // just for it
-                if (this._ws) {
-                  debug("Had a connection, so telling the server");
-                  this._send("unregister", {channelID: records.channelID});
-                }
+                this._service.unregister(records)
                 throw "Database error";
-              });
-          }, _ => {
+              })
+          , _ => {
             debug("webapps-clear-data: Error in getByScope(" + scope + ")");
           });
 
         break;
     }
   },
 
-  get _UAID() {
-    return prefs.get("userAgentID");
-  },
-
-  set _UAID(newID) {
-    if (typeof(newID) !== "string") {
-      debug("Got invalid, non-string UAID " + newID +
-            ". Not updating userAgentID");
-      return;
+  // utility function used to add/remove observers in init() and shutdown()
+  getNetworkStateChangeEventName: function() {
+    try {
+      Cc["@mozilla.org/network/manager;1"].getService(Ci.nsINetworkManager);
+      return "network-active-changed";
+    } catch (e) {
+      return "network:offline-status-changed";
     }
-    debug("New _UAID: " + newID);
-    prefs.set("userAgentID", newID);
-  },
-
-  // keeps requests buffered if the websocket disconnects or is not connected
-  _requestQueue: [],
-  _ws: null,
-  _pendingRequests: {},
-  _currentState: STATE_SHUT_DOWN,
-  _requestTimeout: 0,
-  _requestTimeoutTimer: null,
-  _retryFailCount: 0,
-
-  /**
-   * According to the WS spec, servers should immediately close the underlying
-   * TCP connection after they close a WebSocket. This causes wsOnStop to be
-   * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the
-   * WebSocket up, it should try to reconnect. But if the server closes the
-   * WebSocket because it will wake up the client via UDP, then the client
-   * shouldn't re-establish the connection. If the server says that it will
-   * wake up the client over UDP, this is set to true in wsOnServerClose. It is
-   * checked in wsOnStop.
-   */
-  _willBeWokenUpByUDP: false,
-
-  /**
-   * Holds if the adaptive ping is enabled. This is read on init().
-   * If adaptive ping is enabled, a new ping is calculed each time we receive
-   * a pong message, trying to maximize network resources while minimizing
-   * cellular signalling storms.
-   */
-  _adaptiveEnabled: false,
-
-  /**
-   * This saves a flag about if we need to recalculate a new ping, based on:
-   *   1) the gap between the maximum working ping and the first ping that
-   *      gives an error (timeout) OR
-   *   2) we have reached the pref of the maximum value we allow for a ping
-   *      (dom.push.adaptive.upperLimit)
-   */
-  _recalculatePing: true,
-
-  /**
-   * This map holds a (pingInterval, triedTimes) of each pingInterval tried.
-   * It is used to check if the pingInterval has been tested enough to know that
-   * is incorrect and is above the limit the network allow us to keep the
-   * connection open.
-   */
-  _pingIntervalRetryTimes: {},
-
-  /**
-   * Holds the lastGoodPingInterval for our current connection.
-   */
-  _lastGoodPingInterval: 0,
-
-  /**
-   * Maximum ping interval that we can reach.
-   */
-  _upperLimit: 0,
-
-  /**
-   * Count the times WebSocket goes down without receiving Pings
-   * so we can re-enable the ping recalculation algorithm
-   */
-  _wsWentDownCounter: 0,
-
-  /**
-   * Sends a message to the Push Server through an open websocket.
-   * typeof(msg) shall be an object
-   */
-  _wsSendMessage: function(msg) {
-    if (!this._ws) {
-      debug("No WebSocket initialized. Cannot send a message.");
-      return;
-    }
-    msg = JSON.stringify(msg);
-    debug("Sending message: " + msg);
-    this._ws.sendMsg(msg);
   },
 
   init: function(options = {}) {
     debug("init()");
     if (this._started) {
       return;
     }
 
-    // Override the backing store for testing.
-    this._db = options.db;
-    if (!this._db) {
-      this._db = new PushDB();
-    }
-
-    // Override the default WebSocket factory function. The returned object
-    // must be null or satisfy the nsIWebSocketChannel interface. Used by
-    // the tests to provide a mock WebSocket implementation.
-    if (options.makeWebSocket) {
-      this._makeWebSocket = options.makeWebSocket;
-    }
-
-    // Override the default UDP socket factory function. The returned object
-    // must be null or satisfy the nsIUDPSocket interface. Used by the
-    // UDP tests.
-    if (options.makeUDPSocket) {
-      this._makeUDPSocket = options.makeUDPSocket;
-    }
-
-    this._networkInfo = options.networkInfo;
-    if (!this._networkInfo) {
-      this._networkInfo = PushNetworkInfo;
-    }
-
     var globalMM = Cc["@mozilla.org/globalmessagemanager;1"]
                .getService(Ci.nsIFrameScriptLoader);
 
     globalMM.loadFrameScript("chrome://global/content/PushServiceChildPreload.js", true);
 
     let ppmm = Cc["@mozilla.org/parentprocessmessagemanager;1"]
                  .getService(Ci.nsIMessageBroadcaster);
 
     kCHILD_PROCESS_MESSAGES.forEach(function addMessage(msgName) {
         ppmm.addMessageListener(msgName, this);
     }.bind(this));
 
     this._alarmID = null;
 
-    this._requestTimeout = prefs.get("requestTimeout");
-    this._adaptiveEnabled = prefs.get('adaptive.enabled');
-    this._upperLimit = prefs.get('adaptive.upperLimit');
-
-    this._startListeningIfChannelsPresent();
-
     Services.obs.addObserver(this, "xpcom-shutdown", false);
     Services.obs.addObserver(this, "webapps-clear-data", false);
 
     // On B2G the NetworkManager interface fires a network-active-changed
     // event.
     //
     // The "active network" is based on priority - i.e. Wi-Fi has higher
     // priority than data. The PushService should just use the preferred
@@ -600,293 +179,66 @@ this.PushService = {
     //
     // On non-B2G platforms, the offline-status-changed event is used to know
     // when to (dis)connect. It may not fire if the underlying OS changes
     // networks; in such a case we rely on timeout.
     //
     // On B2G both events fire, one after the other, when the network goes
     // online, so we explicitly check for the presence of NetworkManager and
     // don't add an observer for offline-status-changed on B2G.
-    this._networkStateChangeEventName = this._networkInfo.getNetworkStateChangeEventName();
+    this._networkStateChangeEventName = this.getNetworkStateChangeEventName();
     Services.obs.addObserver(this, this._networkStateChangeEventName, false);
 
     // This is only used for testing. Different tests require connecting to
     // slightly different URLs.
     prefs.observe("serverURL", this);
     // Used to monitor if the user wishes to disable Push.
     prefs.observe("connection.enabled", this);
     // Debugging
     prefs.observe("debug", this);
-
+    this._db = options.db;
+    if (!this._db) {
+      this._db = new PushDB(kPUSHWSDB_DB_NAME, kPUSHWSDB_DB_VERSION, kPUSHWSDB_STORE_NAME, upgradeSchemaWS);
+    }
+    this._service = PushServiceWebSocket;
+    this._service.init(options, this);
+    this._service.startListeningIfChannelsPresent();
     this._started = true;
   },
 
-  _shutdownWS: function() {
-    debug("shutdownWS()");
-    this._currentState = STATE_SHUT_DOWN;
-    this._willBeWokenUpByUDP = false;
-
-    if (this._wsListener)
-      this._wsListener._pushService = null;
-    try {
-        this._ws.close(0, null);
-    } catch (e) {}
-    this._ws = null;
-
-    this._waitingForPong = false;
-    this._stopAlarm();
-    this._cancelPendingRequests();
-  },
-
   uninit: function() {
     if (!this._started)
       return;
 
     debug("uninit()");
 
     prefs.ignore("debug", this);
     prefs.ignore("connection.enabled", this);
     prefs.ignore("serverURL", this);
     Services.obs.removeObserver(this, this._networkStateChangeEventName);
     Services.obs.removeObserver(this, "webapps-clear-data", false);
     Services.obs.removeObserver(this, "xpcom-shutdown", false);
 
+    // At this point, profile-change-net-teardown has already fired, so the
+    // WebSocket has been closed with NS_ERROR_ABORT (if it was up) and will
+    // try to reconnect. Stop the timer.
+    this.stopAlarm();
+
     if (this._db) {
       this._db.close();
       this._db = null;
     }
 
-    if (this._udpServer) {
-      this._udpServer.close();
-      this._udpServer = null;
-    }
-
-    // All pending requests (ideally none) are dropped at this point. We
-    // shouldn't have any applications performing registration/unregistration
-    // or receiving notifications.
-    this._shutdownWS();
-
-    // At this point, profile-change-net-teardown has already fired, so the
-    // WebSocket has been closed with NS_ERROR_ABORT (if it was up) and will
-    // try to reconnect. Stop the timer.
-    this._stopAlarm();
-
-    if (this._requestTimeoutTimer) {
-      this._requestTimeoutTimer.cancel();
-    }
+    this._service.uninit();
 
     this._started = false;
     debug("shutdown complete!");
   },
 
-  /**
-   * How retries work:  The goal is to ensure websocket is always up on
-   * networks not supporting UDP. So the websocket should only be shutdown if
-   * onServerClose indicates UDP wakeup.  If WS is closed due to socket error,
-   * _reconnectAfterBackoff() is called.  The retry alarm is started and when
-   * it times out, beginWSSetup() is called again.
-   *
-   * On a successful connection, the alarm is cancelled in
-   * wsOnMessageAvailable() when the ping alarm is started.
-   *
-   * If we are in the middle of a timeout (i.e. waiting), but
-   * a register/unregister is called, we don't want to wait around anymore.
-   * _sendRequest will automatically call beginWSSetup(), which will cancel the
-   * timer. In addition since the state will have changed, even if a pending
-   * timer event comes in (because the timer fired the event before it was
-   * cancelled), so the connection won't be reset.
-   */
-  _reconnectAfterBackoff: function() {
-    debug("reconnectAfterBackoff()");
-    //Calculate new ping interval
-    this._calculateAdaptivePing(true /* wsWentDown */);
-
-    // Calculate new timeout, but cap it to pingInterval.
-    let retryTimeout = prefs.get("retryBaseInterval") *
-                       Math.pow(2, this._retryFailCount);
-    retryTimeout = Math.min(retryTimeout, prefs.get("pingInterval"));
-
-    this._retryFailCount++;
-
-    debug("Retry in " + retryTimeout + " Try number " + this._retryFailCount);
-    this._setAlarm(retryTimeout);
-  },
-
-  /**
-   * We need to calculate a new ping based on:
-   *  1) Latest good ping
-   *  2) A safe gap between 1) and the calculated new ping (which is
-   *  by default, 1 minute)
-   *
-   * This is for 3G networks, whose connections keepalives differ broadly,
-   * for example:
-   *  1) Movistar Spain: 29 minutes
-   *  2) VIVO Brazil: 5 minutes
-   *  3) Movistar Colombia: XXX minutes
-   *
-   * So a fixed ping is not good for us for two reasons:
-   *  1) We might lose the connection, so we need to reconnect again (wasting
-   *  resources)
-   *  2) We use a lot of network signaling just for pinging.
-   *
-   * This algorithm tries to search the best value between a disconnection and a
-   * valid ping, to ensure better battery life and network resources usage.
-   *
-   * The value is saved in dom.push.pingInterval
-   * @param wsWentDown [Boolean] if the WebSocket was closed or it is still alive
-   *
-   */
-  _calculateAdaptivePing: function(wsWentDown) {
-    debug('_calculateAdaptivePing()');
-    if (!this._adaptiveEnabled) {
-      debug('Adaptive ping is disabled');
-      return;
-    }
-
-    if (this._retryFailCount > 0) {
-      debug('Push has failed to connect to the Push Server ' +
-        this._retryFailCount + ' times. ' +
-        'Do not calculate a new pingInterval now');
-      return;
-    }
-
-    if (!wsWentDown) {
-      debug('Setting websocket down counter to 0');
-      this._wsWentDownCounter = 0;
-    }
-
-    if (!this._recalculatePing && !wsWentDown) {
-      debug('We do not need to recalculate the ping now, based on previous data');
-      return;
-    }
-
-    // Save actual state of the network
-    let ns = this._networkInfo.getNetworkInformation();
-
-    if (ns.ip) {
-      // mobile
-      debug('mobile');
-      let oldNetwork = prefs.get('adaptive.mobile');
-      let newNetwork = 'mobile-' + ns.mcc + '-' + ns.mnc;
-
-      // Mobile networks differ, reset all intervals and pings
-      if (oldNetwork !== newNetwork) {
-        // Network differ, reset all values
-        debug('Mobile networks differ. Old network is ' + oldNetwork +
-              ' and new is ' + newNetwork);
-        prefs.set('adaptive.mobile', newNetwork);
-        //We reset the upper bound member
-        this._recalculatePing = true;
-        this._pingIntervalRetryTimes = {};
-
-        // Put default values
-        let defaultPing = prefs.get('pingInterval.default');
-        prefs.set('pingInterval', defaultPing);
-        this._lastGoodPingInterval = defaultPing;
-
-      } else {
-        // Mobile network is the same, let's just update things
-        prefs.set('pingInterval', prefs.get('pingInterval.mobile'));
-        this._lastGoodPingInterval = prefs.get('adaptive.lastGoodPingInterval.mobile');
-      }
-
-    } else {
-      // wifi
-      debug('wifi');
-      prefs.set('pingInterval', prefs.get('pingInterval.wifi'));
-      this._lastGoodPingInterval = prefs.get('adaptive.lastGoodPingInterval.wifi');
-    }
-
-    let nextPingInterval;
-    let lastTriedPingInterval = prefs.get('pingInterval');
-
-    if (!this._recalculatePing && wsWentDown) {
-      debug('Websocket disconnected without ping adaptative algorithm running');
-      this._wsWentDownCounter++;
-      if (this._wsWentDownCounter > kWS_MAX_WENTDOWN) {
-        debug('Too many disconnects. Reenabling ping adaptative algoritm');
-        this._wsWentDownCounter = 0;
-        this._recalculatePing = true;
-        this._lastGoodPingInterval = Math.floor(lastTriedPingInterval / 2);
-        if (this._lastGoodPingInterval < kWS_MIN_PING_INTERVAL) {
-          nextPingInterval = kWS_MIN_PING_INTERVAL;
-        } else {
-          nextPingInterval = this._lastGoodPingInterval;
-        }
-        prefs.set('pingInterval', nextPingInterval);
-        this._save(ns, nextPingInterval);
-        return;
-      }
-
-      debug('We do not need to recalculate the ping, based on previous data');
-    }
-
-    if (wsWentDown) {
-      debug('The WebSocket was disconnected, calculating next ping');
-
-      // If we have not tried this pingInterval yet, initialize
-      this._pingIntervalRetryTimes[lastTriedPingInterval] =
-           (this._pingIntervalRetryTimes[lastTriedPingInterval] || 0) + 1;
-
-       // Try the pingInterval at least 3 times, just to be sure that the
-       // calculated interval is not valid.
-       if (this._pingIntervalRetryTimes[lastTriedPingInterval] < 2) {
-         debug('pingInterval= ' + lastTriedPingInterval + ' tried only ' +
-           this._pingIntervalRetryTimes[lastTriedPingInterval] + ' times');
-         return;
-       }
-
-       // Latest ping was invalid, we need to lower the limit to limit / 2
-       nextPingInterval = Math.floor(lastTriedPingInterval / 2);
-
-      // If the new ping interval is close to the last good one, we are near
-      // optimum, so stop calculating.
-      if (nextPingInterval - this._lastGoodPingInterval < prefs.get('adaptive.gap')) {
-        debug('We have reached the gap, we have finished the calculation');
-        debug('nextPingInterval=' + nextPingInterval);
-        debug('lastGoodPing=' + this._lastGoodPingInterval);
-        nextPingInterval = this._lastGoodPingInterval;
-        this._recalculatePing = false;
-      } else {
-        debug('We need to calculate next time');
-        this._recalculatePing = true;
-      }
-
-    } else {
-      debug('The WebSocket is still up');
-      this._lastGoodPingInterval = lastTriedPingInterval;
-      nextPingInterval = Math.floor(lastTriedPingInterval * 1.5);
-    }
-
-    // Check if we have reached the upper limit
-    if (this._upperLimit < nextPingInterval) {
-      debug('Next ping will be bigger than the configured upper limit, capping interval');
-      this._recalculatePing = false;
-      this._lastGoodPingInterval = lastTriedPingInterval;
-      nextPingInterval = lastTriedPingInterval;
-    }
-
-    debug('Setting the pingInterval to ' + nextPingInterval);
-    prefs.set('pingInterval', nextPingInterval);
-
-    this._save(ns, nextPingInterval);
-  },
-
-  _save: function(ns, nextPingInterval){
-    //Save values for our current network
-    if (ns.ip) {
-      prefs.set('pingInterval.mobile', nextPingInterval);
-      prefs.set('adaptive.lastGoodPingInterval.mobile', this._lastGoodPingInterval);
-    } else {
-      prefs.set('pingInterval.wifi', nextPingInterval);
-      prefs.set('adaptive.lastGoodPingInterval.wifi', this._lastGoodPingInterval);
-    }
-  },
-
-  _getServerURI: function() {
+  getServerURI: function() {
     let serverURL = prefs.get("serverURL");
     if (!serverURL) {
       debug("No dom.push.serverURL found!");
       return;
     }
 
     let uri;
     try {
@@ -894,507 +246,62 @@ this.PushService = {
     } catch(e) {
       debug("Error creating valid URI from dom.push.serverURL (" +
             serverURL + ")");
       return;
     }
     return uri;
   },
 
-  _makeWebSocket: function(uri) {
-    if (!prefs.get("connection.enabled")) {
-      debug("_makeWebSocket: connection.enabled is not set to true. Aborting.");
-      return null;
-    }
-
-    if (Services.io.offline) {
-      debug("Network is offline.");
-      return null;
-    }
-
-    let socket;
-    if (uri.scheme === "wss") {
-      socket = Cc["@mozilla.org/network/protocol;1?name=wss"]
-                 .createInstance(Ci.nsIWebSocketChannel);
-    }
-    else if (uri.scheme === "ws") {
-      if (uri.host != "localhost") {
-        debug("Push over an insecure connection (ws://) is not allowed!");
-        return null;
-      }
-      socket = Cc["@mozilla.org/network/protocol;1?name=ws"]
-                 .createInstance(Ci.nsIWebSocketChannel);
-    }
-    else {
-      debug("Unsupported websocket scheme " + uri.scheme);
-      return null;
-    }
-
-    socket.initLoadInfo(null, // aLoadingNode
-                        Services.scriptSecurityManager.getSystemPrincipal(),
-                        null, // aTriggeringPrincipal
-                        Ci.nsILoadInfo.SEC_NORMAL,
-                        Ci.nsIContentPolicy.TYPE_WEBSOCKET);
-
-    return socket;
-  },
-
-  _beginWSSetup: function() {
-    debug("beginWSSetup()");
-    if (this._currentState != STATE_SHUT_DOWN) {
-      debug("_beginWSSetup: Not in shutdown state! Current state " +
-            this._currentState);
-      return;
-    }
-
-    // Stop any pending reconnects scheduled for the near future.
-    this._stopAlarm();
-
-    let uri = this._getServerURI();
-    if (!uri) {
-      return;
-    }
-    let socket = this._makeWebSocket(uri);
-    if (!socket) {
-      return;
-    }
-    this._ws = socket.QueryInterface(Ci.nsIWebSocketChannel);
-
-    debug("serverURL: " + uri.spec);
-    this._wsListener = new PushWebSocketListener(this);
-    this._ws.protocol = "push-notification";
-
-    try {
-      // Grab a wakelock before we open the socket to ensure we don't go to sleep
-      // before connection the is opened.
-      this._ws.asyncOpen(uri, uri.spec, this._wsListener, null);
-      this._acquireWakeLock();
-      this._currentState = STATE_WAITING_FOR_WS_START;
-    } catch(e) {
-      debug("Error opening websocket. asyncOpen failed!");
-      this._shutdownWS();
-      this._reconnectAfterBackoff();
-    }
-  },
-
-  _startListeningIfChannelsPresent: function() {
-    // Check to see if we need to do anything.
-    if (this._requestQueue.length > 0) {
-      this._beginWSSetup();
-      return;
-    }
-    this._db.getAllChannelIDs()
-      .then(channelIDs => {
-        if (channelIDs.length > 0) {
-          this._beginWSSetup();
-        }
-      });
-  },
-
   /** |delay| should be in milliseconds. */
-  _setAlarm: function(delay) {
+  setAlarm: function(delay) {
     // Bug 909270: Since calls to AlarmService.add() are async, calls must be
     // 'queued' to ensure only one alarm is ever active.
     if (this._settingAlarm) {
         // onSuccess will handle the set. Overwriting the variable enforces the
         // last-writer-wins semantics.
         this._queuedAlarmDelay = delay;
         this._waitingForAlarmSet = true;
         return;
     }
 
     // Stop any existing alarm.
-    this._stopAlarm();
+    this.stopAlarm();
 
     this._settingAlarm = true;
     AlarmService.add(
       {
         date: new Date(Date.now() + delay),
         ignoreTimezone: true
       },
-      this._onAlarmFired.bind(this),
+      this._service.onAlarmFired.bind(this._service),
       function onSuccess(alarmID) {
         this._alarmID = alarmID;
         debug("Set alarm " + delay + " in the future " + this._alarmID);
         this._settingAlarm = false;
 
         if (this._waitingForAlarmSet) {
           this._waitingForAlarmSet = false;
-          this._setAlarm(this._queuedAlarmDelay);
+          this.setAlarm(this._queuedAlarmDelay);
         }
       }.bind(this)
     )
   },
 
-  _stopAlarm: function() {
+  stopAlarm: function() {
     if (this._alarmID !== null) {
       debug("Stopped existing alarm " + this._alarmID);
       AlarmService.remove(this._alarmID);
       this._alarmID = null;
     }
   },
 
-  /**
-   * There is only one alarm active at any time. This alarm has 3 intervals
-   * corresponding to 3 tasks.
-   *
-   * 1) Reconnect on ping timeout.
-   *    If we haven't received any messages from the server by the time this
-   *    alarm fires, the connection is closed and PushService tries to
-   *    reconnect, repurposing the alarm for (3).
-   *
-   * 2) Send a ping.
-   *    The protocol sends a ping ({}) on the wire every pingInterval ms. Once
-   *    it sends the ping, the alarm goes to task (1) which is waiting for
-   *    a pong. If data is received after the ping is sent,
-   *    _wsOnMessageAvailable() will reset the ping alarm (which cancels
-   *    waiting for the pong). So as long as the connection is fine, pong alarm
-   *    never fires.
-   *
-   * 3) Reconnect after backoff.
-   *    The alarm is set by _reconnectAfterBackoff() and increases in duration
-   *    every time we try and fail to connect.  When it triggers, websocket
-   *    setup begins again. On successful socket setup, the socket starts
-   *    receiving messages. The alarm now goes to (2) where it monitors the
-   *    WebSocket by sending a ping.  Since incoming data is a sign of the
-   *    connection being up, the ping alarm is reset every time data is
-   *    received.
-   */
-  _onAlarmFired: function() {
-    // Conditions are arranged in decreasing specificity.
-    // i.e. when _waitingForPong is true, other conditions are also true.
-    if (this._waitingForPong) {
-      debug("Did not receive pong in time. Reconnecting WebSocket.");
-      this._shutdownWS();
-      this._reconnectAfterBackoff();
-    }
-    else if (this._currentState == STATE_READY) {
-      // Send a ping.
-      // Bypass the queue; we don't want this to be kept pending.
-      // Watch out for exception in case the socket has disconnected.
-      // When this happens, we pretend the ping was sent and don't specially
-      // handle the exception, as the lack of a pong will lead to the socket
-      // being reset.
-      try {
-        this._wsSendMessage({});
-      } catch (e) {
-      }
-
-      this._waitingForPong = true;
-      this._setAlarm(prefs.get("requestTimeout"));
-    }
-    else if (this._alarmID !== null) {
-      debug("reconnect alarm fired.");
-      // Reconnect after back-off.
-      // The check for a non-null _alarmID prevents a situation where the alarm
-      // fires, but _shutdownWS() is called from another code-path (e.g.
-      // network state change) and we don't want to reconnect.
-      //
-      // It also handles the case where _beginWSSetup() is called from another
-      // code-path.
-      //
-      // alarmID will be non-null only when no shutdown/connect is
-      // called between _reconnectAfterBackoff() setting the alarm and the
-      // alarm firing.
-
-      // Websocket is shut down. Backoff interval expired, try to connect.
-      this._beginWSSetup();
-    }
-  },
-
-  _acquireWakeLock: function() {
-#ifdef MOZ_B2G
-    // Disable the wake lock on non-B2G platforms to work around bug 1154492.
-    if (!this._socketWakeLock) {
-      debug("Acquiring Socket Wakelock");
-      this._socketWakeLock = gPowerManagerService.newWakeLock("cpu");
-    }
-    if (!this._socketWakeLockTimer) {
-      debug("Creating Socket WakeLock Timer");
-      this._socketWakeLockTimer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
-    }
-
-    debug("Setting Socket WakeLock Timer");
-    this._socketWakeLockTimer
-      .initWithCallback(this._releaseWakeLock.bind(this),
-                        // Allow the same time for socket setup as we do for
-                        // requests after the setup. Fudge it a bit since
-                        // timers can be a little off and we don't want to go
-                        // to sleep just as the socket connected.
-                        this._requestTimeout + 1000,
-                        Ci.nsITimer.TYPE_ONE_SHOT);
-#endif
-  },
-
-  _releaseWakeLock: function() {
-#ifdef MOZ_B2G
-    debug("Releasing Socket WakeLock");
-    if (this._socketWakeLockTimer) {
-      this._socketWakeLockTimer.cancel();
-    }
-    if (this._socketWakeLock) {
-      this._socketWakeLock.unlock();
-      this._socketWakeLock = null;
-    }
-#endif
-  },
-
-  /**
-   * Protocol handler invoked by server message.
-   */
-  _handleHelloReply: function(reply) {
-    debug("handleHelloReply()");
-    if (this._currentState != STATE_WAITING_FOR_HELLO) {
-      debug("Unexpected state " + this._currentState +
-            "(expected STATE_WAITING_FOR_HELLO)");
-      this._shutdownWS();
-      return;
-    }
-
-    if (typeof reply.uaid !== "string") {
-      debug("No UAID received or non string UAID received");
-      this._shutdownWS();
-      return;
-    }
-
-    if (reply.uaid === "") {
-      debug("Empty UAID received!");
-      this._shutdownWS();
-      return;
-    }
-
-    // To avoid sticking extra large values sent by an evil server into prefs.
-    if (reply.uaid.length > 128) {
-      debug("UAID received from server was too long: " +
-            reply.uaid);
-      this._shutdownWS();
-      return;
-    }
-
-    function finishHandshake() {
-      this._UAID = reply.uaid;
-      this._currentState = STATE_READY;
-      this._processNextRequestInQueue();
-    }
-
-    // By this point we've got a UAID from the server that we are ready to
-    // accept.
-    //
-    // If we already had a valid UAID before, we have to ask apps to
-    // re-register.
-    if (this._UAID && this._UAID != reply.uaid) {
-      debug("got new UAID: all re-register");
-
-      this._notifyAllAppsRegister()
-          .then(this._dropRegistration.bind(this))
-          .then(finishHandshake.bind(this));
-
-      return;
-    }
-
-    // otherwise we are good to go
-    finishHandshake.bind(this)();
-  },
-
-  /**
-   * Protocol handler invoked by server message.
-   */
-  _handleRegisterReply: function(reply) {
-    debug("handleRegisterReply()");
-    if (typeof reply.channelID !== "string" ||
-        typeof this._pendingRequests[reply.channelID] !== "object")
-      return;
-
-    let tmp = this._pendingRequests[reply.channelID];
-    delete this._pendingRequests[reply.channelID];
-    if (Object.keys(this._pendingRequests).length == 0 &&
-        this._requestTimeoutTimer)
-      this._requestTimeoutTimer.cancel();
-
-    if (reply.status == 200) {
-      tmp.deferred.resolve(reply);
-    } else {
-      tmp.deferred.reject(reply);
-    }
-  },
-
-  /**
-   * Protocol handler invoked by server message.
-   */
-  _handleNotificationReply: function(reply) {
-    debug("handleNotificationReply()");
-    if (typeof reply.updates !== 'object') {
-      debug("No 'updates' field in response. Type = " + typeof reply.updates);
-      return;
-    }
-
-    debug("Reply updates: " + reply.updates.length);
-    for (let i = 0; i < reply.updates.length; i++) {
-      let update = reply.updates[i];
-      debug("Update: " + update.channelID + ": " + update.version);
-      if (typeof update.channelID !== "string") {
-        debug("Invalid update literal at index " + i);
-        continue;
-      }
-
-      if (update.version === undefined) {
-        debug("update.version does not exist");
-        continue;
-      }
-
-      let version = update.version;
-
-      if (typeof version === "string") {
-        version = parseInt(version, 10);
-      }
-
-      if (typeof version === "number" && version >= 0) {
-        // FIXME(nsm): this relies on app update notification being infallible!
-        // eventually fix this
-        this._receivedUpdate(update.channelID, version);
-        this._sendAck(update.channelID, version);
-      }
-    }
-  },
-
-  // FIXME(nsm): batch acks for efficiency reasons.
-  _sendAck: function(channelID, version) {
-    debug("sendAck()");
-    this._send('ack', {
-      updates: [{channelID: channelID, version: version}]
-    });
-  },
-
-  /*
-   * Must be used only by request/response style calls over the websocket.
-   */
-  _sendRequest: function(action, data) {
-    debug("sendRequest() " + action);
-    if (typeof data.channelID !== "string") {
-      debug("Received non-string channelID");
-      return Promise.reject({error: "Received non-string channelID"});
-    }
-
-    if (Object.keys(this._pendingRequests).length == 0) {
-      // start the timer since we now have at least one request
-      if (!this._requestTimeoutTimer)
-        this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"]
-                                      .createInstance(Ci.nsITimer);
-      this._requestTimeoutTimer.init(this,
-                                     this._requestTimeout,
-                                     Ci.nsITimer.TYPE_REPEATING_SLACK);
-    }
-
-    let deferred;
-    let request = this._pendingRequests[data.channelID];
-    if (request) {
-      // If a request is already pending for this channel ID, assume it's a
-      // retry. Use the existing deferred, but update the send time and re-send
-      // the request.
-      deferred = request.deferred;
-    } else {
-      deferred = Promise.defer();
-      request = this._pendingRequests[data.channelID] = {deferred};
-    }
-    request.ctime = Date.now();
-
-    this._send(action, data);
-    return deferred.promise;
-  },
-
-  _send: function(action, data) {
-    debug("send()");
-    this._requestQueue.push([action, data]);
-    debug("Queued " + action);
-    this._processNextRequestInQueue();
-  },
-
-  _processNextRequestInQueue: function() {
-    debug("_processNextRequestInQueue()");
-
-    if (this._requestQueue.length == 0) {
-      debug("Request queue empty");
-      return;
-    }
-
-    if (this._currentState != STATE_READY) {
-      if (!this._started) {
-        // The component hasn't been initialized yet. Return early; init()
-        // will dequeue all pending requests.
-        return;
-      }
-      if (!this._ws) {
-        // This will end up calling processNextRequestInQueue().
-        this._beginWSSetup();
-      }
-      else {
-        // We have a socket open so we are just waiting for hello to finish.
-        // That will call processNextRequestInQueue().
-      }
-      return;
-    }
-
-    let [action, data] = this._requestQueue.shift();
-    data.messageType = action;
-    if (!this._ws) {
-      // If our websocket is not ready and our state is STATE_READY we may as
-      // well give up all assumptions about the world and start from scratch
-      // again.  Discard the message itself, let the timeout notify error to
-      // the app.
-      debug("This should never happen!");
-      this._shutdownWS();
-    }
-
-    this._wsSendMessage(data);
-    // Process the next one as soon as possible.
-    setTimeout(this._processNextRequestInQueue.bind(this), 0);
-  },
-
-  _receivedUpdate: function(aChannelID, aLatestVersion) {
-    debug("Updating: " + aChannelID + " -> " + aLatestVersion);
-
-    let compareRecordVersionAndNotify = function(aPushRecord) {
-      debug("compareRecordVersionAndNotify()");
-      if (!aPushRecord) {
-        debug("No record for channel ID " + aChannelID);
-        return;
-      }
-
-      if (aPushRecord.version == null ||
-          aPushRecord.version < aLatestVersion) {
-        debug("Version changed, notifying app and updating DB");
-        aPushRecord.version = aLatestVersion;
-        aPushRecord.pushCount = aPushRecord.pushCount + 1;
-        aPushRecord.lastPush = new Date().getTime();
-        this._notifyApp(aPushRecord);
-        this._updatePushRecord(aPushRecord)
-          .then(
-            null,
-            function(e) {
-              debug("Error updating push record");
-            }
-          );
-      }
-      else {
-        debug("No significant version change: " + aLatestVersion);
-      }
-    }
-
-    let recoverNoSuchChannelID = function(aChannelIDFromServer) {
-      debug("Could not get channelID " + aChannelIDFromServer + " from DB");
-    }
-
-    this._db.getByChannelID(aChannelID)
-      .then(compareRecordVersionAndNotify.bind(this),
-            err => recoverNoSuchChannelID(err));
-  },
-
   // Fires a push-register system message to all applications that have
   // registration.
-  _notifyAllAppsRegister: function() {
+  notifyAllAppsRegister: function() {
     debug("notifyAllAppsRegister()");
     // records are objects describing the registration as stored in IndexedDB.
     return this._db.getAllChannelIDs()
       .then(records => {
         let scopes = new Set();
         for (let record of records) {
           scopes.add(record.scope);
         }
@@ -1407,22 +314,22 @@ this.PushService = {
             scope
           );
 
           let data = {
             originAttributes: {}, // TODO bug 1166350
             scope: scope
           };
 
-          globalMM.broadcastAsyncMessage('pushsubscriptionchanged', data);
+          globalMM.broadcastAsyncMessage('pushsubscriptionchange', data);
         }
       });
   },
 
-  _notifyApp: function(aPushRecord) {
+  notifyApp: function(aPushRecord) {
     if (!aPushRecord || !aPushRecord.scope) {
       debug("notifyApp() something is undefined.  Dropping notification: "
         + JSON.stringify(aPushRecord) );
       return;
     }
 
     debug("notifyApp() " + aPushRecord.scope);
     let scopeURI = Services.io.newURI(aPushRecord.scope, null, null);
@@ -1454,46 +361,41 @@ this.PushService = {
       scope: aPushRecord.scope
     };
 
     let globalMM = Cc['@mozilla.org/globalmessagemanager;1']
                  .getService(Ci.nsIMessageListenerManager);
     globalMM.broadcastAsyncMessage('push', data);
   },
 
-  _updatePushRecord: function(aPushRecord) {
+  updatePushRecord: function(aPushRecord) {
     debug("updatePushRecord()");
     return this._db.put(aPushRecord);
   },
 
-  _dropRegistration: function() {
+  getByChannelID: function(aChannelID) {
+    return this._db.getByChannelID(aChannelID)
+  },
+
+  getAllChannelIDs: function() {
+    return this._db.getAllChannelIDs()
+  },
+
+  dropRegistration: function() {
     return this._db.drop();
   },
 
-  receiveMessage: function(aMessage) {
-    debug("receiveMessage(): " + aMessage.name);
-
-    if (kCHILD_PROCESS_MESSAGES.indexOf(aMessage.name) == -1) {
-      debug("Invalid message from child " + aMessage.name);
-      return;
-    }
-
-    let mm = aMessage.target.QueryInterface(Ci.nsIMessageSender);
-    let json = aMessage.data;
-    this[aMessage.name.slice("Push:".length).toLowerCase()](json, mm);
-  },
-
-  /**
+    /**
    * Called on message from the child process. aPageRecord is an object sent by
    * navigator.push, identifying the sending page and other fields.
    */
   _registerWithServer: function(channelID, aPageRecord) {
     debug("registerWithServer()");
 
-    return this._sendRequest("register", {channelID: channelID})
+    return this._service.register(channelID)
       .then(
         this._onRegisterSuccess.bind(this, aPageRecord, channelID),
         this._onRegisterError.bind(this)
       );
   },
 
   _generateID: function() {
     let uuidGenerator = Cc["@mozilla.org/uuid-generator;1"]
@@ -1513,37 +415,16 @@ this.PushService = {
       },
       error => {
         debug("getByScope failed");
         throw "Database error";
       }
     );
   },
 
-  register: function(aPageRecord, aMessageManager) {
-    debug("register(): " + JSON.stringify(aPageRecord));
-
-    this._register(aPageRecord).then(
-      function(aPageRecord, aMessageManager, pushRecord) {
-        let message = {
-          requestID: aPageRecord.requestID,
-          pushEndpoint: pushRecord.pushEndpoint
-        };
-        aMessageManager.sendAsyncMessage("PushService:Register:OK", message);
-      }.bind(this, aPageRecord, aMessageManager),
-      function(error) {
-        let message = {
-          requestID: aPageRecord.requestID,
-          error
-        };
-        aMessageManager.sendAsyncMessage("PushService:Register:KO", message);
-      }
-    );
-  },
-
   /**
    * Exceptions thrown in _onRegisterSuccess are caught by the promise obtained
    * from _sendRequest, causing the promise to be rejected instead.
    */
   _onRegisterSuccess: function(aPageRecord, generatedChannelID, data) {
     debug("_onRegisterSuccess()");
 
     if (typeof data.channelID !== "string") {
@@ -1571,24 +452,24 @@ this.PushService = {
       scope: aPageRecord.scope,
       pushCount: 0,
       lastPush: 0,
       version: null
     };
 
     debug("scope in _onRegisterSuccess: " + aPageRecord.scope)
 
-    return this._updatePushRecord(record)
+    return this.updatePushRecord(record)
       .then(
         function() {
           return record;
         },
         function(error) {
           // Unable to save.
-          this._send("unregister", {channelID: record.channelID});
+          this._service.unregister(record);
           throw error;
         }.bind(this)
       );
   },
 
   /**
    * Exceptions thrown in _onRegisterError are caught by the promise obtained
    * from _sendRequest, causing the promise to be rejected instead.
@@ -1597,16 +478,50 @@ this.PushService = {
     debug("_onRegisterError()");
     if (!reply.error) {
       debug("Called without valid error message!");
       throw "Registration error";
     }
     throw reply.error;
   },
 
+  receiveMessage: function(aMessage) {
+    debug("receiveMessage(): " + aMessage.name);
+
+    if (kCHILD_PROCESS_MESSAGES.indexOf(aMessage.name) == -1) {
+      debug("Invalid message from child " + aMessage.name);
+      return;
+    }
+
+    let mm = aMessage.target.QueryInterface(Ci.nsIMessageSender);
+    let json = aMessage.data;
+    this[aMessage.name.slice("Push:".length).toLowerCase()](json, mm);
+  },
+
+  register: function(aPageRecord, aMessageManager) {
+    debug("register(): " + JSON.stringify(aPageRecord));
+
+    this._register(aPageRecord).then(
+      function(aPageRecord, aMessageManager, pushRecord) {
+        let message = {
+          requestID: aPageRecord.requestID,
+          pushEndpoint: pushRecord.pushEndpoint
+        };
+        aMessageManager.sendAsyncMessage("PushService:Register:OK", message);
+      }.bind(this, aPageRecord, aMessageManager),
+      function(error) {
+        let message = {
+          requestID: aPageRecord.requestID,
+          error
+        };
+        aMessageManager.sendAsyncMessage("PushService:Register:KO", message);
+      }
+    );
+  },
+
   /**
    * Called on message from the child process.
    *
    * Why is the record being deleted from the local database before the server
    * is told?
    *
    * Unregistration is for the benefit of the app and the AppServer
    * so that the AppServer does not keep pinging a channel the UserAgent isn't
@@ -1639,17 +554,17 @@ this.PushService = {
         if (record === undefined) {
           throw "NotFoundError";
         }
 
         this._db.delete(record.channelID)
           .then(_ =>
             // Let's be nice to the server and try to inform it, but we don't care
             // about the reply.
-            this._send("unregister", {channelID: record.channelID})
+            this._service.unregister(record)
           );
       });
   },
 
   unregister: function(aPageRecord, aMessageManager) {
     debug("unregister() " + JSON.stringify(aPageRecord));
 
     this._unregister(aPageRecord).then(
@@ -1709,367 +624,10 @@ this.PushService = {
       },
       error => {
         aMessageManager.sendAsyncMessage("PushService:Registration:KO", {
           requestID: aPageRecord.requestID,
           error
         });
       }
     );
-  },
-
-  // begin Push protocol handshake
-  _wsOnStart: function(context) {
-    debug("wsOnStart()");
-    this._releaseWakeLock();
-
-    if (this._currentState != STATE_WAITING_FOR_WS_START) {
-      debug("NOT in STATE_WAITING_FOR_WS_START. Current state " +
-            this._currentState + ". Skipping");
-      return;
-    }
-
-    // Since we've had a successful connection reset the retry fail count.
-    this._retryFailCount = 0;
-
-    let data = {
-      messageType: "hello",
-    }
-
-    if (this._UAID)
-      data["uaid"] = this._UAID;
-
-    function sendHelloMessage(ids) {
-      // On success, ids is an array, on error its not.
-      data["channelIDs"] = ids.map ?
-                           ids.map(function(el) { return el.channelID; }) : [];
-      this._wsSendMessage(data);
-      this._currentState = STATE_WAITING_FOR_HELLO;
-    }
-
-    this._networkInfo.getNetworkState((networkState) => {
-      if (networkState.ip) {
-        // Opening an available UDP port.
-        this._listenForUDPWakeup();
-
-        // Host-port is apparently a thing.
-        data["wakeup_hostport"] = {
-          ip: networkState.ip,
-          port: this._udpServer && this._udpServer.port
-        };
-
-        data["mobilenetwork"] = {
-          mcc: networkState.mcc,
-          mnc: networkState.mnc,
-          netid: networkState.netid
-        };
-      }
-
-      this._db.getAllChannelIDs()
-        .then(sendHelloMessage.bind(this),
-              sendHelloMessage.bind(this));
-    });
-  },
-
-  /**
-   * This statusCode is not the websocket protocol status code, but the TCP
-   * connection close status code.
-   *
-   * If we do not explicitly call ws.close() then statusCode is always
-   * NS_BASE_STREAM_CLOSED, even on a successful close.
-   */
-  _wsOnStop: function(context, statusCode) {
-    debug("wsOnStop()");
-    this._releaseWakeLock();
-
-    if (statusCode != Cr.NS_OK &&
-        !(statusCode == Cr.NS_BASE_STREAM_CLOSED && this._willBeWokenUpByUDP)) {
-      debug("Socket error " + statusCode);
-      this._reconnectAfterBackoff();
-    }
-
-    // Bug 896919. We always shutdown the WebSocket, even if we need to
-    // reconnect. This works because _reconnectAfterBackoff() is "async"
-    // (there is a minimum delay of the pref retryBaseInterval, which by default
-    // is 5000ms), so that function will open the WebSocket again.
-    this._shutdownWS();
-  },
-
-  _wsOnMessageAvailable: function(context, message) {
-    debug("wsOnMessageAvailable() " + message);
-
-    this._waitingForPong = false;
-
-    let reply = undefined;
-    try {
-      reply = JSON.parse(message);
-    } catch(e) {
-      debug("Parsing JSON failed. text : " + message);
-      return;
-    }
-
-    // If we are not waiting for a hello message, reset the retry fail count
-    if (this._currentState != STATE_WAITING_FOR_HELLO) {
-      debug('Reseting _retryFailCount and _pingIntervalRetryTimes');
-      this._retryFailCount = 0;
-      this._pingIntervalRetryTimes = {};
-    }
-
-    let doNotHandle = false;
-    if ((message === '{}') ||
-        (reply.messageType === undefined) ||
-        (reply.messageType === "ping") ||
-        (typeof reply.messageType != "string")) {
-      debug('Pong received');
-      this._calculateAdaptivePing(false);
-      doNotHandle = true;
-    }
-
-    // Reset the ping timer.  Note: This path is executed at every step of the
-    // handshake, so this alarm does not need to be set explicitly at startup.
-    this._setAlarm(prefs.get("pingInterval"));
-
-    // If it is a ping, do not handle the message.
-    if (doNotHandle) {
-      return;
-    }
-
-    // A whitelist of protocol handlers. Add to these if new messages are added
-    // in the protocol.
-    let handlers = ["Hello", "Register", "Notification"];
-
-    // Build up the handler name to call from messageType.
-    // e.g. messageType == "register" -> _handleRegisterReply.
-    let handlerName = reply.messageType[0].toUpperCase() +
-                      reply.messageType.slice(1).toLowerCase();
-
-    if (handlers.indexOf(handlerName) == -1) {
-      debug("No whitelisted handler " + handlerName + ". messageType: " +
-            reply.messageType);
-      return;
-    }
-
-    let handler = "_handle" + handlerName + "Reply";
-
-    if (typeof this[handler] !== "function") {
-      debug("Handler whitelisted but not implemented! " + handler);
-      return;
-    }
-
-    this[handler](reply);
-  },
-
-  /**
-   * The websocket should never be closed. Since we don't call ws.close(),
-   * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that
-   * function), which calls reconnect and re-establishes the WebSocket
-   * connection.
-   *
-   * If the server said it'll use UDP for wakeup, we set _willBeWokenUpByUDP
-   * and stop reconnecting in _wsOnStop().
-   */
-  _wsOnServerClose: function(context, aStatusCode, aReason) {
-    debug("wsOnServerClose() " + aStatusCode + " " + aReason);
-
-    // Switch over to UDP.
-    if (aStatusCode == kUDP_WAKEUP_WS_STATUS_CODE) {
-      debug("Server closed with promise to wake up");
-      this._willBeWokenUpByUDP = true;
-      // TODO: there should be no pending requests
-    }
-  },
-
-  /**
-   * Rejects all pending requests with errors.
-   */
-  _cancelPendingRequests: function() {
-    for (let channelID in this._pendingRequests) {
-      let request = this._pendingRequests[channelID];
-      delete this._pendingRequests[channelID];
-      request.deferred.reject({status: 0, error: "AbortError"});
-    }
-  },
-
-  _makeUDPSocket: function() {
-    return Cc["@mozilla.org/network/udp-socket;1"]
-             .createInstance(Ci.nsIUDPSocket);
-  },
-
-  /**
-   * This method should be called only if the device is on a mobile network!
-   */
-  _listenForUDPWakeup: function() {
-    debug("listenForUDPWakeup()");
-
-    if (this._udpServer) {
-      debug("UDP Server already running");
-      return;
-    }
-
-    if (!prefs.get("udp.wakeupEnabled")) {
-      debug("UDP support disabled");
-      return;
-    }
-
-    let socket = this._makeUDPSocket();
-    if (!socket) {
-      return;
-    }
-    this._udpServer = socket.QueryInterface(Ci.nsIUDPSocket);
-    this._udpServer.init(-1, false, Services.scriptSecurityManager.getSystemPrincipal());
-    this._udpServer.asyncListen(this);
-    debug("listenForUDPWakeup listening on " + this._udpServer.port);
-
-    return this._udpServer.port;
-  },
-
-  /**
-   * Called by UDP Server Socket. As soon as a ping is recieved via UDP,
-   * reconnect the WebSocket and get the actual data.
-   */
-  onPacketReceived: function(aServ, aMessage) {
-    debug("Recv UDP datagram on port: " + this._udpServer.port);
-    this._beginWSSetup();
-  },
-
-  /**
-   * Called by UDP Server Socket if the socket was closed for some reason.
-   *
-   * If this happens, we reconnect the WebSocket to not miss out on
-   * notifications.
-   */
-  onStopListening: function(aServ, aStatus) {
-    debug("UDP Server socket was shutdown. Status: " + aStatus);
-    this._udpServer = undefined;
-    this._beginWSSetup();
   }
 };
-
-let PushNetworkInfo = {
-  /**
-   * Returns information about MCC-MNC and the IP of the current connection.
-   */
-  getNetworkInformation: function() {
-    debug("getNetworkInformation()");
-
-    try {
-      if (!prefs.get("udp.wakeupEnabled")) {
-        debug("UDP support disabled, we do not send any carrier info");
-        throw new Error("UDP disabled");
-      }
-
-      let nm = Cc["@mozilla.org/network/manager;1"].getService(Ci.nsINetworkManager);
-      if (nm.active && nm.active.type == Ci.nsINetworkInterface.NETWORK_TYPE_MOBILE) {
-        let iccService = Cc["@mozilla.org/icc/iccservice;1"].getService(Ci.nsIIccService);
-        // TODO: Bug 927721 - PushService for multi-sim
-        // In Multi-sim, there is more than one client in iccService. Each
-        // client represents a icc handle. To maintain backward compatibility
-        // with single sim, we always use client 0 for now. Adding support
-        // for multiple sim will be addressed in bug 927721, if needed.
-        let clientId = 0;
-        let icc = iccService.getIccByServiceId(clientId);
-        let iccInfo = icc && icc.iccInfo;
-        if (iccInfo) {
-          debug("Running on mobile data");
-
-          let ips = {};
-          let prefixLengths = {};
-          nm.active.getAddresses(ips, prefixLengths);
-
-          return {
-            mcc: iccInfo.mcc,
-            mnc: iccInfo.mnc,
-            ip:  ips.value[0]
-          }
-        }
-      }
-    } catch (e) {
-      debug("Error recovering mobile network information: " + e);
-    }
-
-    debug("Running on wifi");
-    return {
-      mcc: 0,
-      mnc: 0,
-      ip: undefined
-    };
-  },
-
-  /**
-   * Get mobile network information to decide if the client is capable of being
-   * woken up by UDP (which currently just means having an mcc and mnc along
-   * with an IP, and optionally a netid).
-   */
-  getNetworkState: function(callback) {
-    debug("getNetworkState()");
-
-    if (typeof callback !== 'function') {
-      throw new Error("No callback method. Aborting push agent !");
-    }
-
-    var networkInfo = this.getNetworkInformation();
-
-    if (networkInfo.ip) {
-      this._getMobileNetworkId(networkInfo, function(netid) {
-        debug("Recovered netID = " + netid);
-        callback({
-          mcc: networkInfo.mcc,
-          mnc: networkInfo.mnc,
-          ip:  networkInfo.ip,
-          netid: netid
-        });
-      });
-    } else {
-      callback(networkInfo);
-    }
-  },
-
-  // utility function used to add/remove observers in init() and shutdown()
-  getNetworkStateChangeEventName: function() {
-    try {
-      Cc["@mozilla.org/network/manager;1"].getService(Ci.nsINetworkManager);
-      return "network-active-changed";
-    } catch (e) {
-      return "network:offline-status-changed";
-    }
-  },
-
-  /*
-   * Get the mobile network ID (netid)
-   *
-   * @param networkInfo
-   *        Network information object { mcc, mnc, ip, port }
-   * @param callback
-   *        Callback function to invoke with the netid or null if not found
-   */
-  _getMobileNetworkId: function(networkInfo, callback) {
-    if (typeof callback !== 'function') {
-      return;
-    }
-
-    function queryDNSForDomain(domain) {
-      debug("[_getMobileNetworkId:queryDNSForDomain] Querying DNS for " +
-        domain);
-      let netIDDNSListener = {
-        onLookupComplete: function(aRequest, aRecord, aStatus) {
-          if (aRecord) {
-            let netid = aRecord.getNextAddrAsString();
-            debug("[_getMobileNetworkId:queryDNSForDomain] NetID found: " +
-              netid);
-            callback(netid);
-          } else {
-            debug("[_getMobileNetworkId:queryDNSForDomain] NetID not found");
-            callback(null);
-          }
-        }
-      };
-      gDNSService.asyncResolve(domain, 0, netIDDNSListener,
-        threadManager.currentThread);
-      return [];
-    }
-
-    debug("[_getMobileNetworkId:queryDNSForDomain] Getting mobile network ID");
-
-    let netidAddress = "wakeup.mnc" + ("00" + networkInfo.mnc).slice(-3) +
-      ".mcc" + ("00" + networkInfo.mcc).slice(-3) + ".3gppnetwork.org";
-    queryDNSForDomain(netidAddress, callback);
-  }
-};
new file mode 100644
--- /dev/null
+++ b/dom/push/PushServiceWebSocket.jsm
@@ -0,0 +1,1338 @@
+/* jshint moz: true, esnext: true */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+const Cc = Components.classes;
+const Ci = Components.interfaces;
+const Cu = Components.utils;
+const Cr = Components.results;
+
+Cu.import("resource://gre/modules/Preferences.jsm");
+Cu.import("resource://gre/modules/Timer.jsm");
+Cu.import("resource://gre/modules/Promise.jsm");
+Cu.import("resource://gre/modules/XPCOMUtils.jsm");
+Cu.import("resource://gre/modules/Services.jsm");
+
+
+XPCOMUtils.defineLazyServiceGetter(this, "gDNSService",
+                                   "@mozilla.org/network/dns-service;1",
+                                   "nsIDNSService");
+
+#ifdef MOZ_B2G
+XPCOMUtils.defineLazyServiceGetter(this, "gPowerManagerService",
+                                   "@mozilla.org/power/powermanagerservice;1",
+                                   "nsIPowerManagerService");
+#endif
+
+var threadManager = Cc["@mozilla.org/thread-manager;1"].getService(Ci.nsIThreadManager);
+
+const kUDP_WAKEUP_WS_STATUS_CODE = 4774;  // WebSocket Close status code sent
+                                          // by server to signal that it can
+                                          // wake client up using UDP.
+
+const kWS_MAX_WENTDOWN = 2;
+
+// 1 minute is the least allowed ping interval
+const kWS_MIN_PING_INTERVAL = 60000;
+
+const prefs = new Preferences("dom.push.");
+
+this.EXPORTED_SYMBOLS = ["PushServiceWebSocket"];
+
+// Don't modify this, instead set dom.push.debug.
+let gDebuggingEnabled = true;
+
+function debug(s) {
+  if (gDebuggingEnabled)
+    dump("-*- PushServiceWebSocket.jsm: " + s + "\n");
+}
+
+/**
+ * A proxy between the PushService and the WebSocket. The listener is used so
+ * that the PushService can silence messages from the WebSocket by setting
+ * PushWebSocketListener._pushService to null. This is required because
+ * a WebSocket can continue to send messages or errors after it has been
+ * closed but the PushService may not be interested in these. It's easier to
+ * stop listening than to have checks at specific points.
+ */
+this.PushWebSocketListener = function(pushService) {
+  this._pushService = pushService;
+}
+
+this.PushWebSocketListener.prototype = {
+  onStart: function(context) {
+    if (!this._pushService)
+        return;
+    this._pushService._wsOnStart(context);
+  },
+
+  onStop: function(context, statusCode) {
+    if (!this._pushService)
+        return;
+    this._pushService._wsOnStop(context, statusCode);
+  },
+
+  onAcknowledge: function(context, size) {
+    // EMPTY
+  },
+
+  onBinaryMessageAvailable: function(context, message) {
+    // EMPTY
+  },
+
+  onMessageAvailable: function(context, message) {
+    if (!this._pushService)
+        return;
+    this._pushService._wsOnMessageAvailable(context, message);
+  },
+
+  onServerClose: function(context, aStatusCode, aReason) {
+    if (!this._pushService)
+        return;
+    this._pushService._wsOnServerClose(context, aStatusCode, aReason);
+  }
+};
+
+// websocket states
+// websocket is off
+const STATE_SHUT_DOWN = 0;
+// Websocket has been opened on client side, waiting for successful open.
+// (_wsOnStart)
+const STATE_WAITING_FOR_WS_START = 1;
+// Websocket opened, hello sent, waiting for server reply (_handleHelloReply).
+const STATE_WAITING_FOR_HELLO = 2;
+// Websocket operational, handshake completed, begin protocol messaging.
+const STATE_READY = 3;
+
+this.PushServiceWebSocket = {
+  _mainPushService: null,
+
+  observeNetworkChange: function(aSubject, aTopic, aData) {
+    // In case of network-active-changed, always disconnect existing
+    // connections. In case of offline-status changing from offline to
+    // online, it is likely that these statements will be no-ops.
+    if (this._udpServer) {
+      this._udpServer.close();
+      // Set to null since this is checked in _listenForUDPWakeup()
+      this._udpServer = null;
+    }
+
+    this._shutdownWS();
+
+    // Try to connect if network-active-changed or the offline-status
+    // changed to online.
+    if (aTopic === "network-active-changed" || aData === "online") {
+      this.startListeningIfChannelsPresent();
+    }
+  },
+
+  observePushConnectionPref: function(enabled) {
+    if (enabled) {
+      this.startListeningIfChannelsPresent();
+    } else {
+      this._shutdownWS();
+    }
+  },
+
+  observeDebug: function(enabled) {
+    gDebuggingEnabled = enabled;
+  },
+  shutdownService: function() {
+    this._shutdownWS();
+  },
+
+  observeTimer: function(aSubject, aTopic, aData) {
+    if (aSubject == this._requestTimeoutTimer) {
+      if (Object.keys(this._pendingRequests).length == 0) {
+        this._requestTimeoutTimer.cancel();
+      }
+
+      // Set to true if at least one request timed out.
+      let requestTimedOut = false;
+      for (let channelID in this._pendingRequests) {
+        let duration = Date.now() - this._pendingRequests[channelID].ctime;
+        // If any of the registration requests time out, all the ones after it
+        // also made to fail, since we are going to be disconnecting the socket.
+        if (requestTimedOut || duration > this._requestTimeout) {
+          debug("Request timeout: Removing " + channelID);
+          requestTimedOut = true;
+          this._pendingRequests[channelID]
+            .deferred.reject({status: 0, error: "TimeoutError"});
+
+          delete this._pendingRequests[channelID];
+          for (let i = this._requestQueue.length - 1; i >= 0; --i) {
+            let [, data] = this._requestQueue[i];
+            if (data && data.channelID == channelID) {
+              this._requestQueue.splice(i, 1);
+            }
+          }
+        }
+      }
+
+      // The most likely reason for a registration request timing out is
+      // that the socket has disconnected. Best to reconnect.
+      if (requestTimedOut) {
+        this._shutdownWS();
+        this._reconnectAfterBackoff();
+      }
+    }
+  },
+
+  get _UAID() {
+    return prefs.get("userAgentID");
+  },
+
+  set _UAID(newID) {
+    if (typeof(newID) !== "string") {
+      debug("Got invalid, non-string UAID " + newID +
+            ". Not updating userAgentID");
+      return;
+    }
+    debug("New _UAID: " + newID);
+    prefs.set("userAgentID", newID);
+  },
+
+  // keeps requests buffered if the websocket disconnects or is not connected
+  _requestQueue: [],
+  _ws: null,
+  _pendingRequests: {},
+  _currentState: STATE_SHUT_DOWN,
+  _requestTimeout: 0,
+  _requestTimeoutTimer: null,
+  _retryFailCount: 0,
+
+  /**
+   * According to the WS spec, servers should immediately close the underlying
+   * TCP connection after they close a WebSocket. This causes wsOnStop to be
+   * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the
+   * WebSocket up, it should try to reconnect. But if the server closes the
+   * WebSocket because it will wake up the client via UDP, then the client
+   * shouldn't re-establish the connection. If the server says that it will
+   * wake up the client over UDP, this is set to true in wsOnServerClose. It is
+   * checked in wsOnStop.
+   */
+  _willBeWokenUpByUDP: false,
+
+  /**
+   * Holds if the adaptive ping is enabled. This is read on init().
+   * If adaptive ping is enabled, a new ping is calculed each time we receive
+   * a pong message, trying to maximize network resources while minimizing
+   * cellular signalling storms.
+   */
+  _adaptiveEnabled: false,
+
+  /**
+   * This saves a flag about if we need to recalculate a new ping, based on:
+   *   1) the gap between the maximum working ping and the first ping that
+   *      gives an error (timeout) OR
+   *   2) we have reached the pref of the maximum value we allow for a ping
+   *      (dom.push.adaptive.upperLimit)
+   */
+  _recalculatePing: true,
+
+  /**
+   * This map holds a (pingInterval, triedTimes) of each pingInterval tried.
+   * It is used to check if the pingInterval has been tested enough to know that
+   * is incorrect and is above the limit the network allow us to keep the
+   * connection open.
+   */
+  _pingIntervalRetryTimes: {},
+
+  /**
+   * Holds the lastGoodPingInterval for our current connection.
+   */
+  _lastGoodPingInterval: 0,
+
+  /**
+   * Maximum ping interval that we can reach.
+   */
+  _upperLimit: 0,
+
+  /**
+   * Count the times WebSocket goes down without receiving Pings
+   * so we can re-enable the ping recalculation algorithm
+   */
+  _wsWentDownCounter: 0,
+
+  /**
+   * Sends a message to the Push Server through an open websocket.
+   * typeof(msg) shall be an object
+   */
+  _wsSendMessage: function(msg) {
+    if (!this._ws) {
+      debug("No WebSocket initialized. Cannot send a message.");
+      return;
+    }
+    msg = JSON.stringify(msg);
+    debug("Sending message: " + msg);
+    this._ws.sendMsg(msg);
+  },
+
+  init: function(options, mainPushService) {
+    debug("init()");
+
+    this._mainPushService = mainPushService;
+
+    // Override the default WebSocket factory function. The returned object
+    // must be null or satisfy the nsIWebSocketChannel interface. Used by
+    // the tests to provide a mock WebSocket implementation.
+    if (options.makeWebSocket) {
+      this._makeWebSocket = options.makeWebSocket;
+    }
+
+    // Override the default UDP socket factory function. The returned object
+    // must be null or satisfy the nsIUDPSocket interface. Used by the
+    // UDP tests.
+    if (options.makeUDPSocket) {
+      this._makeUDPSocket = options.makeUDPSocket;
+    }
+
+    this._networkInfo = options.networkInfo;
+    if (!this._networkInfo) {
+      this._networkInfo = PushNetworkInfo;
+    }
+
+    this._requestTimeout = prefs.get("requestTimeout");
+    this._adaptiveEnabled = prefs.get('adaptive.enabled');
+    this._upperLimit = prefs.get('adaptive.upperLimit');
+  },
+
+  _shutdownWS: function() {
+    debug("shutdownWS()");
+    this._currentState = STATE_SHUT_DOWN;
+    this._willBeWokenUpByUDP = false;
+
+    if (this._wsListener)
+      this._wsListener._pushService = null;
+    try {
+        this._ws.close(0, null);
+    } catch (e) {}
+    this._ws = null;
+
+    this._waitingForPong = false;
+    this._mainPushService.stopAlarm();
+    this._cancelPendingRequests();
+  },
+
+  uninit: function() {
+
+    if (this._udpServer) {
+      this._udpServer.close();
+      this._udpServer = null;
+    }
+
+    // All pending requests (ideally none) are dropped at this point. We
+    // shouldn't have any applications performing registration/unregistration
+    // or receiving notifications.
+    this._shutdownWS();
+
+    if (this._requestTimeoutTimer) {
+      this._requestTimeoutTimer.cancel();
+    }
+
+    this._mainPushService = null;
+  },
+
+  /**
+   * How retries work:  The goal is to ensure websocket is always up on
+   * networks not supporting UDP. So the websocket should only be shutdown if
+   * onServerClose indicates UDP wakeup.  If WS is closed due to socket error,
+   * _reconnectAfterBackoff() is called.  The retry alarm is started and when
+   * it times out, beginWSSetup() is called again.
+   *
+   * On a successful connection, the alarm is cancelled in
+   * wsOnMessageAvailable() when the ping alarm is started.
+   *
+   * If we are in the middle of a timeout (i.e. waiting), but
+   * a register/unregister is called, we don't want to wait around anymore.
+   * _sendRequest will automatically call beginWSSetup(), which will cancel the
+   * timer. In addition since the state will have changed, even if a pending
+   * timer event comes in (because the timer fired the event before it was
+   * cancelled), so the connection won't be reset.
+   */
+  _reconnectAfterBackoff: function() {
+    debug("reconnectAfterBackoff()");
+    //Calculate new ping interval
+    this._calculateAdaptivePing(true /* wsWentDown */);
+
+    // Calculate new timeout, but cap it to pingInterval.
+    let retryTimeout = prefs.get("retryBaseInterval") *
+                       Math.pow(2, this._retryFailCount);
+    retryTimeout = Math.min(retryTimeout, prefs.get("pingInterval"));
+
+    this._retryFailCount++;
+
+    debug("Retry in " + retryTimeout + " Try number " + this._retryFailCount);
+    this._mainPushService.setAlarm(retryTimeout);
+  },
+
+  /**
+   * We need to calculate a new ping based on:
+   *  1) Latest good ping
+   *  2) A safe gap between 1) and the calculated new ping (which is
+   *  by default, 1 minute)
+   *
+   * This is for 3G networks, whose connections keepalives differ broadly,
+   * for example:
+   *  1) Movistar Spain: 29 minutes
+   *  2) VIVO Brazil: 5 minutes
+   *  3) Movistar Colombia: XXX minutes
+   *
+   * So a fixed ping is not good for us for two reasons:
+   *  1) We might lose the connection, so we need to reconnect again (wasting
+   *  resources)
+   *  2) We use a lot of network signaling just for pinging.
+   *
+   * This algorithm tries to search the best value between a disconnection and a
+   * valid ping, to ensure better battery life and network resources usage.
+   *
+   * The value is saved in dom.push.pingInterval
+   * @param wsWentDown [Boolean] if the WebSocket was closed or it is still alive
+   *
+   */
+  _calculateAdaptivePing: function(wsWentDown) {
+    debug('_calculateAdaptivePing()');
+    if (!this._adaptiveEnabled) {
+      debug('Adaptive ping is disabled');
+      return;
+    }
+
+    if (this._retryFailCount > 0) {
+      debug('Push has failed to connect to the Push Server ' +
+        this._retryFailCount + ' times. ' +
+        'Do not calculate a new pingInterval now');
+      return;
+    }
+
+    if (!wsWentDown) {
+      debug('Setting websocket down counter to 0');
+      this._wsWentDownCounter = 0;
+    }
+
+    if (!this._recalculatePing && !wsWentDown) {
+      debug('We do not need to recalculate the ping now, based on previous data');
+      return;
+    }
+
+    // Save actual state of the network
+    let ns = this._networkInfo.getNetworkInformation();
+
+    if (ns.ip) {
+      // mobile
+      debug('mobile');
+      let oldNetwork = prefs.get('adaptive.mobile');
+      let newNetwork = 'mobile-' + ns.mcc + '-' + ns.mnc;
+
+      // Mobile networks differ, reset all intervals and pings
+      if (oldNetwork !== newNetwork) {
+        // Network differ, reset all values
+        debug('Mobile networks differ. Old network is ' + oldNetwork +
+              ' and new is ' + newNetwork);
+        prefs.set('adaptive.mobile', newNetwork);
+        //We reset the upper bound member
+        this._recalculatePing = true;
+        this._pingIntervalRetryTimes = {};
+
+        // Put default values
+        let defaultPing = prefs.get('pingInterval.default');
+        prefs.set('pingInterval', defaultPing);
+        this._lastGoodPingInterval = defaultPing;
+
+      } else {
+        // Mobile network is the same, let's just update things
+        prefs.set('pingInterval', prefs.get('pingInterval.mobile'));
+        this._lastGoodPingInterval = prefs.get('adaptive.lastGoodPingInterval.mobile');
+      }
+
+    } else {
+      // wifi
+      debug('wifi');
+      prefs.set('pingInterval', prefs.get('pingInterval.wifi'));
+      this._lastGoodPingInterval = prefs.get('adaptive.lastGoodPingInterval.wifi');
+    }
+
+    let nextPingInterval;
+    let lastTriedPingInterval = prefs.get('pingInterval');
+
+    if (!this._recalculatePing && wsWentDown) {
+      debug('Websocket disconnected without ping adaptative algorithm running');
+      this._wsWentDownCounter++;
+      if (this._wsWentDownCounter > kWS_MAX_WENTDOWN) {
+        debug('Too many disconnects. Reenabling ping adaptative algoritm');
+        this._wsWentDownCounter = 0;
+        this._recalculatePing = true;
+        this._lastGoodPingInterval = Math.floor(lastTriedPingInterval / 2);
+        if (this._lastGoodPingInterval < kWS_MIN_PING_INTERVAL) {
+          nextPingInterval = kWS_MIN_PING_INTERVAL;
+        } else {
+          nextPingInterval = this._lastGoodPingInterval;
+        }
+        prefs.set('pingInterval', nextPingInterval);
+        this._save(ns, nextPingInterval);
+        return;
+      }
+
+      debug('We do not need to recalculate the ping, based on previous data');
+    }
+
+    if (wsWentDown) {
+      debug('The WebSocket was disconnected, calculating next ping');
+
+      // If we have not tried this pingInterval yet, initialize
+      this._pingIntervalRetryTimes[lastTriedPingInterval] =
+           (this._pingIntervalRetryTimes[lastTriedPingInterval] || 0) + 1;
+
+       // Try the pingInterval at least 3 times, just to be sure that the
+       // calculated interval is not valid.
+       if (this._pingIntervalRetryTimes[lastTriedPingInterval] < 2) {
+         debug('pingInterval= ' + lastTriedPingInterval + ' tried only ' +
+           this._pingIntervalRetryTimes[lastTriedPingInterval] + ' times');
+         return;
+       }
+
+       // Latest ping was invalid, we need to lower the limit to limit / 2
+       nextPingInterval = Math.floor(lastTriedPingInterval / 2);
+
+      // If the new ping interval is close to the last good one, we are near
+      // optimum, so stop calculating.
+      if (nextPingInterval - this._lastGoodPingInterval < prefs.get('adaptive.gap')) {
+        debug('We have reached the gap, we have finished the calculation');
+        debug('nextPingInterval=' + nextPingInterval);
+        debug('lastGoodPing=' + this._lastGoodPingInterval);
+        nextPingInterval = this._lastGoodPingInterval;
+        this._recalculatePing = false;
+      } else {
+        debug('We need to calculate next time');
+        this._recalculatePing = true;
+      }
+
+    } else {
+      debug('The WebSocket is still up');
+      this._lastGoodPingInterval = lastTriedPingInterval;
+      nextPingInterval = Math.floor(lastTriedPingInterval * 1.5);
+    }
+
+    // Check if we have reached the upper limit
+    if (this._upperLimit < nextPingInterval) {
+      debug('Next ping will be bigger than the configured upper limit, capping interval');
+      this._recalculatePing = false;
+      this._lastGoodPingInterval = lastTriedPingInterval;
+      nextPingInterval = lastTriedPingInterval;
+    }
+
+    debug('Setting the pingInterval to ' + nextPingInterval);
+    prefs.set('pingInterval', nextPingInterval);
+
+    this._save(ns, nextPingInterval);
+  },
+
+  _save: function(ns, nextPingInterval){
+    //Save values for our current network
+    if (ns.ip) {
+      prefs.set('pingInterval.mobile', nextPingInterval);
+      prefs.set('adaptive.lastGoodPingInterval.mobile', this._lastGoodPingInterval);
+    } else {
+      prefs.set('pingInterval.wifi', nextPingInterval);
+      prefs.set('adaptive.lastGoodPingInterval.wifi', this._lastGoodPingInterval);
+    }
+  },
+
+  _makeWebSocket: function(uri) {
+    if (!prefs.get("connection.enabled")) {
+      debug("_makeWebSocket: connection.enabled is not set to true. Aborting.");
+      return null;
+    }
+    if (Services.io.offline) {
+      debug("Network is offline.");
+      return null;
+    }
+    let socket;
+    if (uri.scheme === "wss") {
+      socket = Cc["@mozilla.org/network/protocol;1?name=wss"]
+                 .createInstance(Ci.nsIWebSocketChannel);
+    }
+    else if (uri.scheme === "ws") {
+      if (uri.host != "localhost") {
+        debug("Push over an insecure connection (ws://) is not allowed!");
+        return null;
+      }
+      socket = Cc["@mozilla.org/network/protocol;1?name=ws"]
+                 .createInstance(Ci.nsIWebSocketChannel);
+    }
+    else {
+      debug("Unsupported websocket scheme " + uri.scheme);
+      return null;
+    }
+    socket.initLoadInfo(null, // aLoadingNode
+                        Services.scriptSecurityManager.getSystemPrincipal(),
+                        null, // aTriggeringPrincipal
+                        Ci.nsILoadInfo.SEC_NORMAL,
+                        Ci.nsIContentPolicy.TYPE_WEBSOCKET);
+
+    return socket;
+  },
+
+  _beginWSSetup: function() {
+    debug("beginWSSetup()");
+    if (this._currentState != STATE_SHUT_DOWN) {
+      debug("_beginWSSetup: Not in shutdown state! Current state " +
+            this._currentState);
+      return;
+    }
+
+    // Stop any pending reconnects scheduled for the near future.
+    this._mainPushService.stopAlarm();
+
+    let uri = this._mainPushService.getServerURI();
+    if (!uri) {
+      return;
+    }
+    let socket = this._makeWebSocket(uri);
+    if (!socket) {
+      return;
+    }
+    this._ws = socket.QueryInterface(Ci.nsIWebSocketChannel);
+
+    debug("serverURL: " + uri.spec);
+    this._wsListener = new PushWebSocketListener(this);
+    this._ws.protocol = "push-notification";
+
+    try {
+      // Grab a wakelock before we open the socket to ensure we don't go to sleep
+      // before connection the is opened.
+      this._ws.asyncOpen(uri, uri.spec, this._wsListener, null);
+      this._acquireWakeLock();
+      this._currentState = STATE_WAITING_FOR_WS_START;
+    } catch(e) {
+      debug("Error opening websocket. asyncOpen failed!");
+      this._shutdownWS();
+      this._reconnectAfterBackoff();
+    }
+  },
+
+  startListeningIfChannelsPresent: function() {
+    // Check to see if we need to do anything.
+    if (this._requestQueue.length > 0) {
+      this._beginWSSetup();
+      return;
+    }
+    this._mainPushService.getAllChannelIDs()
+      .then(channelIDs => {
+        if (channelIDs.length > 0) {
+          this._beginWSSetup();
+        }
+      });
+  },
+
+  /**
+   * There is only one alarm active at any time. This alarm has 3 intervals
+   * corresponding to 3 tasks.
+   *
+   * 1) Reconnect on ping timeout.
+   *    If we haven't received any messages from the server by the time this
+   *    alarm fires, the connection is closed and PushService tries to
+   *    reconnect, repurposing the alarm for (3).
+   *
+   * 2) Send a ping.
+   *    The protocol sends a ping ({}) on the wire every pingInterval ms. Once
+   *    it sends the ping, the alarm goes to task (1) which is waiting for
+   *    a pong. If data is received after the ping is sent,
+   *    _wsOnMessageAvailable() will reset the ping alarm (which cancels
+   *    waiting for the pong). So as long as the connection is fine, pong alarm
+   *    never fires.
+   *
+   * 3) Reconnect after backoff.
+   *    The alarm is set by _reconnectAfterBackoff() and increases in duration
+   *    every time we try and fail to connect.  When it triggers, websocket
+   *    setup begins again. On successful socket setup, the socket starts
+   *    receiving messages. The alarm now goes to (2) where it monitors the
+   *    WebSocket by sending a ping.  Since incoming data is a sign of the
+   *    connection being up, the ping alarm is reset every time data is
+   *    received.
+   */
+  onAlarmFired: function() {
+    // Conditions are arranged in decreasing specificity.
+    // i.e. when _waitingForPong is true, other conditions are also true.
+    if (this._waitingForPong) {
+      debug("Did not receive pong in time. Reconnecting WebSocket.");
+      this._shutdownWS();
+      this._reconnectAfterBackoff();
+    }
+    else if (this._currentState == STATE_READY) {
+      // Send a ping.
+      // Bypass the queue; we don't want this to be kept pending.
+      // Watch out for exception in case the socket has disconnected.
+      // When this happens, we pretend the ping was sent and don't specially
+      // handle the exception, as the lack of a pong will lead to the socket
+      // being reset.
+      try {
+        this._wsSendMessage({});
+      } catch (e) {
+      }
+
+      this._waitingForPong = true;
+      this._mainPushService.setAlarm(prefs.get("requestTimeout"));
+    }
+    else if (this._alarmID !== null) {
+      debug("reconnect alarm fired.");
+      // Reconnect after back-off.
+      // The check for a non-null _alarmID prevents a situation where the alarm
+      // fires, but _shutdownWS() is called from another code-path (e.g.
+      // network state change) and we don't want to reconnect.
+      //
+      // It also handles the case where _beginWSSetup() is called from another
+      // code-path.
+      //
+      // alarmID will be non-null only when no shutdown/connect is
+      // called between _reconnectAfterBackoff() setting the alarm and the
+      // alarm firing.
+
+      // Websocket is shut down. Backoff interval expired, try to connect.
+      this._beginWSSetup();
+    }
+  },
+
+  _acquireWakeLock: function() {
+#ifdef MOZ_B2G
+    // Disable the wake lock on non-B2G platforms to work around bug 1154492.
+    if (!this._socketWakeLock) {
+      debug("Acquiring Socket Wakelock");
+      this._socketWakeLock = gPowerManagerService.newWakeLock("cpu");
+    }
+    if (!this._socketWakeLockTimer) {
+      debug("Creating Socket WakeLock Timer");
+      this._socketWakeLockTimer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
+    }
+
+    debug("Setting Socket WakeLock Timer");
+    this._socketWakeLockTimer
+      .initWithCallback(this._releaseWakeLock.bind(this),
+                        // Allow the same time for socket setup as we do for
+                        // requests after the setup. Fudge it a bit since
+                        // timers can be a little off and we don't want to go
+                        // to sleep just as the socket connected.
+                        this._requestTimeout + 1000,
+                        Ci.nsITimer.TYPE_ONE_SHOT);
+#endif
+  },
+
+  _releaseWakeLock: function() {
+#ifdef MOZ_B2G
+    debug("Releasing Socket WakeLock");
+    if (this._socketWakeLockTimer) {
+      this._socketWakeLockTimer.cancel();
+    }
+    if (this._socketWakeLock) {
+      this._socketWakeLock.unlock();
+      this._socketWakeLock = null;
+    }
+#endif
+  },
+
+  /**
+   * Protocol handler invoked by server message.
+   */
+  _handleHelloReply: function(reply) {
+    debug("handleHelloReply()");
+    if (this._currentState != STATE_WAITING_FOR_HELLO) {
+      debug("Unexpected state " + this._currentState +
+            "(expected STATE_WAITING_FOR_HELLO)");
+      this._shutdownWS();
+      return;
+    }
+
+    if (typeof reply.uaid !== "string") {
+      debug("No UAID received or non string UAID received");
+      this._shutdownWS();
+      return;
+    }
+
+    if (reply.uaid === "") {
+      debug("Empty UAID received!");
+      this._shutdownWS();
+      return;
+    }
+
+    // To avoid sticking extra large values sent by an evil server into prefs.
+    if (reply.uaid.length > 128) {
+      debug("UAID received from server was too long: " +
+            reply.uaid);
+      this._shutdownWS();
+      return;
+    }
+
+    function finishHandshake() {
+      this._UAID = reply.uaid;
+      this._currentState = STATE_READY;
+      this._processNextRequestInQueue();
+    }
+
+    // By this point we've got a UAID from the server that we are ready to
+    // accept.
+    //
+    // If we already had a valid UAID before, we have to ask apps to
+    // re-register.
+    if (this._UAID && this._UAID != reply.uaid) {
+      debug("got new UAID: all re-register");
+
+      this._mainPushService.notifyAllAppsRegister()
+          .then(_ => this._mainPushService.dropRegistrations())
+          .then(finishHandshake.bind(this));
+
+      return;
+    }
+
+    // otherwise we are good to go
+    finishHandshake.bind(this)();
+  },
+
+  /**
+   * Protocol handler invoked by server message.
+   */
+  _handleRegisterReply: function(reply) {
+    debug("handleRegisterReply()");
+    if (typeof reply.channelID !== "string" ||
+        typeof this._pendingRequests[reply.channelID] !== "object")
+      return;
+
+    let tmp = this._pendingRequests[reply.channelID];
+    delete this._pendingRequests[reply.channelID];
+    if (Object.keys(this._pendingRequests).length == 0 &&
+        this._requestTimeoutTimer)
+      this._requestTimeoutTimer.cancel();
+
+    if (reply.status == 200) {
+      tmp.deferred.resolve(reply);
+    } else {
+      tmp.deferred.reject(reply);
+    }
+  },
+
+  /**
+   * Protocol handler invoked by server message.
+   */
+  _handleNotificationReply: function(reply) {
+    debug("handleNotificationReply()");
+    if (typeof reply.updates !== 'object') {
+      debug("No 'updates' field in response. Type = " + typeof reply.updates);
+      return;
+    }
+
+    debug("Reply updates: " + reply.updates.length);
+    for (let i = 0; i < reply.updates.length; i++) {
+      let update = reply.updates[i];
+      debug("Update: " + update.channelID + ": " + update.version);
+      if (typeof update.channelID !== "string") {
+        debug("Invalid update literal at index " + i);
+        continue;
+      }
+
+      if (update.version === undefined) {
+        debug("update.version does not exist");
+        continue;
+      }
+
+      let version = update.version;
+
+      if (typeof version === "string") {
+        version = parseInt(version, 10);
+      }
+
+      if (typeof version === "number" && version >= 0) {
+        // FIXME(nsm): this relies on app update notification being infallible!
+        // eventually fix this
+        this._receivedUpdate(update.channelID, version);
+        this._sendAck(update.channelID, version);
+      }
+    }
+  },
+
+  // FIXME(nsm): batch acks for efficiency reasons.
+  _sendAck: function(channelID, version) {
+    debug("sendAck()");
+    this._send('ack', {
+      updates: [{channelID: channelID, version: version}]
+    });
+  },
+
+  /*
+   * Must be used only by request/response style calls over the websocket.
+   */
+  _sendRequest: function(action, data) {
+    debug("sendRequest() " + action);
+    if (typeof data.channelID !== "string") {
+      debug("Received non-string channelID");
+      return Promise.reject({error: "Received non-string channelID"});
+    }
+
+    if (Object.keys(this._pendingRequests).length == 0) {
+      // start the timer since we now have at least one request
+      if (!this._requestTimeoutTimer)
+        this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"]
+                                      .createInstance(Ci.nsITimer);
+      this._requestTimeoutTimer.init(this,
+                                     this._requestTimeout,
+                                     Ci.nsITimer.TYPE_REPEATING_SLACK);
+    }
+
+    let deferred;
+    let request = this._pendingRequests[data.channelID];
+    if (request) {
+      // If a request is already pending for this channel ID, assume it's a
+      // retry. Use the existing deferred, but update the send time and re-send
+      // the request.
+      deferred = request.deferred;
+    } else {
+      deferred = Promise.defer();
+      request = this._pendingRequests[data.channelID] = {deferred};
+    }
+    request.ctime = Date.now();
+    this._send(action, data);
+    return deferred.promise;
+  },
+
+  _send: function(action, data) {
+    debug("send()");
+    this._requestQueue.push([action, data]);
+    debug("Queued " + action);
+    this._processNextRequestInQueue();
+  },
+
+  _processNextRequestInQueue: function() {
+    debug("_processNextRequestInQueue()");
+
+    if (this._requestQueue.length == 0) {
+      debug("Request queue empty");
+      return;
+    }
+
+    if (this._currentState != STATE_READY) {
+      if (!this._ws) {
+        // This will end up calling processNextRequestInQueue().
+        this._beginWSSetup();
+      }
+      else {
+        // We have a socket open so we are just waiting for hello to finish.
+        // That will call processNextRequestInQueue().
+      }
+      return;
+    }
+
+    let [action, data] = this._requestQueue.shift();
+    data.messageType = action;
+    if (!this._ws) {
+      // If our websocket is not ready and our state is STATE_READY we may as
+      // well give up all assumptions about the world and start from scratch
+      // again.  Discard the message itself, let the timeout notify error to
+      // the app.
+      debug("This should never happen!");
+      this._shutdownWS();
+    }
+
+    this._wsSendMessage(data);
+    // Process the next one as soon as possible.
+    setTimeout(this._processNextRequestInQueue.bind(this), 0);
+  },
+
+  _receivedUpdate: function(aChannelID, aLatestVersion) {
+    debug("Updating: " + aChannelID + " -> " + aLatestVersion);
+
+    let compareRecordVersionAndNotify = function(aPushRecord) {
+      debug("compareRecordVersionAndNotify()");
+      if (!aPushRecord) {
+        debug("No record for channel ID " + aChannelID);
+        return;
+      }
+
+      if (aPushRecord.version == null ||
+          aPushRecord.version < aLatestVersion) {
+        debug("Version changed, notifying app and updating DB");
+        aPushRecord.version = aLatestVersion;
+        aPushRecord.pushCount = aPushRecord.pushCount + 1;
+        aPushRecord.lastPush = new Date().getTime();
+        this._mainPushService.notifyApp(aPushRecord);
+        this._mainPushService.updatePushRecord(aPushRecord)
+          .then(
+            null,
+            function(e) {
+              debug("Error updating push record");
+            }
+          );
+      }
+      else {
+        debug("No significant version change: " + aLatestVersion);
+      }
+    }
+
+    let recoverNoSuchChannelID = function(aChannelIDFromServer) {
+      debug("Could not get channelID " + aChannelIDFromServer + " from DB");
+    }
+
+    this._mainPushService.getByChannelID(aChannelID)
+      .then(compareRecordVersionAndNotify.bind(this),
+            err => recoverNoSuchChannelID(err));
+  },
+
+  register: function(channelID) {
+    return this._sendRequest("register", {channelID: channelID})
+  },
+
+  unregister: function(aRecord) {
+    // courtesy, but don't establish a connection
+    // just for it
+    if (this._ws) {
+      debug("Had a connection, so telling the server");
+      this._send("unregister", {channelID: aRecord.channelID});
+    }
+  },
+
+  // begin Push protocol handshake
+  _wsOnStart: function(context) {
+    debug("wsOnStart()");
+    this._releaseWakeLock();
+
+    if (this._currentState != STATE_WAITING_FOR_WS_START) {
+      debug("NOT in STATE_WAITING_FOR_WS_START. Current state " +
+            this._currentState + ". Skipping");
+      return;
+    }
+
+    // Since we've had a successful connection reset the retry fail count.
+    this._retryFailCount = 0;
+
+    let data = {
+      messageType: "hello",
+    }
+
+    if (this._UAID)
+      data["uaid"] = this._UAID;
+
+    function sendHelloMessage(ids) {
+      // On success, ids is an array, on error its not.
+      data["channelIDs"] = ids.map ?
+                           ids.map(function(el) { return el.channelID; }) : [];
+      this._wsSendMessage(data);
+      this._currentState = STATE_WAITING_FOR_HELLO;
+    }
+
+    this._networkInfo.getNetworkState((networkState) => {
+      if (networkState.ip) {
+        // Opening an available UDP port.
+        this._listenForUDPWakeup();
+
+        // Host-port is apparently a thing.
+        data["wakeup_hostport"] = {
+          ip: networkState.ip,
+          port: this._udpServer && this._udpServer.port
+        };
+
+        data["mobilenetwork"] = {
+          mcc: networkState.mcc,
+          mnc: networkState.mnc,
+          netid: networkState.netid
+        };
+      }
+
+      this._mainPushService.getAllChannelIDs()
+        .then(sendHelloMessage.bind(this),
+              sendHelloMessage.bind(this));
+    });
+  },
+
+  /**
+   * This statusCode is not the websocket protocol status code, but the TCP
+   * connection close status code.
+   *
+   * If we do not explicitly call ws.close() then statusCode is always
+   * NS_BASE_STREAM_CLOSED, even on a successful close.
+   */
+  _wsOnStop: function(context, statusCode) {
+    debug("wsOnStop()");
+    this._releaseWakeLock();
+
+    if (statusCode != Cr.NS_OK &&
+        !(statusCode == Cr.NS_BASE_STREAM_CLOSED && this._willBeWokenUpByUDP)) {
+      debug("Socket error " + statusCode);
+      this._reconnectAfterBackoff();
+    }
+
+    // Bug 896919. We always shutdown the WebSocket, even if we need to
+    // reconnect. This works because _reconnectAfterBackoff() is "async"
+    // (there is a minimum delay of the pref retryBaseInterval, which by default
+    // is 5000ms), so that function will open the WebSocket again.
+    this._shutdownWS();
+  },
+
+  _wsOnMessageAvailable: function(context, message) {
+    debug("wsOnMessageAvailable() " + message);
+
+    this._waitingForPong = false;
+
+    let reply = undefined;
+    try {
+      reply = JSON.parse(message);
+    } catch(e) {
+      debug("Parsing JSON failed. text : " + message);
+      return;
+    }
+
+    // If we are not waiting for a hello message, reset the retry fail count
+    if (this._currentState != STATE_WAITING_FOR_HELLO) {
+      debug('Reseting _retryFailCount and _pingIntervalRetryTimes');
+      this._retryFailCount = 0;
+      this._pingIntervalRetryTimes = {};
+    }
+
+    let doNotHandle = false;
+    if ((message === '{}') ||
+        (reply.messageType === undefined) ||
+        (reply.messageType === "ping") ||
+        (typeof reply.messageType != "string")) {
+      debug('Pong received');
+      this._calculateAdaptivePing(false);
+      doNotHandle = true;
+    }
+
+    // Reset the ping timer.  Note: This path is executed at every step of the
+    // handshake, so this alarm does not need to be set explicitly at startup.
+    this._mainPushService.setAlarm(prefs.get("pingInterval"));
+
+    // If it is a ping, do not handle the message.
+    if (doNotHandle) {
+      return;
+    }
+
+    // A whitelist of protocol handlers. Add to these if new messages are added
+    // in the protocol.
+    let handlers = ["Hello", "Register", "Notification"];
+
+    // Build up the handler name to call from messageType.
+    // e.g. messageType == "register" -> _handleRegisterReply.
+    let handlerName = reply.messageType[0].toUpperCase() +
+                      reply.messageType.slice(1).toLowerCase();
+
+    if (handlers.indexOf(handlerName) == -1) {
+      debug("No whitelisted handler " + handlerName + ". messageType: " +
+            reply.messageType);
+      return;
+    }
+
+    let handler = "_handle" + handlerName + "Reply";
+
+    if (typeof this[handler] !== "function") {
+      debug("Handler whitelisted but not implemented! " + handler);
+      return;
+    }
+
+    this[handler](reply);
+  },
+
+  /**
+   * The websocket should never be closed. Since we don't call ws.close(),
+   * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that
+   * function), which calls reconnect and re-establishes the WebSocket
+   * connection.
+   *
+   * If the server said it'll use UDP for wakeup, we set _willBeWokenUpByUDP
+   * and stop reconnecting in _wsOnStop().
+   */
+  _wsOnServerClose: function(context, aStatusCode, aReason) {
+    debug("wsOnServerClose() " + aStatusCode + " " + aReason);
+
+    // Switch over to UDP.
+    if (aStatusCode == kUDP_WAKEUP_WS_STATUS_CODE) {
+      debug("Server closed with promise to wake up");
+      this._willBeWokenUpByUDP = true;
+      // TODO: there should be no pending requests
+    }
+  },
+
+  /**
+   * Rejects all pending requests with errors.
+   */
+  _cancelPendingRequests: function() {
+    for (let channelID in this._pendingRequests) {
+      let request = this._pendingRequests[channelID];
+      delete this._pendingRequests[channelID];
+      request.deferred.reject({status: 0, error: "AbortError"});
+    }
+  },
+
+  _makeUDPSocket: function() {
+    return Cc["@mozilla.org/network/udp-socket;1"]
+             .createInstance(Ci.nsIUDPSocket);
+  },
+
+  /**
+   * This method should be called only if the device is on a mobile network!
+   */
+  _listenForUDPWakeup: function() {
+    debug("listenForUDPWakeup()");
+
+    if (this._udpServer) {
+      debug("UDP Server already running");
+      return;
+    }
+
+    if (!prefs.get("udp.wakeupEnabled")) {
+      debug("UDP support disabled");
+      return;
+    }
+
+    let socket = this._makeUDPSocket();
+    if (!socket) {
+      return;
+    }
+    this._udpServer = socket.QueryInterface(Ci.nsIUDPSocket);
+    this._udpServer.init(-1, false, Services.scriptSecurityManager.getSystemPrincipal());
+    this._udpServer.asyncListen(this);
+    debug("listenForUDPWakeup listening on " + this._udpServer.port);
+
+    return this._udpServer.port;
+  },
+
+  /**
+   * Called by UDP Server Socket. As soon as a ping is recieved via UDP,
+   * reconnect the WebSocket and get the actual data.
+   */
+  onPacketReceived: function(aServ, aMessage) {
+    debug("Recv UDP datagram on port: " + this._udpServer.port);
+    this._beginWSSetup();
+  },
+
+  /**
+   * Called by UDP Server Socket if the socket was closed for some reason.
+   *
+   * If this happens, we reconnect the WebSocket to not miss out on
+   * notifications.
+   */
+  onStopListening: function(aServ, aStatus) {
+    debug("UDP Server socket was shutdown. Status: " + aStatus);
+    this._udpServer = undefined;
+    this._beginWSSetup();
+  }
+};
+
+let PushNetworkInfo = {
+  /**
+   * Returns information about MCC-MNC and the IP of the current connection.
+   */
+  getNetworkInformation: function() {
+    debug("getNetworkInformation()");
+
+    try {
+      if (!prefs.get("udp.wakeupEnabled")) {
+        debug("UDP support disabled, we do not send any carrier info");
+        throw new Error("UDP disabled");
+      }
+
+      let nm = Cc["@mozilla.org/network/manager;1"].getService(Ci.nsINetworkManager);
+      if (nm.active && nm.active.type == Ci.nsINetworkInterface.NETWORK_TYPE_MOBILE) {
+        let iccService = Cc["@mozilla.org/icc/iccservice;1"].getService(Ci.nsIIccService);
+        // TODO: Bug 927721 - PushService for multi-sim
+        // In Multi-sim, there is more than one client in iccService. Each
+        // client represents a icc handle. To maintain backward compatibility
+        // with single sim, we always use client 0 for now. Adding support
+        // for multiple sim will be addressed in bug 927721, if needed.
+        let clientId = 0;
+        let icc = iccService.getIccByServiceId(clientId);
+        let iccInfo = icc && icc.iccInfo;
+        if (iccInfo) {
+          debug("Running on mobile data");
+
+          let ips = {};
+          let prefixLengths = {};
+          nm.active.getAddresses(ips, prefixLengths);
+
+          return {
+            mcc: iccInfo.mcc,
+            mnc: iccInfo.mnc,
+            ip:  ips.value[0]
+          }
+        }
+      }
+    } catch (e) {
+      debug("Error recovering mobile network information: " + e);
+    }
+
+    debug("Running on wifi");
+    return {
+      mcc: 0,
+      mnc: 0,
+      ip: undefined
+    };
+  },
+
+  /**
+   * Get mobile network information to decide if the client is capable of being
+   * woken up by UDP (which currently just means having an mcc and mnc along
+   * with an IP, and optionally a netid).
+   */
+  getNetworkState: function(callback) {
+    debug("getNetworkState()");
+
+    if (typeof callback !== 'function') {
+      throw new Error("No callback method. Aborting push agent !");
+    }
+
+    var networkInfo = this.getNetworkInformation();
+
+    if (networkInfo.ip) {
+      this._getMobileNetworkId(networkInfo, function(netid) {
+        debug("Recovered netID = " + netid);
+        callback({
+          mcc: networkInfo.mcc,
+          mnc: networkInfo.mnc,
+          ip:  networkInfo.ip,
+          netid: netid
+        });
+      });
+    } else {
+      callback(networkInfo);
+    }
+  },
+
+  /*
+   * Get the mobile network ID (netid)
+   *
+   * @param networkInfo
+   *        Network information object { mcc, mnc, ip, port }
+   * @param callback
+   *        Callback function to invoke with the netid or null if not found
+   */
+  _getMobileNetworkId: function(networkInfo, callback) {
+    if (typeof callback !== 'function') {
+      return;
+    }
+
+    function queryDNSForDomain(domain) {
+      debug("[_getMobileNetworkId:queryDNSForDomain] Querying DNS for " +
+        domain);
+      let netIDDNSListener = {
+        onLookupComplete: function(aRequest, aRecord, aStatus) {
+          if (aRecord) {
+            let netid = aRecord.getNextAddrAsString();
+            debug("[_getMobileNetworkId:queryDNSForDomain] NetID found: " +
+              netid);
+            callback(netid);
+          } else {
+            debug("[_getMobileNetworkId:queryDNSForDomain] NetID not found");
+            callback(null);
+          }
+        }
+      };
+      gDNSService.asyncResolve(domain, 0, netIDDNSListener,
+        threadManager.currentThread);
+      return [];
+    }
+
+    debug("[_getMobileNetworkId:queryDNSForDomain] Getting mobile network ID");
+
+    let netidAddress = "wakeup.mnc" + ("00" + networkInfo.mnc).slice(-3) +
+      ".mcc" + ("00" + networkInfo.mcc).slice(-3) + ".3gppnetwork.org";
+    queryDNSForDomain(netidAddress, callback);
+  }
+};
--- a/dom/push/moz.build
+++ b/dom/push/moz.build
@@ -5,17 +5,19 @@
 
 EXTRA_COMPONENTS += [
     'Push.js',
     'Push.manifest',
     'PushNotificationService.js',
 ]
 
 EXTRA_PP_JS_MODULES += [
+    'PushDB.jsm',
     'PushService.jsm',
+    'PushServiceWebSocket.jsm',
 ]
 
 MOCHITEST_MANIFESTS += [
     'test/mochitest.ini',
 ]
 
 XPCSHELL_TESTS_MANIFESTS += [
     'test/xpcshell/xpcshell.ini',