Bug 1289536 (part 2) - Add a unique flowID GUID to each command sent via the clients collection. r?rnewman draft
authorMark Hammond <mhammond@skippinet.com.au>
Fri, 04 Nov 2016 12:46:57 +1100
changeset 457464 55358cbdf90814482fb52d6592b74b7f4fb84e62
parent 457463 339a0501a0dd6edc1917104067ab281c0c567f5b
child 457515 6d78761111d41394d4a67e42f967e9145d1645c8
push id40775
push userbmo:markh@mozilla.com
push dateMon, 09 Jan 2017 00:31:29 +0000
reviewersrnewman
bugs1289536
milestone53.0a1
Bug 1289536 (part 2) - Add a unique flowID GUID to each command sent via the clients collection. r?rnewman MozReview-Commit-ID: 4eTFDq4Yr7S
services/sync/modules/engines/clients.js
services/sync/tests/unit/test_clients_engine.js
--- a/services/sync/modules/engines/clients.js
+++ b/services/sync/modules/engines/clients.js
@@ -246,20 +246,21 @@ ClientEngine.prototype = {
       })
     );
   },
 
   _addClientCommand(clientId, command) {
     const allCommands = this._readCommands();
     const clientCommands = allCommands[clientId] || [];
     if (hasDupeCommand(clientCommands, command)) {
-      return;
+      return false;
     }
     allCommands[clientId] = clientCommands.concat(command);
     this._saveCommands(allCommands);
+    return true;
   },
 
   _syncStartup: function _syncStartup() {
     // Reupload new client record periodically.
     if (Date.now() / 1000 - this.lastRecordUpload > CLIENTS_TTL_REFRESH) {
       this._tracker.addChangedID(this.localID);
       this.lastRecordUpload = Date.now() / 1000;
     }
@@ -472,35 +473,45 @@ ClientEngine.prototype = {
 
   /**
    * Sends a command+args pair to a specific client.
    *
    * @param command Command string
    * @param args Array of arguments/data for command
    * @param clientId Client to send command to
    */
-  _sendCommandToClient: function sendCommandToClient(command, args, clientId) {
+  _sendCommandToClient: function sendCommandToClient(command, args, clientId, flowID = null) {
     this._log.trace("Sending " + command + " to " + clientId);
 
     let client = this._store._remoteClients[clientId];
     if (!client) {
       throw new Error("Unknown remote client ID: '" + clientId + "'.");
     }
     if (client.stale) {
       throw new Error("Stale remote client ID: '" + clientId + "'.");
     }
 
     let action = {
       command: command,
       args: args,
+      flowID: flowID || Utils.makeGUID(), // used for telemetry.
     };
 
-    this._log.trace("Client " + clientId + " got a new action: " + [command, args]);
-    this._addClientCommand(clientId, action);
-    this._tracker.addChangedID(clientId);
+    if (this._addClientCommand(clientId, action)) {
+      this._log.trace(`Client ${clientId} got a new action`, [command, args]);
+      this._tracker.addChangedID(clientId);
+      let deviceID;
+      try {
+        deviceID = this.service.identity.hashedDeviceID(clientId);
+      } catch (_) {}
+      this.service.recordTelemetryEvent("sendcommand", command, undefined,
+                                        { flowID: action.flowID, deviceID });
+    } else {
+      this._log.trace(`Client ${clientId} got a duplicate action`, [command, args]);
+    }
   },
 
   /**
    * Check if the local client has any remote commands and perform them.
    *
    * @return false to abort sync
    */
   processIncomingCommands: function processIncomingCommands() {
@@ -510,19 +521,22 @@ ClientEngine.prototype = {
       }
 
       const clearedCommands = this._readCommands()[this.localID];
       const commands = this.localCommands.filter(command => !hasDupeCommand(clearedCommands, command));
 
       let URIsToDisplay = [];
       // Process each command in order.
       for (let rawCommand of commands) {
-        let {command, args} = rawCommand;
+        let {command, args, flowID} = rawCommand;
         this._log.debug("Processing command: " + command + "(" + args + ")");
 
+        this.service.recordTelemetryEvent("processcommand", command, undefined,
+                                          { flowID });
+
         let engines = [args[0]];
         switch (command) {
           case "resetAll":
             engines = null;
             // Fallthrough
           case "resetEngine":
             this.service.resetClient(engines);
             break;
@@ -565,37 +579,40 @@ ClientEngine.prototype = {
    *
    * @param command
    *        Command to invoke on remote clients
    * @param args
    *        Array of arguments to give to the command
    * @param clientId
    *        Client ID to send command to. If undefined, send to all remote
    *        clients.
+   * @param flowID
+   *        A unique identifier used to track success for this operation across
+   *        devices.
    */
-  sendCommand: function sendCommand(command, args, clientId) {
+  sendCommand: function sendCommand(command, args, clientId, flowID = null) {
     let commandData = this._commands[command];
     // Don't send commands that we don't know about.
     if (!commandData) {
       this._log.error("Unknown command to send: " + command);
       return;
     }
     // Don't send a command with the wrong number of arguments.
     else if (!args || args.length != commandData.args) {
       this._log.error("Expected " + commandData.args + " args for '" +
                       command + "', but got " + args);
       return;
     }
 
     if (clientId) {
-      this._sendCommandToClient(command, args, clientId);
+      this._sendCommandToClient(command, args, clientId, flowID);
     } else {
       for (let [id, record] of Object.entries(this._store._remoteClients)) {
         if (!record.stale) {
-          this._sendCommandToClient(command, args, id);
+          this._sendCommandToClient(command, args, id, flowID);
         }
       }
     }
   },
 
   /**
    * Send a URI to another client for display.
    *
--- a/services/sync/tests/unit/test_clients_engine.js
+++ b/services/sync/tests/unit/test_clients_engine.js
@@ -384,16 +384,17 @@ add_test(function test_send_command() {
   let clientCommands = engine._readCommands()[remoteId];
   notEqual(newRecord, undefined);
   equal(clientCommands.length, 1);
 
   let command = clientCommands[0];
   equal(command.command, action);
   equal(command.args.length, 2);
   deepEqual(command.args, args);
+  ok(command.flowID);
 
   notEqual(tracker.changedIDs[remoteId], undefined);
 
   engine._tracker.clearChangedIDs();
   run_next_test();
 });
 
 add_test(function test_command_validation() {
@@ -627,22 +628,22 @@ add_task(async function test_filter_dupl
     equal(counts.newFailed, 0);
 
     _("Broadcast logout to all clients");
     engine.sendCommand("logout", []);
     engine._sync();
 
     let collection = server.getCollection("foo", "clients");
     let recentPayload = JSON.parse(JSON.parse(collection.payload(recentID)).ciphertext);
-    deepEqual(recentPayload.commands, [{ command: "logout", args: [] }],
-              "Should send commands to the recent client");
+    compareCommands(recentPayload.commands, [{ command: "logout", args: [] }],
+                    "Should send commands to the recent client");
 
     let oldPayload = JSON.parse(JSON.parse(collection.payload(oldID)).ciphertext);
-    deepEqual(oldPayload.commands, [{ command: "logout", args: [] }],
-              "Should send commands to the week-old client");
+    compareCommands(oldPayload.commands, [{ command: "logout", args: [] }],
+                    "Should send commands to the week-old client");
 
     let dupePayload = JSON.parse(JSON.parse(collection.payload(dupeID)).ciphertext);
     deepEqual(dupePayload.commands, [],
               "Should not send commands to the dupe client");
 
     _("Update the dupe client's modified time");
     server.insertWBO("foo", "clients", new ServerWBO(dupeID, encryptPayload({
       id: dupeID,
@@ -909,29 +910,31 @@ add_task(async function test_merge_comma
   let desktopID = Utils.makeGUID();
   server.insertWBO("foo", "clients", new ServerWBO(desktopID, encryptPayload({
     id: desktopID,
     name: "Desktop client",
     type: "desktop",
     commands: [{
       command: "displayURI",
       args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
+      flowID: Utils.makeGUID(),
     }],
     version: "48",
     protocols: ["1.5"],
   }), now - 10));
 
   let mobileID = Utils.makeGUID();
   server.insertWBO("foo", "clients", new ServerWBO(mobileID, encryptPayload({
     id: mobileID,
     name: "Mobile client",
     type: "mobile",
     commands: [{
       command: "logout",
       args: [],
+      flowID: Utils.makeGUID(),
     }],
     version: "48",
     protocols: ["1.5"],
   }), now - 10));
 
   try {
     let store = engine._store;
 
@@ -940,27 +943,27 @@ add_task(async function test_merge_comma
     engine._sync();
 
     _("Broadcast logout to all clients");
     engine.sendCommand("logout", []);
     engine._sync();
 
     let collection = server.getCollection("foo", "clients");
     let desktopPayload = JSON.parse(JSON.parse(collection.payload(desktopID)).ciphertext);
-    deepEqual(desktopPayload.commands, [{
+    compareCommands(desktopPayload.commands, [{
       command: "displayURI",
       args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
     }, {
       command: "logout",
       args: [],
     }], "Should send the logout command to the desktop client");
 
     let mobilePayload = JSON.parse(JSON.parse(collection.payload(mobileID)).ciphertext);
-    deepEqual(mobilePayload.commands, [{ command: "logout", args: [] }],
-      "Should not send a duplicate logout to the mobile client");
+    compareCommands(mobilePayload.commands, [{ command: "logout", args: [] }],
+                    "Should not send a duplicate logout to the mobile client");
   } finally {
     Svc.Prefs.resetBranch("");
     Service.recordManager.clearCache();
     engine._resetClient();
 
     try {
       server.deleteCollections("foo");
     } finally {
@@ -1017,17 +1020,17 @@ add_task(async function test_duplicate_r
     }), now - 10));
 
     _("Send another tab to the desktop client");
     engine.sendCommand("displayURI", ["https://foobar.com", engine.localID, "Foo bar!"], desktopID);
     engine._sync();
 
     let collection = server.getCollection("foo", "clients");
     let desktopPayload = JSON.parse(JSON.parse(collection.payload(desktopID)).ciphertext);
-    deepEqual(desktopPayload.commands, [{
+    compareCommands(desktopPayload.commands, [{
       command: "displayURI",
       args: ["https://foobar.com", engine.localID, "Foo bar!"],
     }], "Should only send the second command to the desktop client");
   } finally {
     Svc.Prefs.resetBranch("");
     Service.recordManager.clearCache();
     engine._resetClient();
 
@@ -1057,17 +1060,19 @@ add_task(async function test_upload_afte
 
   let deviceBID = Utils.makeGUID();
   let deviceCID = Utils.makeGUID();
   server.insertWBO("foo", "clients", new ServerWBO(deviceBID, encryptPayload({
     id: deviceBID,
     name: "Device B",
     type: "desktop",
     commands: [{
-      command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+      command: "displayURI",
+      args: ["https://deviceclink.com", deviceCID, "Device C link"],
+      flowID: Utils.makeGUID(),
     }],
     version: "48",
     protocols: ["1.5"],
   }), now - 10));
   server.insertWBO("foo", "clients", new ServerWBO(deviceCID, encryptPayload({
     id: deviceCID,
     name: "Device C",
     type: "desktop",
@@ -1087,17 +1092,17 @@ add_task(async function test_upload_afte
     engine.sendCommand("displayURI", ["https://example.com", engine.localID, "Yak Herders Anonymous"], deviceBID);
 
     const oldUploadOutgoing = SyncEngine.prototype._uploadOutgoing;
     SyncEngine.prototype._uploadOutgoing = () => engine._onRecordsWritten.call(engine, [], [deviceBID]);
     engine._sync();
 
     let collection = server.getCollection("foo", "clients");
     let deviceBPayload = JSON.parse(JSON.parse(collection.payload(deviceBID)).ciphertext);
-    deepEqual(deviceBPayload.commands, [{
+    compareCommands(deviceBPayload.commands, [{
       command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
     }], "Should be the same because the upload failed");
 
     _("Simulate the client B consuming the command and syncing to the server");
     server.insertWBO("foo", "clients", new ServerWBO(deviceBID, encryptPayload({
       id: deviceBID,
       name: "Device B",
       type: "desktop",
@@ -1108,17 +1113,17 @@ add_task(async function test_upload_afte
 
     // Simulate reboot
     SyncEngine.prototype._uploadOutgoing = oldUploadOutgoing;
     engine = Service.clientsEngine = new ClientEngine(Service);
 
     engine._sync();
 
     deviceBPayload = JSON.parse(JSON.parse(collection.payload(deviceBID)).ciphertext);
-    deepEqual(deviceBPayload.commands, [{
+    compareCommands(deviceBPayload.commands, [{
       command: "displayURI",
       args: ["https://example.com", engine.localID, "Yak Herders Anonymous"],
     }], "Should only had written our outgoing command");
   } finally {
     Svc.Prefs.resetBranch("");
     Service.recordManager.clearCache();
     engine._resetClient();
 
@@ -1148,20 +1153,24 @@ add_task(async function test_keep_cleare
 
   let deviceBID = Utils.makeGUID();
   let deviceCID = Utils.makeGUID();
   server.insertWBO("foo", "clients", new ServerWBO(engine.localID, encryptPayload({
     id: engine.localID,
     name: "Device A",
     type: "desktop",
     commands: [{
-      command: "displayURI", args: ["https://deviceblink.com", deviceBID, "Device B link"]
+      command: "displayURI",
+      args: ["https://deviceblink.com", deviceBID, "Device B link"],
+      flowID: Utils.makeGUID(),
     },
     {
-      command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+      command: "displayURI",
+      args: ["https://deviceclink.com", deviceCID, "Device C link"],
+      flowID: Utils.makeGUID(),
     }],
     version: "48",
     protocols: ["1.5"],
   }), now - 10));
   server.insertWBO("foo", "clients", new ServerWBO(deviceBID, encryptPayload({
     id: deviceBID,
     name: "Device B",
     type: "desktop",
@@ -1190,36 +1199,42 @@ add_task(async function test_keep_cleare
     let commandsProcessed = 0;
     engine._handleDisplayURIs = (uris) => { commandsProcessed = uris.length };
 
     engine._sync();
     engine.processIncomingCommands(); // Not called by the engine.sync(), gotta call it ourselves
     equal(commandsProcessed, 2, "We processed 2 commands");
 
     let localRemoteRecord = JSON.parse(JSON.parse(collection.payload(engine.localID)).ciphertext);
-    deepEqual(localRemoteRecord.commands, [{
+    compareCommands(localRemoteRecord.commands, [{
       command: "displayURI", args: ["https://deviceblink.com", deviceBID, "Device B link"]
     },
     {
       command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
     }], "Should be the same because the upload failed");
 
     // Another client sends another link
     server.insertWBO("foo", "clients", new ServerWBO(engine.localID, encryptPayload({
       id: engine.localID,
       name: "Device A",
       type: "desktop",
       commands: [{
-        command: "displayURI", args: ["https://deviceblink.com", deviceBID, "Device B link"]
+        command: "displayURI",
+        args: ["https://deviceblink.com", deviceBID, "Device B link"],
+        flowID: Utils.makeGUID(),
       },
       {
-        command: "displayURI", args: ["https://deviceclink.com", deviceCID, "Device C link"]
+        command: "displayURI",
+        args: ["https://deviceclink.com", deviceCID, "Device C link"],
+        flowID: Utils.makeGUID(),
       },
       {
-        command: "displayURI", args: ["https://deviceclink2.com", deviceCID, "Device C link 2"]
+        command: "displayURI",
+        args: ["https://deviceclink2.com", deviceCID, "Device C link 2"],
+        flowID: Utils.makeGUID(),
       }],
       version: "48",
       protocols: ["1.5"],
     }), now - 10));
 
     // Simulate reboot
     SyncEngine.prototype._uploadOutgoing = oldUploadOutgoing;
     engine = Service.clientsEngine = new ClientEngine(Service);
@@ -1297,17 +1312,17 @@ add_task(async function test_deleted_com
     _("Broadcast a command to all clients");
     engine.sendCommand("logout", []);
     engine._sync();
 
     deepEqual(collection.keys().sort(), [activeID, engine.localID].sort(),
       "Should not reupload deleted clients");
 
     let activePayload = JSON.parse(JSON.parse(collection.payload(activeID)).ciphertext);
-    deepEqual(activePayload.commands, [{ command: "logout", args: [] }],
+    compareCommands(activePayload.commands, [{ command: "logout", args: [] }],
       "Should send the command to the active client");
   } finally {
     Svc.Prefs.resetBranch("");
     Service.recordManager.clearCache();
     engine._resetClient();
 
     try {
       server.deleteCollections("foo");
@@ -1341,28 +1356,29 @@ add_task(async function test_send_uri_ac
     let collection = server.getCollection("foo", "clients");
     let ourPayload = JSON.parse(JSON.parse(collection.payload(engine.localID)).ciphertext);
     ok(ourPayload, "Should upload our client record");
 
     _("Send a URL to the device on the server");
     ourPayload.commands = [{
       command: "displayURI",
       args: ["https://example.com", fakeSenderID, "Yak Herders Anonymous"],
+      flowID: Utils.makeGUID(),
     }];
     server.insertWBO("foo", "clients", new ServerWBO(engine.localID, encryptPayload(ourPayload), now));
 
     _("Sync again");
     engine._sync();
-    deepEqual(engine.localCommands, [{
+    compareCommands(engine.localCommands, [{
       command: "displayURI",
       args: ["https://example.com", fakeSenderID, "Yak Herders Anonymous"],
     }], "Should receive incoming URI");
     ok(engine.processIncomingCommands(), "Should process incoming commands");
     const clearedCommands = engine._readCommands()[engine.localID];
-    deepEqual(clearedCommands, [{
+    compareCommands(clearedCommands, [{
       command: "displayURI",
       args: ["https://example.com", fakeSenderID, "Yak Herders Anonymous"],
     }], "Should mark the commands as cleared after processing");
 
     _("Check that the command was removed on the server");
     engine._sync();
     ourPayload = JSON.parse(JSON.parse(collection.payload(engine.localID)).ciphertext);
     ok(ourPayload, "Should upload the synced client record");