Bug 1440022: initial implementation r=lina
authorEthan Glasser-Camp <ethan@betacantrips.com>
Wed, 11 Apr 2018 10:53:20 -0400
changeset 477027 4b750a5cd250aacbc7e31b9ca73d6fdac59be65b
parent 477026 0c6883fcf169ad9d4dd8ddf2b677363c468428a1
child 477028 5402f880b4276aa215824b0ae60e9f195b62c9a2
push id9374
push userjlund@mozilla.com
push dateMon, 18 Jun 2018 21:43:20 +0000
treeherdermozilla-beta@160e085dfb0b [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerslina
bugs1440022
milestone62.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1440022: initial implementation r=lina MozReview-Commit-ID: GMnGfpUSnox
dom/push/PushBroadcastService.jsm
dom/push/PushService.jsm
dom/push/PushServiceAndroidGCM.jsm
dom/push/PushServiceHttp2.jsm
dom/push/PushServiceWebSocket.jsm
dom/push/moz.build
new file mode 100644
--- /dev/null
+++ b/dom/push/PushBroadcastService.jsm
@@ -0,0 +1,216 @@
+/* jshint moz: true, esnext: true */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+ChromeUtils.import("resource://gre/modules/osfile.jsm");
+ChromeUtils.import("resource://gre/modules/XPCOMUtils.jsm");
+ChromeUtils.defineModuleGetter(this, "JSONFile", "resource://gre/modules/JSONFile.jsm");
+
+var EXPORTED_SYMBOLS = ["pushBroadcastService"];
+
+// We are supposed to ignore any updates with this version.
+// FIXME: what is the actual "dummy" version?
+// See bug 1467550.
+const DUMMY_VERSION_STRING = "dummy";
+
+XPCOMUtils.defineLazyGetter(this, "console", () => {
+  let {ConsoleAPI} = ChromeUtils.import("resource://gre/modules/Console.jsm", {});
+  return new ConsoleAPI({
+    maxLogLevelPref: "dom.push.loglevel",
+    prefix: "BroadcastService",
+  });
+});
+ChromeUtils.defineModuleGetter(this, "PushService", "resource://gre/modules/PushService.jsm");
+
+class InvalidSourceInfo extends Error {
+  constructor(message) {
+    super(message);
+    this.name = 'InvalidSourceInfo';
+  }
+}
+
+const BROADCAST_SERVICE_VERSION = 1;
+
+var BroadcastService = class {
+  constructor(pushService, path) {
+    this.pushService = pushService;
+    this.jsonFile = new JSONFile({
+      path,
+      dataPostProcessor: this._initializeJSONFile,
+    });
+    this.initializePromise = this.jsonFile.load();
+  }
+
+  /**
+   * Convert the listeners from our on-disk format to the format
+   * needed by a hello message.
+   */
+  async getListeners() {
+    await this.initializePromise;
+    return Object.entries(this.jsonFile.data.listeners).reduce((acc, [k, v]) => {
+      acc[k] = v.version;
+      return acc;
+    }, {});
+  }
+
+  _initializeJSONFile(data) {
+    if (!data.version) {
+      data.version = BROADCAST_SERVICE_VERSION;
+    }
+    if (!data.hasOwnProperty("listeners")) {
+      data.listeners = {};
+    }
+    return data;
+  }
+
+  /**
+   * Reset to a state akin to what you would get in a new profile.
+   * In particular, wipe anything from storage.
+   *
+   * Used mainly for testing.
+   */
+  async _resetListeners() {
+    await this.initializePromise;
+    this.jsonFile.data = this._initializeJSONFile({});
+    this.initializePromise = Promise.resolve();
+  }
+
+  /**
+   * Ensure that a sourceInfo is correct (has the expected fields).
+   */
+  _validateSourceInfo(sourceInfo) {
+    const {moduleURI, symbolName} = sourceInfo;
+    if (typeof moduleURI !== "string") {
+      throw new InvalidSourceInfo(`moduleURI must be a string (got ${typeof moduleURI})`);
+    }
+    if (typeof symbolName !== "string") {
+      throw new InvalidSourceInfo(`symbolName must be a string (got ${typeof symbolName})`);
+    }
+  }
+
+  /**
+   * Add an entry for a given listener if it isn't present, or update
+   * one if it is already present.
+   *
+   * Note that this means only a single listener can be set for a
+   * given subscription. This is a limitation in the current API that
+   * stems from the fact that there can only be one source of truth
+   * for the subscriber's version. As a workaround, you can define a
+   * listener which calls multiple other listeners.
+   *
+   * @param {string} broadcastId The broadcastID to listen for
+   * @param {string} version The most recent version we have for
+   *   updates from this broadcastID
+   * @param {Object} sourceInfo A description of the handler for
+   *   updates on this broadcastID
+   */
+  async addListener(broadcastId, version, sourceInfo) {
+    console.info("addListener: adding listener", broadcastId, version, sourceInfo);
+    await this.initializePromise;
+    this._validateSourceInfo(sourceInfo);
+    if (typeof version !== "string") {
+      throw new TypeError("version should be a string");
+    }
+    const isNew = !this.jsonFile.data.listeners.hasOwnProperty(broadcastId);
+
+    // Update listeners before telling the pushService to subscribe,
+    // in case it would disregard the update in the small window
+    // between getting listeners and setting state to RUNNING.
+    this.jsonFile.data.listeners[broadcastId] = {version, sourceInfo};
+    this.jsonFile.saveSoon();
+
+    if (isNew) {
+      await this.pushService.subscribeBroadcast(broadcastId, version);
+    }
+  }
+
+  async receivedBroadcastMessage(broadcasts) {
+    console.info("receivedBroadcastMessage:", broadcasts);
+    await this.initializePromise;
+    for (const broadcastId in broadcasts) {
+      const version = broadcasts[broadcastId];
+      if (version === DUMMY_VERSION_STRING) {
+        console.info("Ignoring", version, "because it's the dummy version");
+        continue;
+      }
+      // We don't know this broadcastID. This is probably a bug?
+      if (!this.jsonFile.data.listeners.hasOwnProperty(broadcastId)) {
+        console.warn("receivedBroadcastMessage: unknown broadcastId", broadcastId);
+        continue;
+      }
+
+      const {sourceInfo} = this.jsonFile.data.listeners[broadcastId];
+      try {
+        this._validateSourceInfo(sourceInfo);
+      } catch (e) {
+        console.error("receivedBroadcastMessage: malformed sourceInfo", sourceInfo, e);
+        continue;
+      }
+
+      const {moduleURI, symbolName} = sourceInfo;
+
+      const module = {};
+      try {
+        ChromeUtils.import(moduleURI, module);
+      } catch (e) {
+        console.error("receivedBroadcastMessage: couldn't invoke", broadcastId,
+                      "because import of module", moduleURI,
+                      "failed", e);
+        continue;
+      }
+
+      if (!module[symbolName]) {
+        console.error("receivedBroadcastMessage: couldn't invoke", broadcastId,
+                      "because module", moduleName, "missing attribute", symbolName);
+        continue;
+      }
+
+      const handler = module[symbolName];
+
+      if (!handler.receivedBroadcastMessage) {
+        console.error("receivedBroadcastMessage: couldn't invoke", broadcastId,
+                      "because handler returned by", `${moduleURI}.${symbolName}`,
+                      "has no receivedBroadcastMessage method");
+        continue;
+      }
+
+      try {
+        await handler.receivedBroadcastMessage(version, broadcastId);
+      } catch (e) {
+        console.error("receivedBroadcastMessage: handler for", broadcastId,
+                      "threw error:", e);
+        continue;
+      }
+
+      // Broadcast message applied successfully. Update the version we
+      // received if it's different than the one we had.  We don't
+      // enforce an ordering here (i.e. we use != instead of <)
+      // because we don't know what the ordering of the service's
+      // versions is going to be.
+      if (this.jsonFile.data.listeners[broadcastId].version != version) {
+        this.jsonFile.data.listeners[broadcastId].version = version;
+        this.jsonFile.saveSoon();
+      }
+    }
+  }
+
+  // For test only.
+  _saveImmediately() {
+    return this.jsonFile._save();
+  }
+}
+
+function initializeBroadcastService() {
+  // Fallback path for xpcshell tests.
+  let path = "broadcast-listeners.json";
+  if (OS.Constants.Path.profileDir) {
+    // Real path for use in a real profile.
+    path = OS.Path.join(OS.Constants.Path.profileDir, path);
+  }
+  return new BroadcastService(PushService, path);
+};
+
+var pushBroadcastService = initializeBroadcastService();
--- a/dom/push/PushService.jsm
+++ b/dom/push/PushService.jsm
@@ -30,16 +30,17 @@ const CONNECTION_PROTOCOLS = (function()
 })();
 
 XPCOMUtils.defineLazyServiceGetter(this, "gPushNotifier",
                                    "@mozilla.org/push/Notifier;1",
                                    "nsIPushNotifier");
 XPCOMUtils.defineLazyServiceGetter(this, "eTLDService",
                                    "@mozilla.org/network/effective-tld-service;1",
                                    "nsIEffectiveTLDService");
+ChromeUtils.defineModuleGetter(this, "pushBroadcastService", "resource://gre/modules/PushBroadcastService.jsm");
 
 var EXPORTED_SYMBOLS = ["PushService"];
 
 XPCOMUtils.defineLazyGetter(this, "console", () => {
   let {ConsoleAPI} = ChromeUtils.import("resource://gre/modules/Console.jsm", {});
   return new ConsoleAPI({
     maxLogLevelPref: "dom.push.loglevel",
     prefix: "PushService",
@@ -211,23 +212,34 @@ var PushService = {
     if (this._state == PUSH_SERVICE_RUNNING) {
       // PushService was not in the offline state, but got notification to
       // go online (a offline notification has not been sent).
       // Disconnect first.
       this._service.disconnect();
     }
 
     let records = await this.getAllUnexpired();
+    let broadcastListeners = await pushBroadcastService.getListeners();
 
+    // In principle, a listener could be added to the
+    // pushBroadcastService here, after we have gotten listeners and
+    // before we're RUNNING, but this can't happen in practice because
+    // the only caller that can add listeners is PushBroadcastService,
+    // and it waits on the same promise we are before it can add
+    // listeners. If PushBroadcastService gets woken first, it will
+    // update the value that is eventually returned from
+    // getListeners.
     this._setState(PUSH_SERVICE_RUNNING);
 
     if (records.length > 0 || prefs.get("alwaysConnect")) {
       // Connect if we have existing subscriptions, or if the always-on pref
-      // is set.
-      this._service.connect(records);
+      // is set. We gate on the pref to let us do load testing before
+      // turning it on for everyone, but if the user has push
+      // subscriptions, we need to connect them anyhow.
+      this._service.connect(records, broadcastListeners);
     }
   },
 
   _changeStateConnectionEnabledEvent: function(enabled) {
     console.debug("changeStateConnectionEnabledEvent()", enabled);
 
     if (this._state < PUSH_SERVICE_CONNECTION_DISABLE &&
         this._state != PUSH_SERVICE_ACTIVATING) {
@@ -457,23 +469,23 @@ var PushService = {
     this._setState(PUSH_SERVICE_ACTIVATING);
 
     prefs.observe("serverURL", this);
     Services.obs.addObserver(this, "quit-application");
 
     if (options.serverURI) {
       // this is use for xpcshell test.
 
-      this._stateChangeProcessEnqueue(_ =>
+      return this._stateChangeProcessEnqueue(_ =>
         this._changeServerURL(options.serverURI, STARTING_SERVICE_EVENT, options));
 
     } else {
       // This is only used for testing. Different tests require connecting to
       // slightly different URLs.
-      this._stateChangeProcessEnqueue(_ =>
+      return this._stateChangeProcessEnqueue(_ =>
         this._changeServerURL(prefs.get("serverURL"), STARTING_SERVICE_EVENT));
     }
   },
 
   _startObservers: function() {
     console.debug("startObservers()");
 
     if (this._state != PUSH_SERVICE_ACTIVATING) {
@@ -736,16 +748,26 @@ var PushService = {
       return this._decryptAndNotifyApp(record, messageID, headers, data);
     }).catch(error => {
       console.error("receivedPushMessage: Error notifying app", error);
       return Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED;
     });
   },
 
   /**
+   * Dispatches a broadcast notification to the BroadcastService.
+   */
+  receivedBroadcastMessage(message) {
+    pushBroadcastService.receivedBroadcastMessage(message.broadcasts)
+      .catch(e => {
+        console.error(e);
+      });;
+  },
+
+  /**
    * Updates a registration record after receiving a push message.
    *
    * @param {String} keyID The push registration ID.
    * @param {Function} updateFunc The function passed to `receivedPushMessage`.
    * @returns {Promise} Resolves with the updated record, or rejects if the
    *  record was not updated.
    */
   _updateRecordAfterPush(keyID, updateFunc) {
@@ -1053,16 +1075,31 @@ var PushService = {
           }
           throw new Error("Push subscription expired");
         }).then(_ => this._lookupOrPutPendingRequest(aPageRecord));
       }
       return record.toSubscription();
     });
   },
 
+  /*
+   * Called only by the PushBroadcastService on the receipt of a new
+   * subscription. Don't call this directly. Go through PushBroadcastService.
+   */
+  async subscribeBroadcast(broadcastId, version) {
+    if (this._state != PUSH_SERVICE_RUNNING) {
+      // Ignore any request to subscribe before we send a hello.
+      // We'll send all the broadcast listeners as part of the hello
+      // anyhow.
+      return;
+    }
+
+    await this._service.sendSubscribeBroadcast(broadcastId, version);
+  },
+
   /**
    * Called on message from the child process.
    *
    * Why is the record being deleted from the local database before the server
    * is told?
    *
    * Unregistration is for the benefit of the app and the AppServer
    * so that the AppServer does not keep pinging a channel the UserAgent isn't
--- a/dom/push/PushServiceAndroidGCM.jsm
+++ b/dom/push/PushServiceAndroidGCM.jsm
@@ -162,17 +162,17 @@ var PushServiceAndroidGCM = {
     Services.obs.removeObserver(this, "PushServiceAndroidGCM:ReceivedPushMessage");
     prefs.ignore("debug", this);
   },
 
   onAlarmFired: function() {
     // No action required.
   },
 
-  connect: function(records) {
+  connect: function(records, broadcastListeners) {
     console.debug("connect:", records);
     // It's possible for the registration or subscriptions backing the
     // PushService to not be registered with the underlying AndroidPushService.
     // Expire those that are unrecognized.
     return EventDispatcher.instance.sendRequestForResult({
       type: "PushServiceAndroidGCM:DumpSubscriptions",
     })
     .then(subscriptions => {
@@ -189,16 +189,20 @@ var PushServiceAndroidGCM = {
         return this._mainPushService.dropRegistrationAndNotifyApp(record.keyID)
           .catch(error => {
             console.error("connect: Error dropping registration", record.keyID, error);
           });
       }));
     });
   },
 
+  sendSubscribeBroadcast: async function(serviceId, version) {
+    // Not implemented yet
+  },
+
   isConnected: function() {
     return this._mainPushService != null;
   },
 
   disconnect: function() {
     console.debug("disconnect");
   },
 
--- a/dom/push/PushServiceHttp2.jsm
+++ b/dom/push/PushServiceHttp2.jsm
@@ -421,20 +421,24 @@ var PushServiceHttp2 = {
 
   validServerURI: function(serverURI) {
     if (serverURI.scheme == "http") {
       return !!prefs.getBoolPref("testing.allowInsecureServerURL", false);
     }
     return serverURI.scheme == "https";
   },
 
-  connect: function(subscriptions) {
+  connect: function(subscriptions, broadcastListeners) {
     this.startConnections(subscriptions);
   },
 
+  sendSubscribeBroadcast: async function(serviceId, version) {
+    // Not implemented yet
+  },
+
   isConnected: function() {
     return this._mainPushService != null;
   },
 
   disconnect: function() {
     this._shutdownConnections(false);
   },
 
--- a/dom/push/PushServiceWebSocket.jsm
+++ b/dom/push/PushServiceWebSocket.jsm
@@ -309,16 +309,18 @@ var PushServiceWebSocket = {
     this._ws.sendMsg(msg);
   },
 
   init: function(options, mainPushService, serverURI) {
     console.debug("init()");
 
     this._mainPushService = mainPushService;
     this._serverURI = serverURI;
+    // Filled in at connect() time
+    this._broadcastListeners = null;
 
     // Override the default WebSocket factory function. The returned object
     // must be null or satisfy the nsIWebSocketChannel interface. Used by
     // the tests to provide a mock WebSocket implementation.
     if (options.makeWebSocket) {
       this._makeWebSocket = options.makeWebSocket;
     }
 
@@ -507,18 +509,19 @@ var PushServiceWebSocket = {
       this._currentState = STATE_WAITING_FOR_WS_START;
     } catch(e) {
       console.error("beginWSSetup: Error opening websocket.",
         "asyncOpen failed", e);
       this._reconnect();
     }
   },
 
-  connect: function(records) {
-    console.debug("connect()");
+  connect: function(records, broadcastListeners) {
+    console.debug("connect()", broadcastListeners);
+    this._broadcastListeners = broadcastListeners;
     this._beginWSSetup();
   },
 
   isConnected: function() {
     return !!this._ws;
   },
 
   /**
@@ -561,16 +564,23 @@ var PushServiceWebSocket = {
       this._sendPendingRequests();
     };
 
     function finishHandshake() {
       this._UAID = reply.uaid;
       this._currentState = STATE_READY;
       prefs.observe("userAgentID", this);
 
+      // Handle broadcasts received in response to the "hello" message.
+      if (reply.broadcasts) {
+        // The reply isn't technically a broadcast message, but it has
+        // the shape of a broadcast message (it has a broadcasts field).
+        this._mainPushService.receivedBroadcastMessage(reply);
+      }
+
       this._dataEnabled = !!reply.use_webpush;
       if (this._dataEnabled) {
         this._mainPushService.getAllUnexpired().then(records =>
           Promise.all(records.map(record =>
             this._mainPushService.ensureCrypto(record).catch(error => {
               console.error("finishHandshake: Error updating record",
                 record.keyID, error);
             })
@@ -742,16 +752,20 @@ var PushServiceWebSocket = {
       if (typeof version === "number" && version >= 0) {
         // FIXME(nsm): this relies on app update notification being infallible!
         // eventually fix this
         this._receivedUpdate(update.channelID, version);
       }
     }
   },
 
+  _handleBroadcastReply: function(reply) {
+    this._mainPushService.receivedBroadcastMessage(reply);
+  },
+
   reportDeliveryError(messageID, reason) {
     console.debug("reportDeliveryError()");
     let code = kDELIVERY_REASON_TO_CODE[reason];
     if (!code) {
       throw new Error('Invalid delivery error reason');
     }
     let data = {messageType: 'nack',
                 version: messageID,
@@ -940,16 +954,17 @@ var PushServiceWebSocket = {
     if (this._currentState != STATE_WAITING_FOR_WS_START) {
       console.error("wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current",
         "state", this._currentState, "Skipping");
       return;
     }
 
     let data = {
       messageType: "hello",
+      broadcasts: this._broadcastListeners,
       use_webpush: true,
     };
 
     if (this._UAID) {
       data.uaid = this._UAID;
     }
 
     this._wsSendMessage(data);
@@ -1008,17 +1023,17 @@ var PushServiceWebSocket = {
 
     // If it is a ping, do not handle the message.
     if (doNotHandle) {
       return;
     }
 
     // A whitelist of protocol handlers. Add to these if new messages are added
     // in the protocol.
-    let handlers = ["Hello", "Register", "Unregister", "Notification"];
+    let handlers = ["Hello", "Register", "Unregister", "Notification", "Broadcast"];
 
     // Build up the handler name to call from messageType.
     // e.g. messageType == "register" -> _handleRegisterReply.
     let handlerName = reply.messageType[0].toUpperCase() +
                       reply.messageType.slice(1).toLowerCase();
 
     if (!handlers.includes(handlerName)) {
       console.warn("wsOnMessageAvailable: No whitelisted handler", handlerName,
@@ -1107,16 +1122,27 @@ var PushServiceWebSocket = {
       return null;
     }
     this._pendingRequests.delete(key);
     if (!this._hasPendingRequests()) {
       this._requestTimeoutTimer.cancel();
     }
     return request;
   },
+
+  sendSubscribeBroadcast(serviceId, version) {
+    let data = {
+      messageType: "broadcast_subscribe",
+      broadcasts: {
+        [serviceId]: version
+      },
+    };
+
+    this._queueRequest(data);
+  },
 };
 
 function PushRecordWebSocket(record) {
   PushRecord.call(this, record);
   this.channelID = record.channelID;
   this.version = record.version;
 }
 
--- a/dom/push/moz.build
+++ b/dom/push/moz.build
@@ -8,16 +8,17 @@ with Files("**"):
 
 EXTRA_COMPONENTS += [
     'Push.js',
     'Push.manifest',
     'PushComponents.js',
 ]
 
 EXTRA_JS_MODULES += [
+    'PushBroadcastService.jsm',
     'PushCrypto.jsm',
     'PushDB.jsm',
     'PushRecord.jsm',
     'PushService.jsm',
 ]
 
 if CONFIG['MOZ_BUILD_APP'] != 'mobile/android':
     # Everything but Fennec.