Bug 1262021 - Ensure remote commands are applied once per sync. r=markh
authorKit Cambridge <kcambridge@mozilla.com>
Mon, 04 Apr 2016 15:39:37 -0700
changeset 291860 df3d3e4187022772c3dfa8ffb591abb90632f0bb
parent 291859 43f067177f2d15a3e3788b98cc18bd35441eb59e
child 291861 95f6c8a20a2a2f0c3bf651605be0d2ce70b030e8
push id74700
push usercbook@mozilla.com
push dateWed, 06 Apr 2016 10:06:38 +0000
treeherdermozilla-inbound@49d808d13f4f [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersmarkh
bugs1262021
milestone48.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1262021 - Ensure remote commands are applied once per sync. r=markh MozReview-Commit-ID: 703efPknMrr
services/sync/modules/engines/clients.js
services/sync/tests/unit/test_clients_engine.js
--- a/services/sync/modules/engines/clients.js
+++ b/services/sync/modules/engines/clients.js
@@ -21,16 +21,19 @@ XPCOMUtils.defineLazyModuleGetter(this, 
   "resource://gre/modules/FxAccounts.jsm");
 
 const CLIENTS_TTL = 1814400; // 21 days
 const CLIENTS_TTL_REFRESH = 604800; // 7 days
 
 const SUPPORTED_PROTOCOL_VERSIONS = ["1.1", "1.5"];
 
 function hasDupeCommand(commands, action) {
+  if (!commands) {
+    return false;
+  }
   return commands.some(other => other.command == action.command &&
     Utils.deepEquals(other.args, action.args));
 }
 
 this.ClientsRec = function ClientsRec(collection, id) {
   CryptoWrapper.call(this, collection, id);
 }
 ClientsRec.prototype = {
@@ -183,16 +186,21 @@ ClientEngine.prototype = {
       for (let staleID of staleIDs) {
         this._removeRemoteClient(staleID);
       }
     } finally {
       this._incomingClients = null;
     }
   },
 
+  _uploadOutgoing() {
+    this._clearedCommands = null;
+    SyncEngine.prototype._uploadOutgoing.call(this);
+  },
+
   _syncFinish() {
     // Record telemetry for our device types.
     for (let [deviceType, count] of this.deviceTypes) {
       let hid;
       switch (deviceType) {
         case "desktop":
           hid = "WEAVE_DEVICE_COUNT_DESKTOP";
           break;
@@ -228,16 +236,17 @@ ClientEngine.prototype = {
 
   // Treat reset the same as wiping for locally cached clients
   _resetClient() {
     this._wipeClient();
   },
 
   _wipeClient: function _wipeClient() {
     SyncEngine.prototype._resetClient.call(this);
+    delete this.localCommands;
     this._store.wipe();
   },
 
   removeClientData: function removeClientData() {
     let res = this.service.resource(this.engineURL + "/" + this.localID);
     res.delete();
   },
 
@@ -270,16 +279,22 @@ ClientEngine.prototype = {
     logout:      { args: 0, desc: "Log out client" },
     displayURI:  { args: 3, desc: "Instruct a client to display a URI" },
   },
 
   /**
    * Remove any commands for the local client and mark it for upload.
    */
   clearCommands: function clearCommands() {
+    if (!this._clearedCommands) {
+      this._clearedCommands = [];
+    }
+    // Keep track of cleared local commands until the next sync, so that we
+    // don't reupload them.
+    this._clearedCommands = this._clearedCommands.concat(this.localCommands);
     delete this.localCommands;
     this._tracker.addChangedID(this.localID);
   },
 
   /**
    * Sends a command+args pair to a specific client.
    *
    * @param command Command string
@@ -469,31 +484,66 @@ function ClientStore(name, engine) {
 ClientStore.prototype = {
   __proto__: Store.prototype,
 
   create(record) {
     this.update(record)
   },
 
   update: function update(record) {
+    if (record.id == this.engine.localID) {
+      this._updateLocalRecord(record);
+    } else {
+      this._updateRemoteRecord(record);
+    }
+  },
+
+  _updateLocalRecord(record) {
+    // Local changes for our client means we're clearing commands or
+    // uploading a new record.
+    let incomingCommands = record.commands;
+    if (incomingCommands) {
+      // Filter out incoming commands that we've cleared.
+      incomingCommands = incomingCommands.filter(action =>
+        !hasDupeCommand(this.engine._clearedCommands, action));
+      if (!incomingCommands.length) {
+        // Use `undefined` instead of `null` to avoid creating a null field
+        // in the uploaded record.
+        incomingCommands = undefined;
+      }
+    }
     // Only grab commands from the server; local name/type always wins
-    if (record.id == this.engine.localID)
-      this.engine.localCommands = record.commands;
-    else {
-      let currentRecord = this._remoteClients[record.id];
-      if (currentRecord && currentRecord.commands) {
-        // Merge commands.
-        for (let action of currentRecord.commands) {
-          if (!hasDupeCommand(record.cleartext.commands, action)) {
-            record.cleartext.commands.push(action);
-          }
-        }
+    this.engine.localCommands = incomingCommands;
+  },
+
+  _updateRemoteRecord(record) {
+    let currentRecord = this._remoteClients[record.id];
+    if (!currentRecord || !currentRecord.commands ||
+        !(record.id in this.engine._modified)) {
+
+      // If we have a new incoming record or no outgoing commands, use the
+      // full incoming record from the server.
+      this._remoteClients[record.id] = record.cleartext;
+      return;
+    }
+
+    // Otherwise, we have outgoing commands for a client, so merge them
+    // with the commands that we downloaded from the server.
+    for (let action of currentRecord.commands) {
+      if (hasDupeCommand(record.cleartext.commands, action)) {
+        // Ignore commands the server already knows about.
+        continue;
       }
-      this._remoteClients[record.id] = record.cleartext;
+      if (record.cleartext.commands) {
+        record.cleartext.commands.push(action);
+      } else {
+        record.cleartext.commands = [action];
+      }
     }
+    this._remoteClients[record.id] = record.cleartext;
   },
 
   createRecord: function createRecord(id, collection) {
     let record = new ClientsRec(collection, id);
 
     // Package the individual components into a record for the local client
     if (id == this.engine.localID) {
       let cb = Async.makeSpinningCallback();
--- a/services/sync/tests/unit/test_clients_engine.js
+++ b/services/sync/tests/unit/test_clients_engine.js
@@ -622,16 +622,20 @@ add_test(function test_send_uri_to_clien
   try {
     engine.sendURIToClientForDisplay(uri, unknownId);
   } catch (ex) {
     error = ex;
   }
 
   do_check_eq(error.message.indexOf("Unknown remote client ID: "), 0);
 
+  Svc.Prefs.resetBranch("");
+  Service.recordManager.clearCache();
+  engine._resetClient();
+
   run_next_test();
 });
 
 add_test(function test_receive_display_uri() {
   _("Ensure processing of received 'displayURI' commands works.");
 
   // We don't set up WBOs and perform syncing because other tests verify
   // the command API works as advertised. This saves us a little work.
@@ -661,18 +665,19 @@ add_test(function test_receive_display_u
 
     run_next_test();
   };
 
   Svc.Obs.add(ev, handler);
 
   do_check_true(engine.processIncomingCommands());
 
+  Svc.Prefs.resetBranch("");
+  Service.recordManager.clearCache();
   engine._resetClient();
-  run_next_test();
 });
 
 add_test(function test_optional_client_fields() {
   _("Ensure that we produce records with the fields added in Bug 1097222.");
 
   const SUPPORTED_PROTOCOL_VERSIONS = ["1.1", "1.5"];
   let local = engine._store.createRecord(engine.localID, "clients");
   do_check_eq(local.name, engine.localName);
@@ -688,16 +693,17 @@ add_test(function test_optional_client_f
   // ... and also that they're non-empty.
   do_check_true(!!local.os);
   do_check_true(!!local.appPackage);
   do_check_true(!!local.application);
 
   // We don't currently populate device or formfactor.
   // See Bug 1100722, Bug 1100723.
 
+  engine._resetClient();
   run_next_test();
 });
 
 add_test(function test_merge_commands() {
   _("Verifies local commands for remote clients are merged with the server's");
 
   let now = Date.now() / 1000;
   let contents = {
@@ -839,13 +845,72 @@ add_test(function test_deleted_commands(
     try {
       server.deleteCollections("foo");
     } finally {
       server.stop(run_next_test);
     }
   }
 });
 
+add_test(function test_send_uri_ack() {
+  _("Ensure a sent URI is deleted when the client syncs");
+
+  let now = Date.now() / 1000;
+  let contents = {
+    meta: {global: {engines: {clients: {version: engine.version,
+                                        syncID: engine.syncID}}}},
+    clients: {},
+    crypto: {}
+  };
+  let server = serverForUsers({"foo": "password"}, contents);
+  let user   = server.user("foo");
+
+  new SyncTestingInfrastructure(server.server);
+  generateNewKeys(Service.collectionKeys);
+
+  try {
+    let fakeSenderID = Utils.makeGUID();
+
+    _("Initial sync for empty clients collection");
+    engine._sync();
+    let collection = server.getCollection("foo", "clients");
+    let ourPayload = JSON.parse(JSON.parse(collection.payload(engine.localID)).ciphertext);
+    ok(ourPayload, "Should upload our client record");
+
+    _("Send a URL to the device on the server");
+    ourPayload.commands = [{
+      command: "displayURI",
+      args: ["https://example.com", fakeSenderID, "Yak Herders Anonymous"],
+    }];
+    server.insertWBO("foo", "clients", new ServerWBO(engine.localID, encryptPayload(ourPayload), now));
+
+    _("Sync again");
+    engine._sync();
+    deepEqual(engine.localCommands, [{
+      command: "displayURI",
+      args: ["https://example.com", fakeSenderID, "Yak Herders Anonymous"],
+    }], "Should receive incoming URI");
+    ok(engine.processIncomingCommands(), "Should process incoming commands");
+    ok(!engine.localCommands, "Should clear commands after processing");
+
+    _("Check that the command was removed on the server");
+    engine._sync();
+    ourPayload = JSON.parse(JSON.parse(collection.payload(engine.localID)).ciphertext);
+    ok(ourPayload, "Should upload the synced client record");
+    ok(!ourPayload.commands, "Should not reupload cleared commands");
+  } finally {
+    Svc.Prefs.resetBranch("");
+    Service.recordManager.clearCache();
+    engine._resetClient();
+
+    try {
+      server.deleteCollections("foo");
+    } finally {
+      server.stop(run_next_test);
+    }
+  }
+});
+
 function run_test() {
   initTestLogging("Trace");
   Log.repository.getLogger("Sync.Engine.Clients").level = Log.Level.Trace;
   run_next_test();
 }