Bug 1250531 - Unconditionally sync the clients collection. r=markh
authorKit Cambridge <kcambridge@mozilla.com>
Fri, 01 Apr 2016 10:55:10 -0700
changeset 291441 c7b00e87d6443d3e232fe4042f7fd7f111c6ddbf
parent 291440 e1a56a0d9db2880d37b650989bbeeb780a30dc60
child 291442 1d4afa71e193b7c882a4925bc13797a1b70ea5ab
push id19656
push usergwagner@mozilla.com
push dateMon, 04 Apr 2016 13:43:23 +0000
treeherderb2g-inbound@e99061fde28a [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersmarkh
bugs1250531
milestone48.0a1
Bug 1250531 - Unconditionally sync the clients collection. r=markh MozReview-Commit-ID: 4RHolqewNmx
services/sync/modules/engines/clients.js
services/sync/tests/unit/test_clients_engine.js
services/sync/tests/unit/test_interval_triggers.js
services/sync/tests/unit/test_syncscheduler.js
--- a/services/sync/modules/engines/clients.js
+++ b/services/sync/modules/engines/clients.js
@@ -20,16 +20,21 @@ Cu.import("resource://gre/modules/Servic
 XPCOMUtils.defineLazyModuleGetter(this, "fxAccounts",
   "resource://gre/modules/FxAccounts.jsm");
 
 const CLIENTS_TTL = 1814400; // 21 days
 const CLIENTS_TTL_REFRESH = 604800; // 7 days
 
 const SUPPORTED_PROTOCOL_VERSIONS = ["1.1", "1.5"];
 
+function hasDupeCommand(commands, action) {
+  return commands.some(other => other.command == action.command &&
+    Utils.deepEquals(other.args, action.args));
+}
+
 this.ClientsRec = function ClientsRec(collection, id) {
   CryptoWrapper.call(this, collection, id);
 }
 ClientsRec.prototype = {
   __proto__: CryptoWrapper.prototype,
   _logName: "Sync.Record.Clients",
   ttl: CLIENTS_TTL
 };
@@ -145,16 +150,37 @@ ClientEngine.prototype = {
     // Reupload new client record periodically.
     if (Date.now() / 1000 - this.lastRecordUpload > CLIENTS_TTL_REFRESH) {
       this._tracker.addChangedID(this.localID);
       this.lastRecordUpload = Date.now() / 1000;
     }
     SyncEngine.prototype._syncStartup.call(this);
   },
 
+  _processIncoming() {
+    // Fetch all records from the server.
+    this.lastSync = 0;
+    this._incomingClients = [];
+    try {
+      SyncEngine.prototype._processIncoming.call(this);
+      // Since clients are synced unconditionally, any records in the local store
+      // that don't exist on the server must be for disconnected clients. Remove
+      // them, so that we don't upload records with commands for clients that will
+      // never see them. We also do this to filter out stale clients from the
+      // tabs collection, since showing their list of tabs is confusing.
+      let remoteClientIDs = Object.keys(this._store._remoteClients);
+      let staleIDs = Utils.arraySub(remoteClientIDs, this._incomingClients);
+      for (let staleID of staleIDs) {
+        this._removeRemoteClient(staleID);
+      }
+    } finally {
+      this._incomingClients = null;
+    }
+  },
+
   _syncFinish() {
     // Record telemetry for our device types.
     for (let [deviceType, count] of this.deviceTypes) {
       let hid;
       switch (deviceType) {
         case "desktop":
           hid = "WEAVE_DEVICE_COUNT_DESKTOP";
           break;
@@ -165,19 +191,32 @@ ClientEngine.prototype = {
           this._log.warn(`Unexpected deviceType "${deviceType}" recording device telemetry.`);
           continue;
       }
       Services.telemetry.getHistogramById(hid).add(count);
     }
     SyncEngine.prototype._syncFinish.call(this);
   },
 
-  // Always process incoming items because they might have commands
-  _reconcile: function _reconcile() {
-    return true;
+  _reconcile: function _reconcile(item) {
+    // Every incoming record is reconciled, so we use this to track the
+    // contents of the collection on the server.
+    this._incomingClients.push(item.id);
+
+    if (!this._store.itemExists(item.id)) {
+      return true;
+    }
+    // Clients are synced unconditionally, so we'll always have new records.
+    // Unfortunately, this will cause the scheduler to use the immediate sync
+    // interval for the multi-device case, instead of the active interval. We
+    // work around this by updating the record during reconciliation, and
+    // returning false to indicate that the record doesn't need to be applied
+    // later.
+    this._store.update(item);
+    return false;
   },
 
   // Treat reset the same as wiping for locally cached clients
   _resetClient() {
     this._wipeClient();
   },
 
   _wipeClient: function _wipeClient() {
@@ -238,31 +277,26 @@ ClientEngine.prototype = {
   _sendCommandToClient: function sendCommandToClient(command, args, clientId) {
     this._log.trace("Sending " + command + " to " + clientId);
 
     let client = this._store._remoteClients[clientId];
     if (!client) {
       throw new Error("Unknown remote client ID: '" + clientId + "'.");
     }
 
-    // notDupe compares two commands and returns if they are not equal.
-    let notDupe = function(other) {
-      return other.command != command || !Utils.deepEquals(other.args, args);
-    };
-
     let action = {
       command: command,
       args: args,
     };
 
     if (!client.commands) {
       client.commands = [action];
     }
     // Add the new action if there are no duplicates.
-    else if (client.commands.every(notDupe)) {
+    else if (!hasDupeCommand(client.commands, action)) {
       client.commands.push(action);
     }
     // It must be a dupe. Skip.
     else {
       return;
     }
 
     this._log.trace("Client " + clientId + " got a new action: " + [command, args]);
@@ -404,35 +438,50 @@ ClientEngine.prototype = {
    *        send this.
    */
   _handleDisplayURI: function _handleDisplayURI(uri, clientId, title) {
     this._log.info("Received a URI for display: " + uri + " (" + title +
                    ") from " + clientId);
 
     let subject = {uri: uri, client: clientId, title: title};
     Svc.Obs.notify("weave:engine:clients:display-uri", subject);
-  }
+  },
+
+  _removeRemoteClient(id) {
+    delete this._store._remoteClients[id];
+    this._tracker.removeChangedID(id);
+  },
 };
 
 function ClientStore(name, engine) {
   Store.call(this, name, engine);
 }
 ClientStore.prototype = {
   __proto__: Store.prototype,
 
   create(record) {
     this.update(record)
   },
 
   update: function update(record) {
     // Only grab commands from the server; local name/type always wins
     if (record.id == this.engine.localID)
       this.engine.localCommands = record.commands;
-    else
+    else {
+      let currentRecord = this._remoteClients[record.id];
+      if (currentRecord && currentRecord.commands) {
+        // Merge commands.
+        for (let action of currentRecord.commands) {
+          if (!hasDupeCommand(record.cleartext.commands, action)) {
+            record.cleartext.commands.push(action);
+          }
+        }
+      }
       this._remoteClients[record.id] = record.cleartext;
+    }
   },
 
   createRecord: function createRecord(id, collection) {
     let record = new ClientsRec(collection, id);
 
     // Package the individual components into a record for the local client
     if (id == this.engine.localID) {
       let cb = Async.makeSpinningCallback();
--- a/services/sync/tests/unit/test_clients_engine.js
+++ b/services/sync/tests/unit/test_clients_engine.js
@@ -188,16 +188,89 @@ add_test(function test_properties() {
     engine.lastRecordUpload = now / 1000;
     do_check_eq(engine.lastRecordUpload, Math.floor(now / 1000));
   } finally {
     Svc.Prefs.resetBranch("");
     run_next_test();
   }
 });
 
+add_test(function test_full_sync() {
+  _("Ensure that Clients engine fetches all records for each sync.");
+
+  let now = Date.now() / 1000;
+  let contents = {
+    meta: {global: {engines: {clients: {version: engine.version,
+                                        syncID: engine.syncID}}}},
+    clients: {},
+    crypto: {}
+  };
+  let server = serverForUsers({"foo": "password"}, contents);
+  let user   = server.user("foo");
+
+  new SyncTestingInfrastructure(server.server);
+  generateNewKeys(Service.collectionKeys);
+
+  let activeID = Utils.makeGUID();
+  server.insertWBO("foo", "clients", new ServerWBO(activeID, encryptPayload({
+    id: activeID,
+    name: "Active client",
+    type: "desktop",
+    commands: [],
+    version: "48",
+    protocols: ["1.5"],
+  }), now - 10));
+
+  let deletedID = Utils.makeGUID();
+  server.insertWBO("foo", "clients", new ServerWBO(deletedID, encryptPayload({
+    id: deletedID,
+    name: "Client to delete",
+    type: "desktop",
+    commands: [],
+    version: "48",
+    protocols: ["1.5"],
+  }), now - 10));
+
+  try {
+    let store = engine._store;
+
+    _("First sync. 2 records downloaded; our record uploaded.");
+    strictEqual(engine.lastRecordUpload, 0);
+    engine._sync();
+    ok(engine.lastRecordUpload > 0);
+    deepEqual(user.collection("clients").keys().sort(),
+              [activeID, deletedID, engine.localID].sort(),
+              "Our record should be uploaded on first sync");
+    deepEqual(Object.keys(store.getAllIDs()).sort(),
+              [activeID, deletedID, engine.localID].sort(),
+              "Other clients should be downloaded on first sync");
+
+    _("Delete a record, then sync again");
+    let collection = server.getCollection("foo", "clients");
+    collection.remove(deletedID);
+    // Simulate a timestamp update in info/collections.
+    engine.lastModified = now;
+    engine._sync();
+
+    _("Record should be updated");
+    deepEqual(Object.keys(store.getAllIDs()).sort(),
+              [activeID, engine.localID].sort(),
+              "Deleted client should be removed on next sync");
+  } finally {
+    Svc.Prefs.resetBranch("");
+    Service.recordManager.clearCache();
+
+    try {
+      server.deleteCollections("foo");
+    } finally {
+      server.stop(run_next_test);
+    }
+  }
+});
+
 add_test(function test_sync() {
   _("Ensure that Clients engine uploads a new client record once a week.");
 
   let contents = {
     meta: {global: {engines: {clients: {version: engine.version,
                                         syncID: engine.syncID}}}},
     clients: {},
     crypto: {}
@@ -449,28 +522,39 @@ add_test(function test_command_sync() {
   let user     = server.user("foo");
   let remoteId = Utils.makeGUID();
 
   function clientWBO(id) {
     return user.collection("clients").wbo(id);
   }
 
   _("Create remote client record");
-  let rec = new ClientsRec("clients", remoteId);
-  engine._store.create(rec);
-  let remoteRecord = engine._store.createRecord(remoteId, "clients");
-  engine.sendCommand("wipeAll", []);
-
-  let clientRecord = engine._store._remoteClients[remoteId];
-  do_check_neq(clientRecord, undefined);
-  do_check_eq(clientRecord.commands.length, 1);
+  server.insertWBO("foo", "clients", new ServerWBO(remoteId, encryptPayload({
+    id: remoteId,
+    name: "Remote client",
+    type: "desktop",
+    commands: [],
+    version: "48",
+    protocols: ["1.5"],
+  }), Date.now() / 1000));
 
   try {
     _("Syncing.");
     engine._sync();
+
+    _("Checking remote record was downloaded.");
+    let clientRecord = engine._store._remoteClients[remoteId];
+    do_check_neq(clientRecord, undefined);
+    do_check_eq(clientRecord.commands.length, 0);
+
+    _("Send a command to the remote client.");
+    engine.sendCommand("wipeAll", []);
+    do_check_eq(clientRecord.commands.length, 1);
+    engine._sync();
+
     _("Checking record was uploaded.");
     do_check_neq(clientWBO(engine.localID).payload, undefined);
     do_check_true(engine.lastRecordUpload > 0);
 
     do_check_neq(clientWBO(remoteId).payload, undefined);
 
     Svc.Prefs.set("client.GUID", remoteId);
     engine._resetClient();
@@ -482,17 +566,23 @@ add_test(function test_command_sync() {
 
     let command = engine.localCommands[0];
     do_check_eq(command.command, "wipeAll");
     do_check_eq(command.args.length, 0);
 
   } finally {
     Svc.Prefs.resetBranch("");
     Service.recordManager.clearCache();
-    server.stop(run_next_test);
+
+    try {
+      let collection = server.getCollection("foo", "clients");
+      collection.remove(remoteId);
+    } finally {
+      server.stop(run_next_test);
+    }
   }
 });
 
 add_test(function test_send_uri_to_client_for_display() {
   _("Ensure sendURIToClientForDisplay() sends command properly.");
 
   let tracker = engine._tracker;
   let store = engine._store;
@@ -570,16 +660,19 @@ add_test(function test_receive_display_u
     do_check_eq(data, null);
 
     run_next_test();
   };
 
   Svc.Obs.add(ev, handler);
 
   do_check_true(engine.processIncomingCommands());
+
+  engine._resetClient();
+  run_next_test();
 });
 
 add_test(function test_optional_client_fields() {
   _("Ensure that we produce records with the fields added in Bug 1097222.");
 
   const SUPPORTED_PROTOCOL_VERSIONS = ["1.1", "1.5"];
   let local = engine._store.createRecord(engine.localID, "clients");
   do_check_eq(local.name, engine.localName);
@@ -598,13 +691,161 @@ add_test(function test_optional_client_f
   do_check_true(!!local.application);
 
   // We don't currently populate device or formfactor.
   // See Bug 1100722, Bug 1100723.
 
   run_next_test();
 });
 
+add_test(function test_merge_commands() {
+  _("Verifies local commands for remote clients are merged with the server's");
+
+  let now = Date.now() / 1000;
+  let contents = {
+    meta: {global: {engines: {clients: {version: engine.version,
+                                        syncID: engine.syncID}}}},
+    clients: {},
+    crypto: {}
+  };
+  let server = serverForUsers({"foo": "password"}, contents);
+  let user   = server.user("foo");
+
+  new SyncTestingInfrastructure(server.server);
+  generateNewKeys(Service.collectionKeys);
+
+  let desktopID = Utils.makeGUID();
+  server.insertWBO("foo", "clients", new ServerWBO(desktopID, encryptPayload({
+    id: desktopID,
+    name: "Desktop client",
+    type: "desktop",
+    commands: [{
+      command: "displayURI",
+      args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
+    }],
+    version: "48",
+    protocols: ["1.5"],
+  }), now - 10));
+
+  let mobileID = Utils.makeGUID();
+  server.insertWBO("foo", "clients", new ServerWBO(mobileID, encryptPayload({
+    id: mobileID,
+    name: "Mobile client",
+    type: "mobile",
+    commands: [{
+      command: "logout",
+      args: [],
+    }],
+    version: "48",
+    protocols: ["1.5"],
+  }), now - 10));
+
+  try {
+    let store = engine._store;
+
+    _("First sync. 2 records downloaded.");
+    strictEqual(engine.lastRecordUpload, 0);
+    engine._sync();
+
+    _("Broadcast logout to all clients");
+    engine.sendCommand("logout", []);
+    engine._sync();
+
+    let collection = server.getCollection("foo", "clients");
+    let desktopPayload = JSON.parse(JSON.parse(collection.payload(desktopID)).ciphertext);
+    deepEqual(desktopPayload.commands, [{
+      command: "displayURI",
+      args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
+    }, {
+      command: "logout",
+      args: [],
+    }], "Should send the logout command to the desktop client");
+
+    let mobilePayload = JSON.parse(JSON.parse(collection.payload(mobileID)).ciphertext);
+    deepEqual(mobilePayload.commands, [{ command: "logout", args: [] }],
+      "Should not send a duplicate logout to the mobile client");
+  } finally {
+    Svc.Prefs.resetBranch("");
+    Service.recordManager.clearCache();
+    engine._resetClient();
+
+    try {
+      server.deleteCollections("foo");
+    } finally {
+      server.stop(run_next_test);
+    }
+  }
+});
+
+add_test(function test_deleted_commands() {
+  _("Verifies commands for a deleted client are discarded");
+
+  let now = Date.now() / 1000;
+  let contents = {
+    meta: {global: {engines: {clients: {version: engine.version,
+                                        syncID: engine.syncID}}}},
+    clients: {},
+    crypto: {}
+  };
+  let server = serverForUsers({"foo": "password"}, contents);
+  let user   = server.user("foo");
+
+  new SyncTestingInfrastructure(server.server);
+  generateNewKeys(Service.collectionKeys);
+
+  let activeID = Utils.makeGUID();
+  server.insertWBO("foo", "clients", new ServerWBO(activeID, encryptPayload({
+    id: activeID,
+    name: "Active client",
+    type: "desktop",
+    commands: [],
+    version: "48",
+    protocols: ["1.5"],
+  }), now - 10));
+
+  let deletedID = Utils.makeGUID();
+  server.insertWBO("foo", "clients", new ServerWBO(deletedID, encryptPayload({
+    id: deletedID,
+    name: "Client to delete",
+    type: "desktop",
+    commands: [],
+    version: "48",
+    protocols: ["1.5"],
+  }), now - 10));
+
+  try {
+    let store = engine._store;
+
+    _("First sync. 2 records downloaded.");
+    engine._sync();
+
+    _("Delete a record on the server.");
+    let collection = server.getCollection("foo", "clients");
+    collection.remove(deletedID);
+
+    _("Broadcast a command to all clients");
+    engine.sendCommand("logout", []);
+    engine._sync();
+
+    deepEqual(collection.keys().sort(), [activeID, engine.localID].sort(),
+      "Should not reupload deleted clients");
+
+    let activePayload = JSON.parse(JSON.parse(collection.payload(activeID)).ciphertext);
+    deepEqual(activePayload.commands, [{ command: "logout", args: [] }],
+      "Should send the command to the active client");
+  } finally {
+    Svc.Prefs.resetBranch("");
+    Service.recordManager.clearCache();
+    engine._resetClient();
+
+    try {
+      server.deleteCollections("foo");
+    } finally {
+      server.stop(run_next_test);
+    }
+  }
+});
+
 function run_test() {
   initTestLogging("Trace");
   Log.repository.getLogger("Sync.Engine.Clients").level = Log.Level.Trace;
   run_next_test();
 }
--- a/services/sync/tests/unit/test_interval_triggers.js
+++ b/services/sync/tests/unit/test_interval_triggers.js
@@ -8,16 +8,21 @@ Cu.import("resource://services-sync/util
 Cu.import("resource://testing-common/services/sync/utils.js");
 
 Svc.DefaultPrefs.set("registerEngines", "");
 Cu.import("resource://services-sync/service.js");
 
 var scheduler = Service.scheduler;
 var clientsEngine = Service.clientsEngine;
 
+// Don't remove stale clients when syncing. This is a test-only workaround
+// that lets us add clients directly to the store, without losing them on
+// the next sync.
+clientsEngine._removeRemoteClient = id => {};
+
 function promiseStopServer(server) {
   let deferred = Promise.defer();
   server.stop(deferred.resolve);
   return deferred.promise;
 }
 
 function sync_httpd_setup() {
   let global = new ServerWBO("global", {
--- a/services/sync/tests/unit/test_syncscheduler.js
+++ b/services/sync/tests/unit/test_syncscheduler.js
@@ -24,16 +24,21 @@ CatapultEngine.prototype = {
   }
 };
 
 Service.engineManager.register(CatapultEngine);
 
 var scheduler = new SyncScheduler(Service);
 var clientsEngine = Service.clientsEngine;
 
+// Don't remove stale clients when syncing. This is a test-only workaround
+// that lets us add clients directly to the store, without losing them on
+// the next sync.
+clientsEngine._removeRemoteClient = id => {};
+
 function sync_httpd_setup() {
   let global = new ServerWBO("global", {
     syncID: Service.syncID,
     storageVersion: STORAGE_VERSION,
     engines: {clients: {version: clientsEngine.version,
                         syncID: clientsEngine.syncID}}
   });
   let clientsColl = new ServerCollection({}, true);
@@ -64,16 +69,17 @@ function setUp(server) {
     deferred.resolve(result);
   });
   return deferred.promise;
 }
 
 function cleanUpAndGo(server) {
   let deferred = Promise.defer();
   Utils.nextTick(function () {
+    clientsEngine._store.wipe();
     Service.startOver();
     if (server) {
       server.stop(deferred.resolve);
     } else {
       deferred.resolve();
     }
   });
   return deferred.promise;