Bug 1469564 - Split RemoteSettings client into dedicated module r=glasserc
authorMathieu Leplatre <mathieu@mozilla.com>
Tue, 27 Nov 2018 10:34:16 +0000
changeset 504740 d4382e31f5e4560776a548c597014554bb76b74f
parent 504739 614d2236358a744b6650d3f1b6f3b9c30a0357a9
child 504741 a3d441ca4bb9e93d2cd9352e305f52de4460476d
push id10290
push userffxbld-merge
push dateMon, 03 Dec 2018 16:23:23 +0000
treeherdermozilla-beta@700bed2445e6 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersglasserc
bugs1469564
milestone65.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1469564 - Split RemoteSettings client into dedicated module r=glasserc Take RemoteSettingsClient code out of remote-settings.js Differential Revision: https://phabricator.services.mozilla.com/D12775
services/settings/RemoteSettingsClient.jsm
services/settings/moz.build
services/settings/remote-settings.js
new file mode 100644
--- /dev/null
+++ b/services/settings/RemoteSettingsClient.jsm
@@ -0,0 +1,478 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+var EXPORTED_SYMBOLS = [
+  "RemoteSettingsClient",
+];
+
+ChromeUtils.import("resource://gre/modules/Services.jsm");
+ChromeUtils.import("resource://gre/modules/XPCOMUtils.jsm");
+
+ChromeUtils.defineModuleGetter(this, "Kinto",
+                               "resource://services-common/kinto-offline-client.js");
+ChromeUtils.defineModuleGetter(this, "KintoHttpClient",
+                               "resource://services-common/kinto-http-client.js");
+ChromeUtils.defineModuleGetter(this, "UptakeTelemetry",
+                               "resource://services-common/uptake-telemetry.js");
+ChromeUtils.defineModuleGetter(this, "ClientEnvironmentBase",
+                               "resource://gre/modules/components-utils/ClientEnvironment.jsm");
+ChromeUtils.defineModuleGetter(this, "RemoteSettingsWorker",
+                               "resource://services-settings/RemoteSettingsWorker.jsm");
+
+XPCOMUtils.defineLazyGlobalGetters(this, ["fetch"]);
+
+// IndexedDB name.
+const DB_NAME = "remote-settings";
+
+const INVALID_SIGNATURE = "Invalid content signature";
+const MISSING_SIGNATURE = "Missing signature";
+
+XPCOMUtils.defineLazyPreferenceGetter(this, "gServerURL",
+                                      "services.settings.server");
+XPCOMUtils.defineLazyPreferenceGetter(this, "gVerifySignature",
+                                      "services.settings.verify_signature", true);
+
+/**
+ * cacheProxy returns an object Proxy that will memoize properties of the target.
+ * @param {Object} target the object to wrap.
+ * @returns {Proxy}
+ */
+function cacheProxy(target) {
+  const cache = new Map();
+  return new Proxy(target, {
+    get(target, prop, receiver) {
+      if (!cache.has(prop)) {
+        cache.set(prop, target[prop]);
+      }
+      return cache.get(prop);
+    },
+  });
+}
+
+class ClientEnvironment extends ClientEnvironmentBase {
+  static get appID() {
+    // eg. Firefox is "{ec8030f7-c20a-464f-9b0e-13a3a9e97384}".
+    Services.appinfo.QueryInterface(Ci.nsIXULAppInfo);
+    return Services.appinfo.ID;
+  }
+
+  static get toolkitVersion() {
+    Services.appinfo.QueryInterface(Ci.nsIPlatformInfo);
+    return Services.appinfo.platformVersion;
+  }
+}
+
+/**
+ * Retrieve the Autograph signature information from the collection metadata.
+ *
+ * @param {String} bucket Bucket name.
+ * @param {String} collection Collection name.
+ * @param {int} expectedTimestamp Timestamp to be used for cache busting.
+ * @returns {Promise<{String, String}>}
+ */
+async function fetchCollectionSignature(bucket, collection, expectedTimestamp) {
+  const client = new KintoHttpClient(gServerURL);
+  const { signature: signaturePayload } = await client.bucket(bucket)
+    .collection(collection)
+    .getData({ query: { _expected: expectedTimestamp } });
+  if (!signaturePayload) {
+    throw new Error(MISSING_SIGNATURE);
+  }
+  const { x5u, signature } = signaturePayload;
+  const certChainResponse = await fetch(x5u);
+  const certChain = await certChainResponse.text();
+
+  return { signature, certChain };
+}
+
+/**
+ * Retrieve the current list of remote records.
+ *
+ * @param {String} bucket Bucket name.
+ * @param {String} collection Collection name.
+ * @param {int} expectedTimestamp Timestamp to be used for cache busting.
+ */
+async function fetchRemoteRecords(bucket, collection, expectedTimestamp) {
+  const client = new KintoHttpClient(gServerURL);
+  return client.bucket(bucket)
+    .collection(collection)
+    .listRecords({ sort: "id", filters: { _expected: expectedTimestamp } });
+}
+
+/**
+ * Minimalist event emitter.
+ *
+ * Note: we don't use `toolkit/modules/EventEmitter` because **we want** to throw
+ * an error when a listener fails to execute.
+ */
+class EventEmitter {
+
+  constructor(events) {
+    this._listeners = new Map();
+    for (const event of events) {
+      this._listeners.set(event, []);
+    }
+  }
+
+  /**
+   * Event emitter: will execute the registered listeners in the order and
+   * sequentially.
+   *
+   * @param {string} event    the event name
+   * @param {Object} payload  the event payload to call the listeners with
+   */
+  async emit(event, payload) {
+    const callbacks = this._listeners.get("sync");
+    let lastError;
+    for (const cb of callbacks) {
+      try {
+        await cb(payload);
+      } catch (e) {
+        lastError = e;
+      }
+    }
+    if (lastError) {
+      throw lastError;
+    }
+  }
+
+  on(event, callback) {
+    if (!this._listeners.has(event)) {
+      throw new Error(`Unknown event type ${event}`);
+    }
+    this._listeners.get(event).push(callback);
+  }
+
+  off(event, callback) {
+    if (!this._listeners.has(event)) {
+      throw new Error(`Unknown event type ${event}`);
+    }
+    const callbacks = this._listeners.get(event);
+    const i = callbacks.indexOf(callback);
+    if (i < 0) {
+      throw new Error(`Unknown callback`);
+    } else {
+      callbacks.splice(i, 1);
+    }
+  }
+}
+
+
+class RemoteSettingsClient extends EventEmitter {
+
+  constructor(collectionName, { bucketNamePref, signerName, filterFunc, localFields = [], lastCheckTimePref }) {
+    super(["sync"]);
+
+    this.collectionName = collectionName;
+    this.signerName = signerName;
+    this.filterFunc = filterFunc;
+    this.localFields = localFields;
+    this._lastCheckTimePref = lastCheckTimePref;
+
+    // The bucket preference value can be changed (eg. `main` to `main-preview`) in order
+    // to preview the changes to be approved in a real client.
+    this.bucketNamePref = bucketNamePref;
+    XPCOMUtils.defineLazyPreferenceGetter(this, "bucketName", this.bucketNamePref);
+
+    XPCOMUtils.defineLazyGetter(this, "_kinto", () => new Kinto({
+      bucket: this.bucketName,
+      adapter: Kinto.adapters.IDB,
+      adapterOptions: { dbName: DB_NAME, migrateOldData: false },
+    }));
+  }
+
+  get identifier() {
+    return `${this.bucketName}/${this.collectionName}`;
+  }
+
+  get lastCheckTimePref() {
+    return this._lastCheckTimePref || `services.settings.${this.bucketName}.${this.collectionName}.last_check`;
+  }
+
+  /**
+   * Open the underlying Kinto collection, using the appropriate adapter and options.
+   */
+  async openCollection() {
+    const options = {
+      localFields: this.localFields,
+      bucket: this.bucketName,
+    };
+    return this._kinto.collection(this.collectionName, options);
+  }
+
+  /**
+   * Lists settings.
+   *
+   * @param  {Object} options         The options object.
+   * @param  {Object} options.filters Filter the results (default: `{}`).
+   * @param  {Object} options.order   The order to apply (eg. `-last_modified`).
+   * @return {Promise}
+   */
+  async get(options = {}) {
+    const {
+      filters = {},
+      order = "", // not sorted by default.
+    } = options;
+
+    const c = await this.openCollection();
+
+    const timestamp = await c.db.getLastModified();
+    if (timestamp == null) {
+      // The local database for this collection was never synchronized.
+      // Before returning an empty list, we attempt to load a packaged JSON dump.
+      try {
+        // Load JSON dump if there is one.
+        await RemoteSettingsWorker.importJSONDump(this.bucketName, this.collectionName);
+      } catch (e) {
+        // Report but return an empty list since there will be no data anyway.
+        Cu.reportError(e);
+        return [];
+      }
+    }
+
+    // Read from the local DB.
+    const { data } = await c.list({ filters, order });
+    // Filter the records based on `this.filterFunc` results.
+    return this._filterEntries(data);
+  }
+
+  /**
+   * Synchronize from Kinto server, if necessary.
+   *
+   * @param {int}    expectedTimestamp the lastModified date (on the server) for the remote collection.
+   *                                   This will be compared to the local timestamp, and will be used for
+   *                                   cache busting if local data is out of date.
+   * @param {int}   serverTimeMillis   the current date return by the server.
+   *                                   This is only used to track the last check or synchronization.
+   * @param {Object} options           additional advanced options.
+   * @param {bool}   options.loadDump  load initial dump from disk on first sync (default: true)
+   * @return {Promise}                 which rejects on sync or process failure.
+   */
+  async maybeSync(expectedTimestamp, serverTimeMillis, options = { loadDump: true }) {
+    const { loadDump } = options;
+
+    let reportStatus = null;
+    try {
+      const collection = await this.openCollection();
+      // Synchronize remote data into a local Sqlite DB.
+      let collectionLastModified = await collection.db.getLastModified();
+
+      // If there is no data currently in the collection, attempt to import
+      // initial data from the application defaults.
+      // This allows to avoid synchronizing the whole collection content on
+      // cold start.
+      if (!collectionLastModified && loadDump) {
+        try {
+          await RemoteSettingsWorker.importJSONDump(this.bucketName, this.collectionName);
+          collectionLastModified = await collection.db.getLastModified();
+        } catch (e) {
+          // Report but go-on.
+          Cu.reportError(e);
+        }
+      }
+
+      // If the data is up to date, there's no need to sync. We still need
+      // to record the fact that a check happened.
+      if (expectedTimestamp <= collectionLastModified) {
+        this._updateLastCheck(serverTimeMillis);
+        reportStatus = UptakeTelemetry.STATUS.UP_TO_DATE;
+        return;
+      }
+
+      // If there is a `signerName` and collection signing is enforced, add a
+      // hook for incoming changes that validates the signature.
+      if (this.signerName && gVerifySignature) {
+        collection.hooks["incoming-changes"] = [async (payload, collection) => {
+          await this._validateCollectionSignature(payload.changes,
+                                                  payload.lastModified,
+                                                  collection,
+                                                  { expectedTimestamp });
+          // In case the signature is valid, apply the changes locally.
+          return payload;
+        }];
+      }
+
+      // Fetch changes from server.
+      let syncResult;
+      try {
+        // Server changes have priority during synchronization.
+        const strategy = Kinto.syncStrategy.SERVER_WINS;
+        syncResult = await collection.sync({ remote: gServerURL, strategy, expectedTimestamp });
+        const { ok } = syncResult;
+        if (!ok) {
+          // Some synchronization conflicts occured.
+          reportStatus = UptakeTelemetry.STATUS.CONFLICT_ERROR;
+          throw new Error("Sync failed");
+        }
+      } catch (e) {
+        if (e.message.includes(INVALID_SIGNATURE)) {
+          // Signature verification failed during synchronzation.
+          reportStatus = UptakeTelemetry.STATUS.SIGNATURE_ERROR;
+          // if sync fails with a signature error, it's likely that our
+          // local data has been modified in some way.
+          // We will attempt to fix this by retrieving the whole
+          // remote collection.
+          const payload = await fetchRemoteRecords(collection.bucket, collection.name, expectedTimestamp);
+          try {
+            await this._validateCollectionSignature(payload.data,
+                                                    payload.last_modified,
+                                                    collection,
+                                                    { expectedTimestamp, ignoreLocal: true });
+          } catch (e) {
+            reportStatus = UptakeTelemetry.STATUS.SIGNATURE_RETRY_ERROR;
+            throw e;
+          }
+
+          // The signature is good (we haven't thrown).
+          // Now we will Inspect what we had locally.
+          const { data: oldData } = await collection.list({ order: "" }); // no need to sort.
+
+          // We build a sync result as if a diff-based sync was performed.
+          syncResult = { created: [], updated: [], deleted: [] };
+
+          // If the remote last_modified is newer than the local last_modified,
+          // replace the local data
+          const localLastModified = await collection.db.getLastModified();
+          if (payload.last_modified >= localLastModified) {
+            const { data: newData } = payload;
+            await collection.clear();
+            await collection.loadDump(newData);
+
+            // Compare local and remote to populate the sync result
+            const oldById = new Map(oldData.map(e => [e.id, e]));
+            for (const r of newData) {
+              const old = oldById.get(r.id);
+              if (old) {
+                if (old.last_modified != r.last_modified) {
+                  syncResult.updated.push({ old, new: r });
+                }
+                oldById.delete(r.id);
+              } else {
+                syncResult.created.push(r);
+              }
+            }
+            // Records that remain in our map now are those missing from remote
+            syncResult.deleted = Array.from(oldById.values());
+          }
+
+        } else {
+          // The sync has thrown, it can be related to metadata, network or a general error.
+          if (e.message == MISSING_SIGNATURE) {
+            // Collection metadata has no signature info, no need to retry.
+            reportStatus = UptakeTelemetry.STATUS.SIGNATURE_ERROR;
+          } else if (/NetworkError/.test(e.message)) {
+            reportStatus = UptakeTelemetry.STATUS.NETWORK_ERROR;
+          } else if (/Backoff/.test(e.message)) {
+            reportStatus = UptakeTelemetry.STATUS.BACKOFF;
+          } else {
+            reportStatus = UptakeTelemetry.STATUS.SYNC_ERROR;
+          }
+          throw e;
+        }
+      }
+
+      // Handle the obtained records (ie. apply locally through events).
+      // Build the event data list. It should be filtered (ie. by application target)
+      const { created: allCreated, updated: allUpdated, deleted: allDeleted } = syncResult;
+      const [created, deleted, updatedFiltered] = await Promise.all(
+          [allCreated, allDeleted, allUpdated.map(e => e.new)].map(this._filterEntries.bind(this))
+        );
+      // For updates, keep entries whose updated form matches the target.
+      const updatedFilteredIds = new Set(updatedFiltered.map(e => e.id));
+      const updated = allUpdated.filter(({ new: { id } }) => updatedFilteredIds.has(id));
+
+      // If every changed entry is filtered, we don't even fire the event.
+      if (created.length || updated.length || deleted.length) {
+        // Read local collection of records (also filtered).
+        const { data: allData } = await collection.list({ order: "" }); // no need to sort.
+        const current = await this._filterEntries(allData);
+        const payload = { data: { current, created, updated, deleted } };
+        try {
+          await this.emit("sync", payload);
+        } catch (e) {
+          reportStatus = UptakeTelemetry.STATUS.APPLY_ERROR;
+          throw e;
+        }
+      }
+
+      // Track last update.
+      this._updateLastCheck(serverTimeMillis);
+
+    } catch (e) {
+      // No specific error was tracked, mark it as unknown.
+      if (reportStatus === null) {
+        reportStatus = UptakeTelemetry.STATUS.UNKNOWN_ERROR;
+      }
+      throw e;
+    } finally {
+      // No error was reported, this is a success!
+      if (reportStatus === null) {
+        reportStatus = UptakeTelemetry.STATUS.SUCCESS;
+      }
+      // Report success/error status to Telemetry.
+      UptakeTelemetry.report(this.identifier, reportStatus);
+    }
+  }
+
+  /**
+   *
+   * @param {Array<Object>} remoteRecords
+   * @param {int} timestamp
+   * @param {Collection} collection
+   * @param {Object} options
+   * @returns {Promise}
+   */
+  async _validateCollectionSignature(remoteRecords, timestamp, kintoCollection, options = {}) {
+    const { expectedTimestamp, ignoreLocal = false } = options;
+    // this is a content-signature field from an autograph response.
+    const { name: collection, bucket } = kintoCollection;
+    const { signature, certChain } = await fetchCollectionSignature(bucket, collection, expectedTimestamp);
+
+    let localRecords = [];
+    if (!ignoreLocal) {
+      const { data } = await kintoCollection.list({ order: "" }); // no need to sort.
+      // Local fields are stripped to compute the collection signature (server does not have them).
+      localRecords = data.map(r => kintoCollection.cleanLocalFields(r));
+    }
+
+    const serialized = await RemoteSettingsWorker.canonicalStringify(localRecords,
+                                                                     remoteRecords,
+                                                                     timestamp);
+    const verifier = Cc["@mozilla.org/security/contentsignatureverifier;1"]
+      .createInstance(Ci.nsIContentSignatureVerifier);
+    if (!verifier.verifyContentSignature(serialized,
+                                         "p384ecdsa=" + signature,
+                                         certChain,
+                                         this.signerName)) {
+      throw new Error(INVALID_SIGNATURE + ` (${bucket}/${collection})`);
+    }
+  }
+
+  /**
+   * Save last time server was checked in users prefs.
+   *
+   * @param {int} serverTimeMillis   the current date return by server.
+   */
+  _updateLastCheck(serverTimeMillis) {
+    const checkedServerTimeInSeconds = Math.round(serverTimeMillis / 1000);
+    Services.prefs.setIntPref(this.lastCheckTimePref, checkedServerTimeInSeconds);
+  }
+
+  /**
+   *
+   * @param {Array<Objet>} data
+   */
+  async _filterEntries(data) {
+    // Filter entries for which calls to `this.filterFunc` returns null.
+    if (!this.filterFunc) {
+      return data;
+    }
+    const environment = cacheProxy(ClientEnvironment);
+    const dataPromises = data.map(e => this.filterFunc(e, environment));
+    const results = await Promise.all(dataPromises);
+    return results.filter(Boolean);
+  }
+}
--- a/services/settings/moz.build
+++ b/services/settings/moz.build
@@ -11,13 +11,14 @@ DIRS += [
 
 EXTRA_COMPONENTS += [
     'RemoteSettingsComponents.js',
     'servicesSettings.manifest',
 ]
 
 EXTRA_JS_MODULES['services-settings'] += [
     'remote-settings.js',
+    'RemoteSettingsClient.jsm',
     'RemoteSettingsWorker.js',
     'RemoteSettingsWorker.jsm',
 ]
 
 XPCSHELL_TESTS_MANIFESTS += ['test/unit/xpcshell.ini']
--- a/services/settings/remote-settings.js
+++ b/services/settings/remote-settings.js
@@ -9,95 +9,57 @@
 var EXPORTED_SYMBOLS = [
   "RemoteSettings",
   "jexlFilterFunc",
   "remoteSettingsBroadcastHandler",
 ];
 
 ChromeUtils.import("resource://gre/modules/Services.jsm");
 ChromeUtils.import("resource://gre/modules/XPCOMUtils.jsm");
-const { OS } = ChromeUtils.import("resource://gre/modules/osfile.jsm", {});
-XPCOMUtils.defineLazyGlobalGetters(this, ["fetch", "indexedDB"]);
 
-ChromeUtils.defineModuleGetter(this, "Kinto",
-                               "resource://services-common/kinto-offline-client.js");
-ChromeUtils.defineModuleGetter(this, "KintoHttpClient",
-                               "resource://services-common/kinto-http-client.js");
-ChromeUtils.defineModuleGetter(this, "CanonicalJSON",
-                               "resource://gre/modules/CanonicalJSON.jsm");
 ChromeUtils.defineModuleGetter(this, "UptakeTelemetry",
                                "resource://services-common/uptake-telemetry.js");
-ChromeUtils.defineModuleGetter(this, "ClientEnvironmentBase",
-                               "resource://gre/modules/components-utils/ClientEnvironment.jsm");
+ChromeUtils.defineModuleGetter(this, "pushBroadcastService",
+                               "resource://gre/modules/PushBroadcastService.jsm");
+ChromeUtils.defineModuleGetter(this, "RemoteSettingsClient",
+                               "resource://services-settings/RemoteSettingsClient.jsm");
 ChromeUtils.defineModuleGetter(this, "FilterExpressions",
                                "resource://gre/modules/components-utils/FilterExpressions.jsm");
-ChromeUtils.defineModuleGetter(this, "pushBroadcastService",
-                               "resource://gre/modules/PushBroadcastService.jsm");
-ChromeUtils.defineModuleGetter(this, "RemoteSettingsWorker",
-                               "resource://services-settings/RemoteSettingsWorker.jsm");
+
+XPCOMUtils.defineLazyGlobalGetters(this, ["fetch"]);
 
 const PREF_SETTINGS_DEFAULT_BUCKET     = "services.settings.default_bucket";
 const PREF_SETTINGS_BRANCH             = "services.settings.";
 const PREF_SETTINGS_SERVER             = "server";
 const PREF_SETTINGS_DEFAULT_SIGNER     = "default_signer";
-const PREF_SETTINGS_VERIFY_SIGNATURE   = "verify_signature";
 const PREF_SETTINGS_SERVER_BACKOFF     = "server.backoff";
 const PREF_SETTINGS_CHANGES_PATH       = "changes.path";
 const PREF_SETTINGS_LAST_UPDATE        = "last_update_seconds";
 const PREF_SETTINGS_LAST_ETAG          = "last_etag";
 const PREF_SETTINGS_CLOCK_SKEW_SECONDS = "clock_skew_seconds";
 const PREF_SETTINGS_LOAD_DUMP          = "load_dump";
 
-// IndexedDB name.
-const DB_NAME = "remote-settings";
 
 // Telemetry update source identifier.
 const TELEMETRY_HISTOGRAM_KEY = "settings-changes-monitoring";
-
-const INVALID_SIGNATURE = "Invalid content signature";
-const MISSING_SIGNATURE = "Missing signature";
+// Push broadcast id.
+const BROADCAST_ID = "remote-settings/monitor_changes";
 
 XPCOMUtils.defineLazyGetter(this, "gPrefs", () => {
   return Services.prefs.getBranch(PREF_SETTINGS_BRANCH);
 });
-XPCOMUtils.defineLazyPreferenceGetter(this, "gVerifySignature", PREF_SETTINGS_BRANCH + PREF_SETTINGS_VERIFY_SIGNATURE, true);
 XPCOMUtils.defineLazyPreferenceGetter(this, "gServerURL", PREF_SETTINGS_BRANCH + PREF_SETTINGS_SERVER);
 XPCOMUtils.defineLazyPreferenceGetter(this, "gChangesPath", PREF_SETTINGS_BRANCH + PREF_SETTINGS_CHANGES_PATH);
 
 /**
- * cacheProxy returns an object Proxy that will memoize properties of the target.
- */
-function cacheProxy(target) {
-  const cache = new Map();
-  return new Proxy(target, {
-    get(target, prop, receiver) {
-      if (!cache.has(prop)) {
-        cache.set(prop, target[prop]);
-      }
-      return cache.get(prop);
-    },
-  });
-}
-
-class ClientEnvironment extends ClientEnvironmentBase {
-  static get appID() {
-    // eg. Firefox is "{ec8030f7-c20a-464f-9b0e-13a3a9e97384}".
-    Services.appinfo.QueryInterface(Ci.nsIXULAppInfo);
-    return Services.appinfo.ID;
-  }
-
-  static get toolkitVersion() {
-    Services.appinfo.QueryInterface(Ci.nsIPlatformInfo);
-    return Services.appinfo.platformVersion;
-  }
-}
-
-/**
  * Default entry filtering function, in charge of excluding remote settings entries
  * where the JEXL expression evaluates into a falsy value.
+ * @param {Object}            entry       The Remote Settings entry to be excluded or kept.
+ * @param {ClientEnvironment} environment Information about version, language, platform etc.
+ * @returns {?Object} the entry or null if excluded.
  */
 async function jexlFilterFunc(entry, environment) {
   const { filter_expression } = entry;
   if (!filter_expression) {
     return entry;
   }
   let result;
   try {
@@ -106,31 +68,16 @@ async function jexlFilterFunc(entry, env
     };
     result = await FilterExpressions.eval(filter_expression, context);
   } catch (e) {
     Cu.reportError(e);
   }
   return result ? entry : null;
 }
 
-async function fetchCollectionMetadata(remote, collection, expectedTimestamp) {
-  const client = new KintoHttpClient(remote);
-  const { signature } = await client.bucket(collection.bucket)
-                                    .collection(collection.name)
-                                    .getData({ query: { _expected: expectedTimestamp }});
-  return signature;
-}
-
-async function fetchRemoteCollection(collection, expectedTimestamp) {
-  const client = new KintoHttpClient(gServerURL);
-  return client.bucket(collection.bucket)
-           .collection(collection.name)
-           .listRecords({ sort: "id", filters: { _expected: expectedTimestamp } });
-}
-
 /**
  * Fetch the list of remote collections and their timestamp.
  * @param {String} url               The poll URL (eg. `http://${server}{pollingEndpoint}`)
  * @param {String} lastEtag          (optional) The Etag of the latest poll to be matched
  *                                    by the server (eg. `"123456789"`).
  * @param {int}    expectedTimestamp The timestamp that the server is supposed to return.
  *                                   We obtained it from the Megaphone notification payload,
  *                                   and we use it only for cache busting (Bug 1497159).
@@ -196,372 +143,17 @@ async function fetchLatestChanges(url, l
   let backoffSeconds;
   if (response.headers.has("Backoff")) {
     const value = parseInt(response.headers.get("Backoff"), 10);
     if (!isNaN(value)) {
       backoffSeconds = value;
     }
   }
 
-  return {changes, currentEtag, serverTimeMillis, backoffSeconds};
-}
-
-
-class RemoteSettingsClient {
-
-  constructor(collectionName, { bucketNamePref, signerName, filterFunc = jexlFilterFunc, localFields = [], lastCheckTimePref }) {
-    this.collectionName = collectionName;
-    this.signerName = signerName;
-    this.filterFunc = filterFunc;
-    this.localFields = localFields;
-    this._lastCheckTimePref = lastCheckTimePref;
-
-    // The bucket preference value can be changed (eg. `main` to `main-preview`) in order
-    // to preview the changes to be approved in a real client.
-    this.bucketNamePref = bucketNamePref;
-    XPCOMUtils.defineLazyPreferenceGetter(this, "bucketName", this.bucketNamePref);
-
-    this._listeners = new Map();
-    this._listeners.set("sync", []);
-  }
-
-  get identifier() {
-    return `${this.bucketName}/${this.collectionName}`;
-  }
-
-  get lastCheckTimePref() {
-    return this._lastCheckTimePref || `services.settings.${this.bucketName}.${this.collectionName}.last_check`;
-  }
-
-  /**
-   * Event emitter: will execute the registered listeners in the order and
-   * sequentially.
-   *
-   * Note: we don't use `toolkit/modules/EventEmitter` because we want to throw
-   * an error when a listener fails to execute.
-   *
-   * @param {string} event    the event name
-   * @param {Object} payload  the event payload to call the listeners with
-   */
-  async emit(event, payload) {
-    const callbacks = this._listeners.get("sync");
-    let firstError;
-    for (const cb of callbacks) {
-      try {
-        await cb(payload);
-      } catch (e) {
-        firstError = e;
-      }
-    }
-    if (firstError) {
-      throw firstError;
-    }
-  }
-
-  on(event, callback) {
-    if (!this._listeners.has(event)) {
-      throw new Error(`Unknown event type ${event}`);
-    }
-    this._listeners.get(event).push(callback);
-  }
-
-  off(event, callback) {
-    if (!this._listeners.has(event)) {
-      throw new Error(`Unknown event type ${event}`);
-    }
-    const callbacks = this._listeners.get(event);
-    const i = callbacks.indexOf(callback);
-    if (i < 0) {
-      throw new Error(`Unknown callback`);
-    } else {
-      callbacks.splice(i, 1);
-    }
-  }
-
-  /**
-   * Open the underlying Kinto collection, using the appropriate adapter and
-   * options.
-   */
-  async openCollection() {
-    if (!this._kinto) {
-      this._kinto = new Kinto({
-        bucket: this.bucketName,
-        adapter: Kinto.adapters.IDB,
-        adapterOptions: { dbName: DB_NAME, migrateOldData: false },
-      });
-    }
-    const options = {
-      localFields: this.localFields,
-      bucket: this.bucketName,
-    };
-    return this._kinto.collection(this.collectionName, options);
-  }
-
-  /**
-   * Lists settings.
-   *
-   * @param  {Object} options         The options object.
-   * @param  {Object} options.filters Filter the results (default: `{}`).
-   * @param  {Object} options.order   The order to apply   (default: `-last_modified`).
-   * @return {Promise}
-   */
-  async get(options = {}) {
-    // In Bug 1451031, we will do some jexl filtering to limit the list items
-    // whose target is matched.
-    const { filters = {}, order = "" } = options; // not sorted by default.
-    const c = await this.openCollection();
-
-    const timestamp = await c.db.getLastModified();
-    // If the local database was never synchronized, then we attempt to load
-    // a packaged JSON dump.
-    if (timestamp == null) {
-      try {
-        await RemoteSettingsWorker.importJSONDump(this.bucketName, this.collectionName);
-      } catch (e) {
-        // Report but return an empty list since there will be no data anyway.
-        Cu.reportError(e);
-        return [];
-      }
-    }
-
-    const { data } = await c.list({ filters, order });
-    return this._filterEntries(data);
-  }
-
-  /**
-   * Synchronize from Kinto server, if necessary.
-   *
-   * @param {int}  expectedTimestamp       the lastModified date (on the server) for
-                                      the remote collection.
-   * @param {Date}   serverTime       the current date return by the server.
-   * @param {Object} options          additional advanced options.
-   * @param {bool}   options.loadDump load initial dump from disk on first sync (default: true)
-   * @return {Promise}                which rejects on sync or process failure.
-   */
-  async maybeSync(expectedTimestamp, serverTime, options = { loadDump: true }) {
-    const {loadDump} = options;
-
-    let reportStatus = null;
-    try {
-      const collection = await this.openCollection();
-      // Synchronize remote data into a local Sqlite DB.
-      let collectionLastModified = await collection.db.getLastModified();
-
-      // If there is no data currently in the collection, attempt to import
-      // initial data from the application defaults.
-      // This allows to avoid synchronizing the whole collection content on
-      // cold start.
-      if (!collectionLastModified && loadDump) {
-        try {
-          await RemoteSettingsWorker.importJSONDump(this.bucketName, this.collectionName);
-          collectionLastModified = await collection.db.getLastModified();
-        } catch (e) {
-          // Report but go-on.
-          Cu.reportError(e);
-        }
-      }
-
-      // If the data is up to date, there's no need to sync. We still need
-      // to record the fact that a check happened.
-      if (expectedTimestamp <= collectionLastModified) {
-        this._updateLastCheck(serverTime);
-        reportStatus = UptakeTelemetry.STATUS.UP_TO_DATE;
-        return;
-      }
-
-      // If there is a `signerName` and collection signing is enforced, add a
-      // hook for incoming changes that validates the signature.
-      if (this.signerName && gVerifySignature) {
-        collection.hooks["incoming-changes"] = [async (payload, collection) => {
-          await this._validateCollectionSignature(payload.changes,
-                                                  payload.lastModified,
-                                                  collection,
-                                                  { expectedTimestamp });
-          // In case the signature is valid, apply the changes locally.
-          return payload;
-        }];
-      }
-
-      // Fetch changes from server.
-      let syncResult;
-      try {
-        // Server changes have priority during synchronization.
-        const strategy = Kinto.syncStrategy.SERVER_WINS;
-        //
-        // XXX: https://github.com/Kinto/kinto.js/issues/859
-        //
-        syncResult = await collection.sync({ remote: gServerURL, strategy, expectedTimestamp });
-        const { ok } = syncResult;
-        if (!ok) {
-          // Some synchronization conflicts occured.
-          reportStatus = UptakeTelemetry.STATUS.CONFLICT_ERROR;
-          throw new Error("Sync failed");
-        }
-      } catch (e) {
-        if (e.message.includes(INVALID_SIGNATURE)) {
-          // Signature verification failed during synchronzation.
-          reportStatus = UptakeTelemetry.STATUS.SIGNATURE_ERROR;
-          // if sync fails with a signature error, it's likely that our
-          // local data has been modified in some way.
-          // We will attempt to fix this by retrieving the whole
-          // remote collection.
-          const payload = await fetchRemoteCollection(collection, expectedTimestamp);
-          try {
-            await this._validateCollectionSignature(payload.data,
-                                                    payload.last_modified,
-                                                    collection,
-                                                    { expectedTimestamp, ignoreLocal: true });
-          } catch (e) {
-            reportStatus = UptakeTelemetry.STATUS.SIGNATURE_RETRY_ERROR;
-            throw e;
-          }
-
-          // The signature is good (we haven't thrown).
-          // Now we will Inspect what we had locally.
-          const { data: oldData } = await collection.list({ order: "" }); // no need to sort.
-
-          // We build a sync result as if a diff-based sync was performed.
-          syncResult = { created: [], updated: [], deleted: [] };
-
-          // If the remote last_modified is newer than the local last_modified,
-          // replace the local data
-          const localLastModified = await collection.db.getLastModified();
-          if (payload.last_modified >= localLastModified) {
-            const { data: newData } = payload;
-            await collection.clear();
-            await collection.loadDump(newData);
-
-            // Compare local and remote to populate the sync result
-            const oldById = new Map(oldData.map(e => [e.id, e]));
-            for (const r of newData) {
-              const old = oldById.get(r.id);
-              if (old) {
-                if (old.last_modified != r.last_modified) {
-                  syncResult.updated.push({ old, new: r });
-                }
-                oldById.delete(r.id);
-              } else {
-                syncResult.created.push(r);
-              }
-            }
-            // Records that remain in our map now are those missing from remote
-            syncResult.deleted = Array.from(oldById.values());
-          }
-
-        } else {
-          // The sync has thrown, it can be related to metadata, network or a general error.
-          if (e.message == MISSING_SIGNATURE) {
-            // Collection metadata has no signature info, no need to retry.
-            reportStatus = UptakeTelemetry.STATUS.SIGNATURE_ERROR;
-          } else if (/NetworkError/.test(e.message)) {
-            reportStatus = UptakeTelemetry.STATUS.NETWORK_ERROR;
-          } else if (/Backoff/.test(e.message)) {
-            reportStatus = UptakeTelemetry.STATUS.BACKOFF;
-          } else {
-            reportStatus = UptakeTelemetry.STATUS.SYNC_ERROR;
-          }
-          throw e;
-        }
-      }
-
-      // Handle the obtained records (ie. apply locally through events).
-      // Build the event data list. It should be filtered (ie. by application target)
-      const { created: allCreated, updated: allUpdated, deleted: allDeleted } = syncResult;
-      const [created, deleted, updatedFiltered] = await Promise.all(
-          [allCreated, allDeleted, allUpdated.map(e => e.new)].map(this._filterEntries.bind(this))
-        );
-      // For updates, keep entries whose updated form is matches the target.
-      const updatedFilteredIds = new Set(updatedFiltered.map(e => e.id));
-      const updated = allUpdated.filter(({ new: { id } }) => updatedFilteredIds.has(id));
-
-      // If every changed entry is filtered, we don't even fire the event.
-      if (created.length || updated.length || deleted.length) {
-        // Read local collection of records (also filtered).
-        const { data: allData } = await collection.list({ order: "" }); // no need to sort.
-        const current = await this._filterEntries(allData);
-        const payload = { data: { current, created, updated, deleted } };
-        try {
-          await this.emit("sync", payload);
-        } catch (e) {
-          reportStatus = UptakeTelemetry.STATUS.APPLY_ERROR;
-          throw e;
-        }
-      }
-
-      // Track last update.
-      this._updateLastCheck(serverTime);
-
-    } catch (e) {
-      // No specific error was tracked, mark it as unknown.
-      if (reportStatus === null) {
-        reportStatus = UptakeTelemetry.STATUS.UNKNOWN_ERROR;
-      }
-      throw e;
-    } finally {
-      // No error was reported, this is a success!
-      if (reportStatus === null) {
-        reportStatus = UptakeTelemetry.STATUS.SUCCESS;
-      }
-      // Report success/error status to Telemetry.
-      UptakeTelemetry.report(this.identifier, reportStatus);
-    }
-  }
-
-  async _validateCollectionSignature(remoteRecords, timestamp, collection, options = {}) {
-    const { expectedTimestamp, ignoreLocal = false } = options;
-    // this is a content-signature field from an autograph response.
-    const signaturePayload = await fetchCollectionMetadata(gServerURL, collection, expectedTimestamp);
-    if (!signaturePayload) {
-      throw new Error(MISSING_SIGNATURE);
-    }
-    const {x5u, signature} = signaturePayload;
-    const certChainResponse = await fetch(x5u);
-    const certChain = await certChainResponse.text();
-
-    const verifier = Cc["@mozilla.org/security/contentsignatureverifier;1"]
-                       .createInstance(Ci.nsIContentSignatureVerifier);
-
-    let localRecords = [];
-    if (!ignoreLocal) {
-      const { data } = await collection.list({ order: "" }); // no need to sort.
-      // Local fields are stripped to compute the collection signature (server does not have them).
-      localRecords = data.map(r => collection.cleanLocalFields(r));
-    }
-
-    const serialized = await RemoteSettingsWorker.canonicalStringify(localRecords,
-                                                                     remoteRecords,
-                                                                     timestamp);
-    if (!verifier.verifyContentSignature(serialized,
-                                         "p384ecdsa=" + signature,
-                                         certChain,
-                                         this.signerName)) {
-      throw new Error(INVALID_SIGNATURE + ` (${collection.bucket}/${collection.name})`);
-    }
-  }
-
-  /**
-   * Save last time server was checked in users prefs.
-   *
-   * @param {Date} serverTime   the current date return by server.
-   */
-  _updateLastCheck(serverTime) {
-    const checkedServerTimeInSeconds = Math.round(serverTime / 1000);
-    Services.prefs.setIntPref(this.lastCheckTimePref, checkedServerTimeInSeconds);
-  }
-
-  async _filterEntries(data) {
-    // Filter entries for which calls to `this.filterFunc` returns null.
-    if (!this.filterFunc) {
-      return data;
-    }
-    const environment = cacheProxy(ClientEnvironment);
-    const dataPromises = data.map(e => this.filterFunc(e, environment));
-    const results = await Promise.all(dataPromises);
-    return results.filter(v => !!v);
-  }
+  return { changes, currentEtag, serverTimeMillis, backoffSeconds };
 }
 
 /**
  * Check if local data exist for the specified client.
  *
  * @param {RemoteSettingsClient} client
  * @return {bool} Whether it exists or not.
  */
@@ -591,16 +183,17 @@ async function hasLocalDump(bucket, coll
 function remoteSettingsFunction() {
   const _clients = new Map();
 
   // If not explicitly specified, use the default signer.
   const defaultSigner = gPrefs.getCharPref(PREF_SETTINGS_DEFAULT_SIGNER);
   const defaultOptions = {
     bucketNamePref: PREF_SETTINGS_DEFAULT_BUCKET,
     signerName: defaultSigner,
+    filterFunc: jexlFilterFunc,
   };
 
   /**
    * RemoteSettings constructor.
    *
    * @param {String} collectionName The remote settings identifier
    * @param {Object} options Advanced options
    * @returns {RemoteSettingsClient} An instance of a Remote Settings client.
@@ -795,28 +388,27 @@ function remoteSettingsFunction() {
     };
   };
 
   /**
    * Startup function called from nsBrowserGlue.
    */
   remoteSettings.init = () => {
     // Hook the Push broadcast and RemoteSettings polling.
-    const broadcastID = "remote-settings/monitor_changes";
     // When we start on a new profile there will be no ETag stored.
     // Use an arbitrary ETag that is guaranteed not to occur.
     // This will trigger a broadcast message but that's fine because we
     // will check the changes on each collection and retrieve only the
     // changes (e.g. nothing if we have a dump with the same data).
     const currentVersion = gPrefs.getStringPref(PREF_SETTINGS_LAST_ETAG, "\"0\"");
     const moduleInfo = {
       moduleURI: __URI__,
       symbolName: "remoteSettingsBroadcastHandler",
     };
-    pushBroadcastService.addListener(broadcastID, currentVersion, moduleInfo);
+    pushBroadcastService.addListener(BROADCAST_ID, currentVersion, moduleInfo);
   };
 
   return remoteSettings;
 }
 
 var RemoteSettings = remoteSettingsFunction();
 
 var remoteSettingsBroadcastHandler = {