Bug 1289932 - Send/Handle push messages for send tab to device. r=markh
author Edouard Oger <eoger@fastmail.com>
Tue, 02 Aug 2016 10:09:30 -0700
changeset 309276 5acd4f7ea715daeef7ac6abf88e805c5c5883be7
parent 309275 e4437c4da937e08593dc896ce7cf39dc48e1e2ad
child 309277 51600f3f15c23697b5062752a83f5addc4c0e7e0
push id 30560
push user kwierso@gmail.com
push date Mon, 15 Aug 2016 21:07:46 +0000
reviewers markh
bugs 1289932
milestone 51.0a1
Bug 1289932 - Send/Handle push messages for send tab to device. r=markh MozReview-Commit-ID: WD4XzRtl86
services/fxaccounts/FxAccounts.jsm
services/fxaccounts/FxAccountsClient.jsm
services/fxaccounts/FxAccountsCommon.js
services/fxaccounts/FxAccountsPush.js
services/sync/modules/constants.js
services/sync/modules/engines.js
services/sync/modules/engines/clients.js
services/sync/modules/service.js
services/sync/tests/unit/test_clients_engine.js
--- a/services/fxaccounts/FxAccounts.jsm
+++ b/services/fxaccounts/FxAccounts.jsm
@@ -45,16 +45,17 @@ var publicProperties = [
   "getKeys",
   "getOAuthToken",
   "getSignedInUser",
   "getSignedInUserProfile",
   "handleDeviceDisconnection",
   "invalidateCertificate",
   "loadAndPoll",
   "localtimeOffsetMsec",
+  "notifyDevices",
   "now",
   "promiseAccountsChangeProfileURI",
   "promiseAccountsForceSigninURI",
   "promiseAccountsManageURI",
   "removeCachedOAuthToken",
   "resendVerificationEmail",
   "resetCredentials",
   "sessionStatus",
@@ -396,16 +397,39 @@ FxAccountsInternal.prototype = {
   // A hook-point for tests who may want a mocked AccountState or mocked storage.
   newAccountState(credentials) {
     let storage = new FxAccountsStorageManager();
     storage.initialize(credentials);
     return new AccountState(storage);
   },
 
   /**
+   * Send a message to a set of devices in the same account.
+   *
+   * @param deviceIds
+   *        Device ID, or array of device IDs, to send the message to
+   * @param payload
+   *        Data to send with the message
+   * @param TTL
+   *        Time to live of the push message, in seconds
+   * @return Promise
+   */
+  notifyDevices: function(deviceIds, payload, TTL) {
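+    // Accept a single device ID or an array of IDs.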
+    if (!Array.isArray(deviceIds)) {
+      deviceIds = [deviceIds];
+    }
+    return this.currentAccountState.getUserAccountData()
+      .then(data => {
+        if (!data) {
+          throw this._error(ERROR_NO_ACCOUNT);
+        }
+        if (!data.sessionToken) {
+          throw this._error(ERROR_AUTH_ERROR,
+            "notifyDevices called without a session token");
+        }
+        return this.fxAccountsClient.notifyDevices(data.sessionToken, deviceIds,
+          payload, TTL);
+    });
+  },
+
+  /**
    * Return the current time in milliseconds as an integer.  Allows tests to
    * manipulate the date to simulate certificate expiration.
    */
   now: function() {
     return this.fxAccountsClient.now();
   },
 
   getAccountsClient: function() {
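
A minimal caller sketch (hypothetical names: the payload mirrors the
"sync:collection_changed" message built in clients.js below, and
NOTIFY_TAB_SENT_TTL_SECS comes from services/sync/modules/constants.js):

  fxAccounts.notifyDevices(["device-id-1", "device-id-2"], {
    version: 1,
    command: "sync:collection_changed",
    data: { collections: ["clients"] }
  }, NOTIFY_TAB_SENT_TTL_SECS).catch(err => {
    log.error("notifyDevices failed", err); // "log" is a hypothetical logger
  });
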
--- a/services/fxaccounts/FxAccountsClient.jsm
+++ b/services/fxaccounts/FxAccountsClient.jsm
@@ -414,16 +414,41 @@ this.FxAccountsClient.prototype = {
       body.pushPublicKey = options.pushPublicKey;
       body.pushAuthKey = options.pushAuthKey;
     }
 
     return this._request(path, "POST", creds, body);
   },
 
   /**
+   * Sends a message to other devices. The payload must conform to the push
+   * payload schema:
+   * https://github.com/mozilla/fxa-auth-server/blob/master/docs/pushpayloads.schema.json
+   *
+   * @method notifyDevices
+   * @param  sessionTokenHex
+   *         Session token obtained from signIn
+   * @param  deviceIds
+   *         Devices to send the message to
+   * @param  payload
+   *         Data to send with the message
+   * @param  [TTL=0]
+   *         Time to live of the push message, in seconds
+   * @return Promise
+   *         Resolves to an empty object:
+   *         {}
+   */
+  notifyDevices(sessionTokenHex, deviceIds, payload, TTL = 0) {
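+    // With the default TTL of 0, the push service won't queue the message
+    // for devices that aren't currently reachable.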
+    const body = {
+      to: deviceIds,
+      payload,
+      TTL
+    };
+    return this._request("/account/devices/notify", "POST",
+      deriveHawkCredentials(sessionTokenHex, "sessionToken"), body);
+  },
+
+  /**
    * Update the session or name for an existing device
    *
    * @method updateDevice
    * @param  sessionTokenHex
    *         Session token obtained from signIn
    * @param  id
    *         Device identifier
    * @param  name
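
For reference, the request notifyDevices builds is a Hawk-authenticated POST to
/account/devices/notify with a body shaped like this (IDs and payload are
illustrative; TTL 3600 matches NOTIFY_TAB_SENT_TTL_SECS below):

  {
    "to": ["device-id-1", "device-id-2"],
    "payload": {
      "version": 1,
      "command": "sync:collection_changed",
      "data": { "collections": ["clients"] }
    },
    "TTL": 3600
  }
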
--- a/services/fxaccounts/FxAccountsCommon.js
+++ b/services/fxaccounts/FxAccountsCommon.js
@@ -87,16 +87,17 @@ exports.POLL_SESSION       = 1000 * 60 *
 exports.ONLOGIN_NOTIFICATION = "fxaccounts:onlogin";
 exports.ONVERIFIED_NOTIFICATION = "fxaccounts:onverified";
 exports.ONLOGOUT_NOTIFICATION = "fxaccounts:onlogout";
 // Internal to services/fxaccounts only
 exports.ON_FXA_UPDATE_NOTIFICATION = "fxaccounts:update";
 exports.ON_DEVICE_DISCONNECTED_NOTIFICATION = "fxaccounts:device_disconnected";
 exports.ON_PASSWORD_CHANGED_NOTIFICATION = "fxaccounts:password_changed";
 exports.ON_PASSWORD_RESET_NOTIFICATION = "fxaccounts:password_reset";
+exports.ON_COLLECTION_CHANGED_NOTIFICATION = "sync:collection_changed";
 
 exports.FXA_PUSH_SCOPE_ACCOUNT_UPDATE = "chrome://fxa-device-update";
 
 exports.ON_PROFILE_CHANGE_NOTIFICATION = "fxaccounts:profilechange";
 exports.ON_ACCOUNT_STATE_CHANGE_NOTIFICATION = "fxaccounts:statechange";
 
 // UI Requests.
 exports.UI_REQUEST_SIGN_IN_FLOW = "signInFlow";
--- a/services/fxaccounts/FxAccountsPush.js
+++ b/services/fxaccounts/FxAccountsPush.js
@@ -164,16 +164,18 @@ FxAccountsPushService.prototype = {
     switch (payload.command) {
       case ON_DEVICE_DISCONNECTED_NOTIFICATION:
         return this.fxAccounts.handleDeviceDisconnection(payload.data.id);
         break;
       case ON_PASSWORD_CHANGED_NOTIFICATION:
       case ON_PASSWORD_RESET_NOTIFICATION:
         return this._onPasswordChanged();
         break;
+      case ON_COLLECTION_CHANGED_NOTIFICATION:
+        Services.obs.notifyObservers(null, ON_COLLECTION_CHANGED_NOTIFICATION,
+                                     payload.data.collections);
+        break;
       default:
         this.log.warn("FxA Push command unrecognized: " + payload.command);
     }
   },
   /**
    * Check the FxA session status after a password change/reset event.
    * If the session is invalid, reset credentials and notify listeners of
    * ON_ACCOUNT_STATE_CHANGE_NOTIFICATION that the account may have changed
--- a/services/sync/modules/constants.js
+++ b/services/sync/modules/constants.js
@@ -40,17 +40,17 @@ SYNC_KEY_HYPHENATED_LENGTH:            3
 
 NO_SYNC_NODE_INTERVAL:                 10 * 60 * 1000, // 10 minutes
 
 MAX_ERROR_COUNT_BEFORE_BACKOFF:        3,
 MAX_IGNORE_ERROR_COUNT:                5,
 
 // Backoff intervals
 MINIMUM_BACKOFF_INTERVAL:              15 * 60 * 1000,      // 15 minutes
-MAXIMUM_BACKOFF_INTERVAL:              8 * 60 * 60 * 1000,  // 8 hours 
+MAXIMUM_BACKOFF_INTERVAL:              8 * 60 * 60 * 1000,  // 8 hours
 
 // HMAC event handling timeout.
 // 10 minutes: a compromise between the multi-desktop sync interval
 // and the mobile sync interval.
 HMAC_EVENT_INTERVAL:                   600000,
 
 // How long to wait between sync attempts if the Master Password is locked.
 MASTER_PASSWORD_LOCKED_RETRY_INTERVAL: 15 * 60 * 1000,   // 15 minutes
@@ -96,16 +96,19 @@ IDLE_OBSERVER_BACK_DELAY:              1
 
 // Max number of records or bytes to upload in a single POST - we'll do multiple POSTS if either
 // MAX_UPLOAD_RECORDS or MAX_UPLOAD_BYTES is hit)
 MAX_UPLOAD_RECORDS:                    100,
 MAX_UPLOAD_BYTES:                      1024 * 1023, // just under 1MB
 MAX_HISTORY_UPLOAD:                    5000,
 MAX_HISTORY_DOWNLOAD:                  5000,
 
+// TTL, in seconds, of the push message sent to another device when sending a tab
+NOTIFY_TAB_SENT_TTL_SECS:              1 * 3600, // 1 hour
+
 // Top-level statuses:
 STATUS_OK:                             "success.status_ok",
 SYNC_FAILED:                           "error.sync.failed",
 LOGIN_FAILED:                          "error.login.failed",
 SYNC_FAILED_PARTIAL:                   "error.sync.failed_partial",
 CLIENT_NOT_CONFIGURED:                 "service.client_not_configured",
 STATUS_DISABLED:                       "service.disabled",
 MASTER_PASSWORD_LOCKED:                "service.master_password_locked",
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -16,16 +16,19 @@ Cu.import("resource://services-common/as
 Cu.import("resource://gre/modules/Log.jsm");
 Cu.import("resource://services-common/observers.js");
 Cu.import("resource://services-sync/constants.js");
 Cu.import("resource://services-sync/identity.js");
 Cu.import("resource://services-sync/record.js");
 Cu.import("resource://services-sync/resource.js");
 Cu.import("resource://services-sync/util.js");
 
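+// Engines can use fxAccounts to push-notify other devices when records
+// they own change (see the clients engine).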
+XPCOMUtils.defineLazyModuleGetter(this, "fxAccounts",
+  "resource://gre/modules/FxAccounts.jsm");
+
 /*
  * Trackers are associated with a single engine and deal with
  * listening for changes to their particular data type.
  *
  * There are two things they keep track of:
  * 1) A score, indicating how urgently the engine wants to sync
  * 2) A list of IDs for all the changed items that need to be synced
  * and updating their 'score', indicating how urgently they
@@ -1472,20 +1475,22 @@ SyncEngine.prototype = {
         let failed_ids = Object.keys(resp.obj.failed);
         counts.failed += failed_ids.length;
         if (failed_ids.length)
           this._log.debug("Records that will be uploaded again because "
                           + "the server couldn't store them: "
                           + failed_ids.join(", "));
 
         // Clear successfully uploaded objects.
-        for (let key in resp.obj.success) {
-          let id = resp.obj.success[key];
+        const succeeded_ids = Object.values(resp.obj.success);
+        for (let id of succeeded_ids) {
           delete this._modified[id];
         }
+
+        this._onRecordsWritten(succeeded_ids, failed_ids);
       }
 
       let postQueue = up.newPostQueue(this._log, handleResponse);
 
       for (let id of modifiedIDs) {
         let out;
         let ok = false;
         try {
@@ -1506,16 +1511,21 @@ SyncEngine.prototype = {
         }
         this._store._sleep(0);
       }
       postQueue.flush();
       Observers.notify("weave:engine:sync:uploaded", counts, this.name);
     }
   },
 
+  _onRecordsWritten(succeeded, failed) {
+    // Subclasses can override this method to react to records that were
+    // successfully uploaded and to records that failed to upload.
+  },
+
   // Any cleanup necessary.
   // Save the current snapshot so as to calculate changes at next sync
   _syncFinish: function () {
     this._log.trace("Finishing up sync");
     this._tracker.resetScore();
 
     let doDelete = Utils.bind2(this, function(key, val) {
       let coll = new Collection(this.engineURL, this._recordObj, this.service);
--- a/services/sync/modules/engines/clients.js
+++ b/services/sync/modules/engines/clients.js
@@ -43,17 +43,18 @@ ClientsRec.prototype = {
   _logName: "Sync.Record.Clients",
   ttl: CLIENTS_TTL
 };
 
 Utils.deferGetSet(ClientsRec,
                   "cleartext",
                   ["name", "type", "commands",
                    "version", "protocols",
-                   "formfactor", "os", "appPackage", "application", "device"]);
+                   "formfactor", "os", "appPackage", "application", "device",
+                   "fxaDeviceId"]);
 
 
 this.ClientEngine = function ClientEngine(service) {
   SyncEngine.call(this, "Clients", service);
 
   // Reset the client on every startup so that we fetch recent clients
   this._resetClient();
 }
@@ -171,16 +172,23 @@ ClientEngine.prototype = {
   getClientName(id) {
     if (id == this.localID) {
       return this.localName;
     }
     let client = this._store._remoteClients[id];
     return client ? client.name : "";
   },
 
+  getClientFxaDeviceId(id) {
+    if (this._store._remoteClients[id]) {
+      return this._store._remoteClients[id].fxaDeviceId;
+    }
+    return null;
+  },
+
   isMobile: function isMobile(id) {
     if (this._store._remoteClients[id])
       return this._store._remoteClients[id].type == DEVICE_TYPE_MOBILE;
     return false;
   },
 
   _syncStartup: function _syncStartup() {
     // Reupload new client record periodically.
@@ -232,16 +240,41 @@ ClientEngine.prototype = {
     }
   },
 
   _uploadOutgoing() {
     this._clearedCommands = null;
     SyncEngine.prototype._uploadOutgoing.call(this);
   },
 
+  _onRecordsWritten(succeeded, failed) {
+    // Notify the devices whose client records we just uploaded that the
+    // clients collection changed, so they sync it and pick up any new commands.
+    const idsToNotify = succeeded.reduce((acc, id) => {
+      if (id == this.localID) {
+        return acc;
+      }
+      const fxaDeviceId = this.getClientFxaDeviceId(id);
+      return fxaDeviceId ? acc.concat(fxaDeviceId) : acc;
+    }, []);
+    if (idsToNotify.length > 0) {
+      this._notifyCollectionChanged(idsToNotify);
+    }
+  },
+
+  _notifyCollectionChanged(ids) {
+    const message = {
+      version: 1,
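+      // Must match ON_COLLECTION_CHANGED_NOTIFICATION in FxAccountsCommon.js.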
+      command: "sync:collection_changed",
+      data: {
+        collections: ["clients"]
+      }
+    };
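+    // Fire-and-forget: the sync does not block on, or fail because of,
+    // this notification.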
+    fxAccounts.notifyDevices(ids, message, NOTIFY_TAB_SENT_TTL_SECS);
+  },
+
   _syncFinish() {
     // Record histograms for our device types, and also write them to a pref
     // so non-histogram telemetry (eg, UITelemetry) has easy access to them.
     for (let [deviceType, count] of this.deviceTypes) {
       let hid;
       let prefName = this.name + ".devices.";
       switch (deviceType) {
         case "desktop":
--- a/services/sync/modules/service.js
+++ b/services/sync/modules/service.js
@@ -354,16 +354,17 @@ Sync11Service.prototype = {
 
     if (!this._checkCrypto()) {
       this.enabled = false;
       this._log.info("Could not load the Weave crypto component. Disabling " +
                       "Weave, since it will not work correctly.");
     }
 
     Svc.Obs.add("weave:service:setup-complete", this);
+    Svc.Obs.add("sync:collection_changed", this); // ON_COLLECTION_CHANGED_NOTIFICATION from FxAccountsCommon.js
     Svc.Prefs.observe("engine.", this);
 
     this.scheduler = new SyncScheduler(this);
 
     if (!this.enabled) {
       this._log.info("Firefox Sync disabled.");
     }
 
@@ -480,16 +481,23 @@ Sync11Service.prototype = {
 
   QueryInterface: XPCOMUtils.generateQI([Ci.nsIObserver,
                                          Ci.nsISupportsWeakReference]),
 
   // nsIObserver
 
   observe: function observe(subject, topic, data) {
     switch (topic) {
+      // Ideally this observer should live in the SyncScheduler, but that
+      // requires the scheduler to know about the sync-specific engines.
+      // We should move it there once it does.
+      case "sync:collection_changed":
+        if (data.includes("clients")) {
+          this.sync([]); // an empty engine list syncs only the clients collection
+        }
+        break;
       case "weave:service:setup-complete":
         let status = this._checkSetup();
         if (status != STATUS_DISABLED && status != CLIENT_NOT_CONFIGURED)
             Svc.Obs.notify("weave:engine:start-tracking");
         break;
       case "nsPref:changed":
         if (this._ignorePrefObserver)
           return;
--- a/services/sync/tests/unit/test_clients_engine.js
+++ b/services/sync/tests/unit/test_clients_engine.js
@@ -26,20 +26,20 @@ function check_record_version(user, id) 
     rec.collection = "clients";
     rec.ciphertext = payload.ciphertext;
     rec.hmac = payload.hmac;
     rec.IV = payload.IV;
 
     let cleartext = rec.decrypt(Service.collectionKeys.keyForCollection("clients"));
 
     _("Payload is " + JSON.stringify(cleartext));
-    do_check_eq(Services.appinfo.version, cleartext.version);
-    do_check_eq(2, cleartext.protocols.length);
-    do_check_eq("1.1", cleartext.protocols[0]);
-    do_check_eq("1.5", cleartext.protocols[1]);
+    equal(Services.appinfo.version, cleartext.version);
+    equal(2, cleartext.protocols.length);
+    equal("1.1", cleartext.protocols[0]);
+    equal("1.5", cleartext.protocols[1]);
 }
 
 add_test(function test_bad_hmac() {
   _("Ensure that Clients engine deletes corrupt records.");
   let contents = {
     meta: {global: {engines: {clients: {version: engine.version,
                                         syncID: engine.syncID}}}},
     clients: {},
@@ -59,62 +59,62 @@ add_test(function test_bad_hmac() {
   let server = serverForUsers({"foo": "password"}, contents, callback);
   let user   = server.user("foo");
 
   function check_clients_count(expectedCount) {
     let stack = Components.stack.caller;
     let coll  = user.collection("clients");
 
     // Treat a non-existent collection as empty.
-    do_check_eq(expectedCount, coll ? coll.count() : 0, stack);
+    equal(expectedCount, coll ? coll.count() : 0, stack);
   }
 
   function check_client_deleted(id) {
     let coll = user.collection("clients");
     let wbo  = coll.wbo(id);
     return !wbo || !wbo.payload;
   }
 
   function uploadNewKeys() {
     generateNewKeys(Service.collectionKeys);
     let serverKeys = Service.collectionKeys.asWBO("crypto", "keys");
     serverKeys.encrypt(Service.identity.syncKeyBundle);
-    do_check_true(serverKeys.upload(Service.resource(Service.cryptoKeysURL)).success);
+    ok(serverKeys.upload(Service.resource(Service.cryptoKeysURL)).success);
   }
 
   try {
     ensureLegacyIdentityManager();
     let passphrase     = "abcdeabcdeabcdeabcdeabcdea";
     Service.serverURL  = server.baseURI;
     Service.login("foo", "ilovejane", passphrase);
 
     generateNewKeys(Service.collectionKeys);
 
     _("First sync, client record is uploaded");
-    do_check_eq(engine.lastRecordUpload, 0);
+    equal(engine.lastRecordUpload, 0);
     check_clients_count(0);
     engine._sync();
     check_clients_count(1);
-    do_check_true(engine.lastRecordUpload > 0);
+    ok(engine.lastRecordUpload > 0);
 
     // Our uploaded record has a version.
     check_record_version(user, engine.localID);
 
     // Initial setup can wipe the server, so clean up.
     deletedCollections = [];
     deletedItems       = [];
 
     _("Change our keys and our client ID, reupload keys.");
     let oldLocalID  = engine.localID;     // Preserve to test for deletion!
     engine.localID = Utils.makeGUID();
     engine.resetClient();
     generateNewKeys(Service.collectionKeys);
     let serverKeys = Service.collectionKeys.asWBO("crypto", "keys");
     serverKeys.encrypt(Service.identity.syncKeyBundle);
-    do_check_true(serverKeys.upload(Service.resource(Service.cryptoKeysURL)).success);
+    ok(serverKeys.upload(Service.resource(Service.cryptoKeysURL)).success);
 
     _("Sync.");
     engine._sync();
 
     _("Old record " + oldLocalID + " was deleted, new one uploaded.");
     check_clients_count(1);
     check_client_deleted(oldLocalID);
 
@@ -125,18 +125,18 @@ add_test(function test_bad_hmac() {
     engine.resetClient();
     generateNewKeys(Service.collectionKeys);
     deletedCollections = [];
     deletedItems       = [];
     check_clients_count(1);
     engine._sync();
 
     _("Old record was not deleted, new one uploaded.");
-    do_check_eq(deletedCollections.length, 0);
-    do_check_eq(deletedItems.length, 0);
+    equal(deletedCollections.length, 0);
+    equal(deletedItems.length, 0);
     check_clients_count(2);
 
     _("Now try the scenario where our keys are wrong *and* there's a bad record.");
     // Clean up and start fresh.
     user.collection("clients")._wbos = {};
     Service.lastHMACEvent = 0;
     engine.localID = Utils.makeGUID();
     engine.resetClient();
@@ -157,41 +157,41 @@ add_test(function test_bad_hmac() {
     // as the object on the server. We'll download the new keys and also delete
     // the bad client record.
     oldLocalID  = engine.localID;         // Preserve to test for deletion!
     engine.localID = Utils.makeGUID();
     engine.resetClient();
     generateNewKeys(Service.collectionKeys);
     let oldKey = Service.collectionKeys.keyForCollection();
 
-    do_check_eq(deletedCollections.length, 0);
-    do_check_eq(deletedItems.length, 0);
+    equal(deletedCollections.length, 0);
+    equal(deletedItems.length, 0);
     engine._sync();
-    do_check_eq(deletedItems.length, 1);
+    equal(deletedItems.length, 1);
     check_client_deleted(oldLocalID);
     check_clients_count(1);
     let newKey = Service.collectionKeys.keyForCollection();
-    do_check_false(oldKey.equals(newKey));
+    ok(!oldKey.equals(newKey));
 
   } finally {
     Svc.Prefs.resetBranch("");
     Service.recordManager.clearCache();
     server.stop(run_next_test);
   }
 });
 
 add_test(function test_properties() {
   _("Test lastRecordUpload property");
   try {
-    do_check_eq(Svc.Prefs.get("clients.lastRecordUpload"), undefined);
-    do_check_eq(engine.lastRecordUpload, 0);
+    equal(Svc.Prefs.get("clients.lastRecordUpload"), undefined);
+    equal(engine.lastRecordUpload, 0);
 
     let now = Date.now();
     engine.lastRecordUpload = now / 1000;
-    do_check_eq(engine.lastRecordUpload, Math.floor(now / 1000));
+    equal(engine.lastRecordUpload, Math.floor(now / 1000));
   } finally {
     Svc.Prefs.resetBranch("");
     run_next_test();
   }
 });
 
 add_test(function test_full_sync() {
   _("Ensure that Clients engine fetches all records for each sync.");
@@ -283,40 +283,40 @@ add_test(function test_sync() {
 
   function clientWBO() {
     return user.collection("clients").wbo(engine.localID);
   }
 
   try {
 
     _("First sync. Client record is uploaded.");
-    do_check_eq(clientWBO(), undefined);
-    do_check_eq(engine.lastRecordUpload, 0);
+    equal(clientWBO(), undefined);
+    equal(engine.lastRecordUpload, 0);
     engine._sync();
-    do_check_true(!!clientWBO().payload);
-    do_check_true(engine.lastRecordUpload > 0);
+    ok(!!clientWBO().payload);
+    ok(engine.lastRecordUpload > 0);
 
     _("Let's time travel more than a week back, new record should've been uploaded.");
     engine.lastRecordUpload -= MORE_THAN_CLIENTS_TTL_REFRESH;
     let lastweek = engine.lastRecordUpload;
     clientWBO().payload = undefined;
     engine._sync();
-    do_check_true(!!clientWBO().payload);
-    do_check_true(engine.lastRecordUpload > lastweek);
+    ok(!!clientWBO().payload);
+    ok(engine.lastRecordUpload > lastweek);
 
     _("Remove client record.");
     engine.removeClientData();
-    do_check_eq(clientWBO().payload, undefined);
+    equal(clientWBO().payload, undefined);
 
     _("Time travel one day back, no record uploaded.");
     engine.lastRecordUpload -= LESS_THAN_CLIENTS_TTL_REFRESH;
     let yesterday = engine.lastRecordUpload;
     engine._sync();
-    do_check_eq(clientWBO().payload, undefined);
-    do_check_eq(engine.lastRecordUpload, yesterday);
+    equal(clientWBO().payload, undefined);
+    equal(engine.lastRecordUpload, yesterday);
 
   } finally {
     Svc.Prefs.resetBranch("");
     Service.recordManager.clearCache();
     server.stop(run_next_test);
   }
 });
 
@@ -331,26 +331,26 @@ add_test(function test_client_name_chang
   Svc.Obs.notify("weave:engine:start-tracking");
   _("initial name: " + initialName);
 
   // Tracker already has data, so clear it.
   tracker.clearChangedIDs();
 
   let initialScore = tracker.score;
 
-  do_check_eq(Object.keys(tracker.changedIDs).length, 0);
+  equal(Object.keys(tracker.changedIDs).length, 0);
 
   Svc.Prefs.set("client.name", "new name");
 
   _("new name: " + engine.localName);
-  do_check_neq(initialName, engine.localName);
-  do_check_eq(Object.keys(tracker.changedIDs).length, 1);
-  do_check_true(engine.localID in tracker.changedIDs);
-  do_check_true(tracker.score > initialScore);
-  do_check_true(tracker.score >= SCORE_INCREMENT_XLARGE);
+  notEqual(initialName, engine.localName);
+  equal(Object.keys(tracker.changedIDs).length, 1);
+  ok(engine.localID in tracker.changedIDs);
+  ok(tracker.score > initialScore);
+  ok(tracker.score >= SCORE_INCREMENT_XLARGE);
 
   Svc.Obs.notify("weave:engine:stop-tracking");
 
   run_next_test();
 });
 
 add_test(function test_send_command() {
   _("Verifies _sendCommandToClient puts commands in the outbound queue.");
@@ -364,25 +364,25 @@ add_test(function test_send_command() {
   let remoteRecord = store.createRecord(remoteId, "clients");
 
   let action = "testCommand";
   let args = ["foo", "bar"];
 
   engine._sendCommandToClient(action, args, remoteId);
 
   let newRecord = store._remoteClients[remoteId];
-  do_check_neq(newRecord, undefined);
-  do_check_eq(newRecord.commands.length, 1);
+  notEqual(newRecord, undefined);
+  equal(newRecord.commands.length, 1);
 
   let command = newRecord.commands[0];
-  do_check_eq(command.command, action);
-  do_check_eq(command.args.length, 2);
-  do_check_eq(command.args, args);
+  equal(command.command, action);
+  equal(command.args.length, 2);
+  equal(command.args, args);
 
-  do_check_neq(tracker.changedIDs[remoteId], undefined);
+  notEqual(tracker.changedIDs[remoteId], undefined);
 
   run_next_test();
 });
 
 add_test(function test_command_validation() {
   _("Verifies that command validation works properly.");
 
   let store = engine._store;
@@ -406,34 +406,34 @@ add_test(function test_command_validatio
     let rec = new ClientsRec("clients", remoteId);
 
     store.create(rec);
     store.createRecord(remoteId, "clients");
 
     engine.sendCommand(action, args, remoteId);
 
     let newRecord = store._remoteClients[remoteId];
-    do_check_neq(newRecord, undefined);
+    notEqual(newRecord, undefined);
 
     if (expectedResult) {
       _("Ensuring command is sent: " + action);
-      do_check_eq(newRecord.commands.length, 1);
+      equal(newRecord.commands.length, 1);
 
       let command = newRecord.commands[0];
-      do_check_eq(command.command, action);
-      do_check_eq(command.args, args);
+      equal(command.command, action);
+      equal(command.args, args);
 
-      do_check_neq(engine._tracker, undefined);
-      do_check_neq(engine._tracker.changedIDs[remoteId], undefined);
+      notEqual(engine._tracker, undefined);
+      notEqual(engine._tracker.changedIDs[remoteId], undefined);
     } else {
       _("Ensuring command is scrubbed: " + action);
-      do_check_eq(newRecord.commands, undefined);
+      equal(newRecord.commands, undefined);
 
       if (store._tracker) {
-        do_check_eq(engine._tracker[remoteId], undefined);
+        equal(engine._tracker[remoteId], undefined);
       }
     }
 
   }
   run_next_test();
 });
 
 add_test(function test_command_duplication() {
@@ -447,46 +447,46 @@ add_test(function test_command_duplicati
 
   let action = "resetAll";
   let args = [];
 
   engine.sendCommand(action, args, remoteId);
   engine.sendCommand(action, args, remoteId);
 
   let newRecord = store._remoteClients[remoteId];
-  do_check_eq(newRecord.commands.length, 1);
+  equal(newRecord.commands.length, 1);
 
   _("Check variant args length");
   newRecord.commands = [];
 
   action = "resetEngine";
   engine.sendCommand(action, [{ x: "foo" }], remoteId);
   engine.sendCommand(action, [{ x: "bar" }], remoteId);
 
   _("Make sure we spot a real dupe argument.");
   engine.sendCommand(action, [{ x: "bar" }], remoteId);
 
-  do_check_eq(newRecord.commands.length, 2);
+  equal(newRecord.commands.length, 2);
 
   run_next_test();
 });
 
 add_test(function test_command_invalid_client() {
   _("Ensures invalid client IDs are caught");
 
   let id = Utils.makeGUID();
   let error;
 
   try {
     engine.sendCommand("wipeAll", [], id);
   } catch (ex) {
     error = ex;
   }
 
-  do_check_eq(error.message.indexOf("Unknown remote client ID: "), 0);
+  equal(error.message.indexOf("Unknown remote client ID: "), 0);
 
   run_next_test();
 });
 
 add_test(function test_process_incoming_commands() {
   _("Ensures local commands are executed");
 
   engine.localCommands = [{ command: "logout", args: [] }];
@@ -501,17 +501,17 @@ add_test(function test_process_incoming_
     engine._resetClient();
 
     run_next_test();
   };
 
   Svc.Obs.add(ev, handler);
 
   // logout command causes processIncomingCommands to return explicit false.
-  do_check_false(engine.processIncomingCommands());
+  ok(!engine.processIncomingCommands());
 });
 
 add_test(function test_filter_duplicate_names() {
   _("Ensure that we exclude clients with identical names that haven't synced in a week.");
 
   let now = Date.now() / 1000;
   let contents = {
     meta: {global: {engines: {clients: {version: engine.version,
@@ -698,41 +698,41 @@ add_test(function test_command_sync() {
   }), Date.now() / 1000));
 
   try {
     _("Syncing.");
     engine._sync();
 
     _("Checking remote record was downloaded.");
     let clientRecord = engine._store._remoteClients[remoteId];
-    do_check_neq(clientRecord, undefined);
-    do_check_eq(clientRecord.commands.length, 0);
+    notEqual(clientRecord, undefined);
+    equal(clientRecord.commands.length, 0);
 
     _("Send a command to the remote client.");
     engine.sendCommand("wipeAll", []);
-    do_check_eq(clientRecord.commands.length, 1);
+    equal(clientRecord.commands.length, 1);
     engine._sync();
 
     _("Checking record was uploaded.");
-    do_check_neq(clientWBO(engine.localID).payload, undefined);
-    do_check_true(engine.lastRecordUpload > 0);
+    notEqual(clientWBO(engine.localID).payload, undefined);
+    ok(engine.lastRecordUpload > 0);
 
-    do_check_neq(clientWBO(remoteId).payload, undefined);
+    notEqual(clientWBO(remoteId).payload, undefined);
 
     Svc.Prefs.set("client.GUID", remoteId);
     engine._resetClient();
-    do_check_eq(engine.localID, remoteId);
+    equal(engine.localID, remoteId);
     _("Performing sync on resetted client.");
     engine._sync();
-    do_check_neq(engine.localCommands, undefined);
-    do_check_eq(engine.localCommands.length, 1);
+    notEqual(engine.localCommands, undefined);
+    equal(engine.localCommands.length, 1);
 
     let command = engine.localCommands[0];
-    do_check_eq(command.command, "wipeAll");
-    do_check_eq(command.args.length, 0);
+    equal(command.command, "wipeAll");
+    equal(command.args.length, 0);
 
   } finally {
     Svc.Prefs.resetBranch("");
     Service.recordManager.clearCache();
 
     try {
       let collection = server.getCollection("foo", "clients");
       collection.remove(remoteId);
@@ -758,40 +758,40 @@ add_test(function test_send_uri_to_clien
   let initialScore = tracker.score;
 
   let uri = "http://www.mozilla.org/";
   let title = "Title of the Page";
   engine.sendURIToClientForDisplay(uri, remoteId, title);
 
   let newRecord = store._remoteClients[remoteId];
 
-  do_check_neq(newRecord, undefined);
-  do_check_eq(newRecord.commands.length, 1);
+  notEqual(newRecord, undefined);
+  equal(newRecord.commands.length, 1);
 
   let command = newRecord.commands[0];
-  do_check_eq(command.command, "displayURI");
-  do_check_eq(command.args.length, 3);
-  do_check_eq(command.args[0], uri);
-  do_check_eq(command.args[1], engine.localID);
-  do_check_eq(command.args[2], title);
+  equal(command.command, "displayURI");
+  equal(command.args.length, 3);
+  equal(command.args[0], uri);
+  equal(command.args[1], engine.localID);
+  equal(command.args[2], title);
 
-  do_check_true(tracker.score > initialScore);
-  do_check_true(tracker.score - initialScore >= SCORE_INCREMENT_XLARGE);
+  ok(tracker.score > initialScore);
+  ok(tracker.score - initialScore >= SCORE_INCREMENT_XLARGE);
 
   _("Ensure unknown client IDs result in exception.");
   let unknownId = Utils.makeGUID();
   let error;
 
   try {
     engine.sendURIToClientForDisplay(uri, unknownId);
   } catch (ex) {
     error = ex;
   }
 
-  do_check_eq(error.message.indexOf("Unknown remote client ID: "), 0);
+  equal(error.message.indexOf("Unknown remote client ID: "), 0);
 
   Svc.Prefs.resetBranch("");
   Service.recordManager.clearCache();
   engine._resetClient();
 
   run_next_test();
 });
 
@@ -814,52 +814,52 @@ add_test(function test_receive_display_u
 
   // Received 'displayURI' command should result in the topic defined below
   // being called.
   let ev = "weave:engine:clients:display-uris";
 
   let handler = function(subject, data) {
     Svc.Obs.remove(ev, handler);
 
-    do_check_eq(subject[0].uri, uri);
-    do_check_eq(subject[0].clientId, remoteId);
-    do_check_eq(subject[0].title, title);
-    do_check_eq(data, null);
+    equal(subject[0].uri, uri);
+    equal(subject[0].clientId, remoteId);
+    equal(subject[0].title, title);
+    equal(data, null);
 
     run_next_test();
   };
 
   Svc.Obs.add(ev, handler);
 
-  do_check_true(engine.processIncomingCommands());
+  ok(engine.processIncomingCommands());
 
   Svc.Prefs.resetBranch("");
   Service.recordManager.clearCache();
   engine._resetClient();
 });
 
 add_test(function test_optional_client_fields() {
   _("Ensure that we produce records with the fields added in Bug 1097222.");
 
   const SUPPORTED_PROTOCOL_VERSIONS = ["1.1", "1.5"];
   let local = engine._store.createRecord(engine.localID, "clients");
-  do_check_eq(local.name, engine.localName);
-  do_check_eq(local.type, engine.localType);
-  do_check_eq(local.version, Services.appinfo.version);
-  do_check_array_eq(local.protocols, SUPPORTED_PROTOCOL_VERSIONS);
+  equal(local.name, engine.localName);
+  equal(local.type, engine.localType);
+  equal(local.version, Services.appinfo.version);
+  deepEqual(local.protocols, SUPPORTED_PROTOCOL_VERSIONS);
 
   // Optional fields.
   // Make sure they're what they ought to be...
-  do_check_eq(local.os, Services.appinfo.OS);
-  do_check_eq(local.appPackage, Services.appinfo.ID);
+  equal(local.os, Services.appinfo.OS);
+  equal(local.appPackage, Services.appinfo.ID);
 
   // ... and also that they're non-empty.
-  do_check_true(!!local.os);
-  do_check_true(!!local.appPackage);
-  do_check_true(!!local.application);
+  ok(!!local.os);
+  ok(!!local.appPackage);
+  ok(!!local.application);
 
   // We don't currently populate device or formfactor.
   // See Bug 1100722, Bug 1100723.
 
   engine._resetClient();
   run_next_test();
 });
 
@@ -1065,13 +1065,85 @@ add_test(function test_send_uri_ack() {
     try {
       server.deleteCollections("foo");
     } finally {
       server.stop(run_next_test);
     }
   }
 });
 
+add_test(function test_notify_collection_changed() {
+  _("Ensure remote devices are notified when we write their client records.");
+
+  engine._store.wipe();
+  generateNewKeys(Service.collectionKeys);
+
+  let contents = {
+    meta: {global: {engines: {clients: {version: engine.version,
+                                        syncID: engine.syncID}}}},
+    clients: {},
+    crypto: {}
+  };
+  let server    = serverForUsers({"foo": "password"}, contents);
+  new SyncTestingInfrastructure(server.server);
+
+  let collection = server.getCollection("foo", "clients");
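+  // SyncTestingInfrastructure stubs Utils.makeGUID, so these IDs are
+  // deterministic ("fake-guid-00" and "fake-guid-01"); the deepEqual
+  // assertion below relies on that.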
+  let remoteId   = Utils.makeGUID();
+  let remoteId2  = Utils.makeGUID();
+
+  _("Create remote client record 1");
+  server.insertWBO("foo", "clients", new ServerWBO(remoteId, encryptPayload({
+    id: remoteId,
+    name: "Remote client",
+    type: "desktop",
+    commands: [],
+    version: "48",
+    protocols: ["1.5"]
+  }), Date.now() / 1000));
+
+  _("Create remote client record 2");
+  server.insertWBO("foo", "clients", new ServerWBO(remoteId2, encryptPayload({
+    id: remoteId2,
+    name: "Remote client 2",
+    type: "mobile",
+    commands: [],
+    version: "48",
+    protocols: ["1.5"]
+  }), Date.now() / 1000));
+
+  try {
+    equal(collection.count(), 2, "2 remote records written");
+    engine._sync();
+    equal(collection.count(), 3, "3 remote records written (+1 for the synced local record)");
+
+    let notifiedIds;
+    engine.sendCommand("wipeAll", []);
+    engine._tracker.addChangedID(engine.localID);
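+    // Stub the FxA device ID lookup and capture the notified IDs instead of
+    // hitting the network.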
+    engine.getClientFxaDeviceId = (id) => "fxa-" + id;
+    engine._notifyCollectionChanged = (ids) => (notifiedIds = ids);
+    _("Syncing.");
+    engine._sync();
+    deepEqual(notifiedIds, ["fxa-fake-guid-00", "fxa-fake-guid-01"]);
+    ok(!notifiedIds.includes(engine.getClientFxaDeviceId(engine.localID)),
+      "We never notify the local device");
+
+  } finally {
+    Svc.Prefs.resetBranch("");
+    Service.recordManager.clearCache();
+
+    try {
+      server.deleteCollections("foo");
+    } finally {
+      server.stop(run_next_test);
+    }
+  }
+});
+
 function run_test() {
   initTestLogging("Trace");
   Log.repository.getLogger("Sync.Engine.Clients").level = Log.Level.Trace;
   run_next_test();
 }