Bug 1440022: initial proof-of-concept r?kitcambridge draft
authorEthan Glasser-Camp <ethan@betacantrips.com>
Wed, 11 Apr 2018 10:53:20 -0400
changeset 789217 a31ae18bab0da0f237a9235527ebc8591eba6c60
parent 789216 187c745468227ba15ceeb7d63f4159755f7fec86
push id108225
push userbmo:eglassercamp@mozilla.com
push dateFri, 27 Apr 2018 20:37:19 +0000
reviewerskitcambridge
bugs1440022
milestone61.0a1
Bug 1440022: initial proof-of-concept r?kitcambridge MozReview-Commit-ID: GMnGfpUSnox
dom/push/PushBroadcastService.jsm
dom/push/PushService.jsm
dom/push/PushServiceAndroidGCM.jsm
dom/push/PushServiceHttp2.jsm
dom/push/PushServiceWebSocket.jsm
dom/push/moz.build
new file mode 100644
--- /dev/null
+++ b/dom/push/PushBroadcastService.jsm
@@ -0,0 +1,167 @@
+/* jshint moz: true, esnext: true */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+ChromeUtils.import("resource://gre/modules/osfile.jsm");
+ChromeUtils.import("resource://gre/modules/XPCOMUtils.jsm");
+ChromeUtils.defineModuleGetter(this, "JSONFile", "resource://gre/modules/JSONFile.jsm");
+
+var EXPORTED_SYMBOLS = ["pushBroadcastService"];
+
+// We are supposed to ignore any updates with this version.
+// FIXME: what is the actual "dummy" version?
+const DUMMY_VERSION_STRING = "dummy";
+
+XPCOMUtils.defineLazyGetter(this, "console", () => {
+  let {ConsoleAPI} = ChromeUtils.import("resource://gre/modules/Console.jsm", {});
+  return new ConsoleAPI({
+    maxLogLevelPref: "dom.push.loglevel",
+    prefix: "BroadcastService",
+  });
+});
+ChromeUtils.defineModuleGetter(this, "PushService", "resource://gre/modules/PushService.jsm");
+
+class InvalidSourceInfo extends Error {
+}
+
+var BroadcastService = class {
+  constructor(pushService, path) {
+    this.pushService = pushService;
+    this.jsonFile = new JSONFile({path});
+    this.initializePromise = this.loadListenersFromStorage();
+  }
+
+  /**
+   * Convert the listeners from our on-disk format to the format
+   * needed by a subscribe message.
+   */
+  async getListeners() {
+    await this.initializePromise;
+    return Object.entries(this.jsonFile.data.listeners).reduce((acc, [k, v]) => {
+      acc[k] = v.version;
+      return acc;
+    }, {});
+  }
+
+  async loadListenersFromStorage() {
+    await this.jsonFile.load();
+    if (!this.jsonFile.data.hasOwnProperty("listeners")) {
+      this.jsonFile.data.listeners = {};
+    }
+  }
+
+  /**
+   * Reset to a state akin to what you would get in a new profile.
+   * In particular, wipe anything from storage.
+   *
+   * Used mainly for testing.
+   */
+  async _resetListeners() {
+    await this.initializePromise;
+    this.jsonFile.data = {};
+    this.initializePromise = this.loadListenersFromStorage();
+  }
+
+  /**
+   * Ensure that a sourceInfo is correct (has the expected fields).
+   */
+  _validateSourceInfo(sourceInfo) {
+    const {moduleURI, attributeName} = sourceInfo;
+    if (!moduleURI) {
+      throw new InvalidSourceInfo("missing moduleURI");
+    }
+    if (!attributeName) {
+      throw new InvalidSourceInfo("missing attributeName");
+    }
+  }
+
+  /**
+   * Add an entry for a given listener if it isn't present, or update
+   * one if it is already present.
+   */
+  async addListener(serviceId, version, sourceInfo) {
+    await this.initializePromise;
+    this._validateSourceInfo(sourceInfo);
+    const isNew = !this.jsonFile.data.listeners.hasOwnProperty(serviceId);
+    if (isNew) {
+      await this.pushService.subscribeBroadcast(serviceId, version);
+    }
+
+    this.jsonFile.data.listeners[serviceId] = {version, sourceInfo};
+    this.jsonFile.saveSoon();
+  }
+
+  async receivedBroadcastMessage(data) {
+    await this.initializePromise;
+    for (const serviceId in data.broadcasts) {
+      const version = data.broadcasts[serviceId];
+      if (version === DUMMY_VERSION_STRING) {
+        continue;
+      }
+      // We don't know this serviceID. This is probably a bug?
+      if (!this.jsonFile.data.listeners.hasOwnProperty(serviceId)) {
+        console.warn("receivedBroadcastMessage: unknown serviceId", serviceId);
+        continue;
+      }
+
+      const {sourceInfo} = this.jsonFile.data.listeners[serviceId];
+      try {
+        this._validateSourceInfo(sourceInfo);
+      } catch (e) {
+        console.error("receivedBroadcastMessage: malformed sourceInfo", sourceInfo, e);
+        continue;
+      }
+
+      const {moduleURI, attributeName} = sourceInfo;
+
+      const module = {};
+      ChromeUtils.import(moduleURI, module);
+      if (!module) {
+        console.error("receivedBroadcastMessage: couldn't invoke", serviceId, "because module missing", moduleName);
+        continue;
+      }
+
+      if (!module[attributeName]) {
+        console.error("receivedBroadcastMessage: couldn't invoke", serviceId,
+                      "because module", moduleName, "missing attribute", attributeName);
+        continue;
+      }
+
+      const handler = module[attributeName];
+
+      if (!handler.receivedBroadcastMessage) {
+        console.error("receivedBroadcastMessage: couldn't invoke", serviceId,
+                      "because handler returned by", `${moduleName}.${attributeName}`,
+                      "has no receivedBroadcastMessage method");
+        continue;
+      }
+
+      await handler.receivedBroadcastMessage(version);
+
+      if (this.jsonFile.data.listeners[serviceId].version < version) {
+        this.jsonFile.data.listeners[serviceId].version = version;
+        this.jsonFile.saveSoon();
+      }
+    }
+  }
+
+  // For test only.
+  _saveImmediately() {
+    return this.jsonFile._save();
+  }
+}
+
+function initializeBroadcastService() {
+  // Fallback path for xpcshell tests.
+  let path = "broadcast-listeners.json";
+  if (OS.Constants.Path.profileDir) {
+    // Real path for use in a real profile.
+    path = OS.Path.join(OS.Constants.Path.profileDir, path);
+  }
+  return new BroadcastService(PushService, path);
+};
+
+var pushBroadcastService = initializeBroadcastService();
--- a/dom/push/PushService.jsm
+++ b/dom/push/PushService.jsm
@@ -27,16 +27,17 @@ const CONNECTION_PROTOCOLS = (function()
     const {PushServiceAndroidGCM} = ChromeUtils.import("resource://gre/modules/PushServiceAndroidGCM.jsm");
     return [PushServiceAndroidGCM];
   }
 })();
 
 XPCOMUtils.defineLazyServiceGetter(this, "gPushNotifier",
                                    "@mozilla.org/push/Notifier;1",
                                    "nsIPushNotifier");
+ChromeUtils.defineModuleGetter(this, "pushBroadcastService", "resource://gre/modules/PushBroadcastService.jsm");
 
 var EXPORTED_SYMBOLS = ["PushService"];
 
 XPCOMUtils.defineLazyGetter(this, "console", () => {
   let {ConsoleAPI} = ChromeUtils.import("resource://gre/modules/Console.jsm", {});
   return new ConsoleAPI({
     maxLogLevelPref: "dom.push.loglevel",
     prefix: "PushService",
@@ -235,24 +236,21 @@ var PushService = {
     if (this._state == PUSH_SERVICE_RUNNING) {
       // PushService was not in the offline state, but got notification to
       // go online (a offline notification has not been sent).
       // Disconnect first.
       this._service.disconnect();
     }
 
     let records = await this.getAllUnexpired();
+    let broadcastListeners = await pushBroadcastService.getListeners();
 
     this._setState(PUSH_SERVICE_RUNNING);
 
-    if (records.length > 0 || prefs.get("alwaysConnect")) {
-      // Connect if we have existing subscriptions, or if the always-on pref
-      // is set.
-      this._service.connect(records);
-    }
+    this._service.connect(records, broadcastListeners);
   },
 
   _changeStateConnectionEnabledEvent: function(enabled) {
     console.debug("changeStateConnectionEnabledEvent()", enabled);
 
     if (this._state < PUSH_SERVICE_CONNECTION_DISABLE &&
         this._state != PUSH_SERVICE_ACTIVATING) {
       return Promise.resolve();
@@ -481,23 +479,23 @@ var PushService = {
     this._setState(PUSH_SERVICE_ACTIVATING);
 
     prefs.observe("serverURL", this);
     Services.obs.addObserver(this, "quit-application");
 
     if (options.serverURI) {
       // this is use for xpcshell test.
 
-      this._stateChangeProcessEnqueue(_ =>
+      return this._stateChangeProcessEnqueue(_ =>
         this._changeServerURL(options.serverURI, STARTING_SERVICE_EVENT, options));
 
     } else {
       // This is only used for testing. Different tests require connecting to
       // slightly different URLs.
-      this._stateChangeProcessEnqueue(_ =>
+      return this._stateChangeProcessEnqueue(_ =>
         this._changeServerURL(prefs.get("serverURL"), STARTING_SERVICE_EVENT));
     }
   },
 
   _startObservers: function() {
     console.debug("startObservers()");
 
     if (this._state != PUSH_SERVICE_ACTIVATING) {
@@ -760,16 +758,23 @@ var PushService = {
       return this._decryptAndNotifyApp(record, messageID, headers, data);
     }).catch(error => {
       console.error("receivedPushMessage: Error notifying app", error);
       return Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED;
     });
   },
 
   /**
+   * Dispatches a broadcast notification to the BroadcastService.
+   */
+  receivedBroadcastMessage(message) {
+    pushBroadcastService.receivedBroadcastMessage(message);
+  },
+
+  /**
    * Updates a registration record after receiving a push message.
    *
    * @param {String} keyID The push registration ID.
    * @param {Function} updateFunc The function passed to `receivedPushMessage`.
    * @returns {Promise} Resolves with the updated record, or rejects if the
    *  record was not updated.
    */
   _updateRecordAfterPush(keyID, updateFunc) {
@@ -1077,16 +1082,27 @@ var PushService = {
           }
           throw new Error("Push subscription expired");
         }).then(_ => this._lookupOrPutPendingRequest(aPageRecord));
       }
       return record.toSubscription();
     });
   },
 
+  async subscribeBroadcast(serviceId, version) {
+    if (this._state != PUSH_SERVICE_RUNNING) {
+      // Ignore any request to subscribe before we send a hello.
+      // We'll send all the broadcast listeners as part of the hello
+      // anyhow.
+      return;
+    }
+
+    await this._service.sendSubscribeBroadcast(serviceId, version);
+  },
+
   /**
    * Called on message from the child process.
    *
    * Why is the record being deleted from the local database before the server
    * is told?
    *
    * Unregistration is for the benefit of the app and the AppServer
    * so that the AppServer does not keep pinging a channel the UserAgent isn't
--- a/dom/push/PushServiceAndroidGCM.jsm
+++ b/dom/push/PushServiceAndroidGCM.jsm
@@ -162,17 +162,17 @@ var PushServiceAndroidGCM = {
     Services.obs.removeObserver(this, "PushServiceAndroidGCM:ReceivedPushMessage");
     prefs.ignore("debug", this);
   },
 
   onAlarmFired: function() {
     // No action required.
   },
 
-  connect: function(records) {
+  connect: function(records, broadcastListeners) {
     console.debug("connect:", records);
     // It's possible for the registration or subscriptions backing the
     // PushService to not be registered with the underlying AndroidPushService.
     // Expire those that are unrecognized.
     return EventDispatcher.instance.sendRequestForResult({
       type: "PushServiceAndroidGCM:DumpSubscriptions",
     })
     .then(subscriptions => {
@@ -189,16 +189,20 @@ var PushServiceAndroidGCM = {
         return this._mainPushService.dropRegistrationAndNotifyApp(record.keyID)
           .catch(error => {
             console.error("connect: Error dropping registration", record.keyID, error);
           });
       }));
     });
   },
 
+  sendSubscribeBroadcast: async function(serviceId, version) {
+    // Not implemented yet
+  },
+
   isConnected: function() {
     return this._mainPushService != null;
   },
 
   disconnect: function() {
     console.debug("disconnect");
   },
 
--- a/dom/push/PushServiceHttp2.jsm
+++ b/dom/push/PushServiceHttp2.jsm
@@ -426,20 +426,24 @@ var PushServiceHttp2 = {
 
   validServerURI: function(serverURI) {
     if (serverURI.scheme == "http") {
       return !!prefs.getBoolPref("testing.allowInsecureServerURL", false);
     }
     return serverURI.scheme == "https";
   },
 
-  connect: function(subscriptions) {
+  connect: function(subscriptions, broadcastListeners) {
     this.startConnections(subscriptions);
   },
 
+  sendSubscribeBroadcast: async function(serviceId, version) {
+    // Not implemented yet
+  },
+
   isConnected: function() {
     return this._mainPushService != null;
   },
 
   disconnect: function() {
     this._shutdownConnections(false);
   },
 
--- a/dom/push/PushServiceWebSocket.jsm
+++ b/dom/push/PushServiceWebSocket.jsm
@@ -309,16 +309,18 @@ var PushServiceWebSocket = {
     this._ws.sendMsg(msg);
   },
 
   init: function(options, mainPushService, serverURI) {
     console.debug("init()");
 
     this._mainPushService = mainPushService;
     this._serverURI = serverURI;
+    // Filled in at connect() time
+    this._broadcastListeners = null;
 
     // Override the default WebSocket factory function. The returned object
     // must be null or satisfy the nsIWebSocketChannel interface. Used by
     // the tests to provide a mock WebSocket implementation.
     if (options.makeWebSocket) {
       this._makeWebSocket = options.makeWebSocket;
     }
 
@@ -507,18 +509,19 @@ var PushServiceWebSocket = {
       this._currentState = STATE_WAITING_FOR_WS_START;
     } catch(e) {
       console.error("beginWSSetup: Error opening websocket.",
         "asyncOpen failed", e);
       this._reconnect();
     }
   },
 
-  connect: function(records) {
-    console.debug("connect()");
+  connect: function(records, broadcastListeners) {
+    console.debug("connect()", broadcastListeners);
+    this._broadcastListeners = broadcastListeners;
     this._beginWSSetup();
   },
 
   isConnected: function() {
     return !!this._ws;
   },
 
   /**
@@ -742,16 +745,20 @@ var PushServiceWebSocket = {
       if (typeof version === "number" && version >= 0) {
         // FIXME(nsm): this relies on app update notification being infallible!
         // eventually fix this
         this._receivedUpdate(update.channelID, version);
       }
     }
   },
 
+  _handleBroadcastReply: function(reply) {
+    this._mainPushService.receivedBroadcastMessage(reply);
+  },
+
   reportDeliveryError(messageID, reason) {
     console.debug("reportDeliveryError()");
     let code = kDELIVERY_REASON_TO_CODE[reason];
     if (!code) {
       throw new Error('Invalid delivery error reason');
     }
     let data = {messageType: 'nack',
                 version: messageID,
@@ -940,16 +947,17 @@ var PushServiceWebSocket = {
     if (this._currentState != STATE_WAITING_FOR_WS_START) {
       console.error("wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current",
         "state", this._currentState, "Skipping");
       return;
     }
 
     let data = {
       messageType: "hello",
+      broadcasts: this._broadcastListeners,
       use_webpush: true,
     };
 
     if (this._UAID) {
       data.uaid = this._UAID;
     }
 
     this._wsSendMessage(data);
@@ -1008,17 +1016,17 @@ var PushServiceWebSocket = {
 
     // If it is a ping, do not handle the message.
     if (doNotHandle) {
       return;
     }
 
     // A whitelist of protocol handlers. Add to these if new messages are added
     // in the protocol.
-    let handlers = ["Hello", "Register", "Unregister", "Notification"];
+    let handlers = ["Hello", "Register", "Unregister", "Notification", "Broadcast"];
 
     // Build up the handler name to call from messageType.
     // e.g. messageType == "register" -> _handleRegisterReply.
     let handlerName = reply.messageType[0].toUpperCase() +
                       reply.messageType.slice(1).toLowerCase();
 
     if (!handlers.includes(handlerName)) {
       console.warn("wsOnMessageAvailable: No whitelisted handler", handlerName,
@@ -1107,16 +1115,27 @@ var PushServiceWebSocket = {
       return null;
     }
     this._pendingRequests.delete(key);
     if (!this._hasPendingRequests()) {
       this._requestTimeoutTimer.cancel();
     }
     return request;
   },
+
+  sendSubscribeBroadcast(serviceId, version) {
+    let data = {
+      messageType: "broadcast_subscribe",
+      broadcasts: {
+        [serviceId]: version
+      },
+    };
+
+    this._queueRequest(data);
+  },
 };
 
 function PushRecordWebSocket(record) {
   PushRecord.call(this, record);
   this.channelID = record.channelID;
   this.version = record.version;
 }
 
--- a/dom/push/moz.build
+++ b/dom/push/moz.build
@@ -8,16 +8,17 @@ with Files("**"):
 
 EXTRA_COMPONENTS += [
     'Push.js',
     'Push.manifest',
     'PushComponents.js',
 ]
 
 EXTRA_JS_MODULES += [
+    'PushBroadcastService.jsm',
     'PushCrypto.jsm',
     'PushDB.jsm',
     'PushRecord.jsm',
     'PushService.jsm',
 ]
 
 if CONFIG['MOZ_BUILD_APP'] != 'mobile/android':
     # Everything but Fennec.