Bug 1528622 - Debounce FxA Send Tab commands. r=markh,rfkelly
authorEdouard Oger <eoger@fastmail.com>
Tue, 12 Mar 2019 01:05:03 +0000
changeset 521469 fddb40ccfe5a
parent 521468 29b1d70e8032
child 521470 ee1ceae66888
push id10866
push usernerli@mozilla.com
push dateTue, 12 Mar 2019 18:59:09 +0000
treeherdermozilla-beta@445c24a51727 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersmarkh, rfkelly
bugs1528622
milestone67.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1528622 - Debounce FxA Send Tab commands. r=markh,rfkelly Differential Revision: https://phabricator.services.mozilla.com/D21286
browser/base/content/browser-sync.js
browser/components/BrowserGlue.jsm
services/fxaccounts/FxAccountsCommands.js
services/fxaccounts/FxAccountsPush.jsm
services/fxaccounts/tests/xpcshell/test_commands.js
services/fxaccounts/tests/xpcshell/test_push_service.js
services/sync/modules/policies.js
--- a/browser/base/content/browser-sync.js
+++ b/browser/base/content/browser-sync.js
@@ -640,17 +640,17 @@ var gSync = {
     }
     const state = UIState.get();
     if (state.status == UIState.STATUS_SIGNED_IN) {
       this.updateSyncStatus({ syncing: true });
       Services.tm.dispatchToMainThread(() => {
         // We are pretty confident that push helps us pick up all FxA commands,
         // but some users might have issues with push, so let's unblock them
         // by fetching the missed FxA commands on manual sync.
-        fxAccounts.commands.fetchMissedRemoteCommands().catch(e => {
+        fxAccounts.commands.pollDeviceCommands().catch(e => {
           console.error("Fetching missed remote commands failed.", e);
         });
         Weave.Service.sync();
       });
     }
   },
 
   openPrefs(entryPoint = "syncbutton", origin = undefined) {
--- a/browser/components/BrowserGlue.jsm
+++ b/browser/components/BrowserGlue.jsm
@@ -2855,18 +2855,18 @@ BrowserGlue.prototype = {
         }
         tab.setAttribute("attention", true);
         return tab;
       };
 
       const firstTab = await openTab(URIs[0]);
       await Promise.all(URIs.slice(1).map(URI => openTab(URI)));
 
+      const deviceName = URIs[0].sender && URIs[0].sender.name;
       let title, body;
-      const deviceName = URIs[0].sender.name;
       const bundle = Services.strings.createBundle("chrome://browser/locale/accounts.properties");
       if (URIs.length == 1) {
         // Due to bug 1305895, tabs from iOS may not have device information, so
         // we have separate strings to handle those cases. (See Also
         // unnamedTabsArrivingNotificationNoDevice.body below)
         if (deviceName) {
           title = bundle.formatStringFromName("tabArrivingNotificationWithDevice.title", [deviceName], 1);
         } else {
@@ -2880,23 +2880,25 @@ BrowserGlue.prototype = {
         if (win.gURLBar) {
           body = win.gURLBar.trimValue(body);
         }
         if (wasTruncated) {
           body = bundle.formatStringFromName("singleTabArrivingWithTruncatedURL.body", [body], 1);
         }
       } else {
         title = bundle.GetStringFromName("multipleTabsArrivingNotification.title");
-        const allSameDevice = URIs.every(URI => URI.sender.id == URIs[0].sender.id);
-        const unknownDevice = allSameDevice && !deviceName;
+        const allKnownSender = URIs.every(URI => URI.sender != null);
+        const allSameDevice = allKnownSender && URIs.every(URI => URI.sender.id == URIs[0].sender.id);
         let tabArrivingBody;
-        if (unknownDevice) {
-          tabArrivingBody = "unnamedTabsArrivingNotificationNoDevice.body";
-        } else if (allSameDevice) {
-          tabArrivingBody = "unnamedTabsArrivingNotification2.body";
+        if (allSameDevice) {
+          if (deviceName) {
+            tabArrivingBody = "unnamedTabsArrivingNotification2.body";
+          } else {
+            tabArrivingBody = "unnamedTabsArrivingNotificationNoDevice.body";
+          }
         } else {
           tabArrivingBody = "unnamedTabsArrivingNotificationMultiple2.body";
         }
 
         body = bundle.GetStringFromName(tabArrivingBody);
         body = PluralForm.get(URIs.length, body);
         body = body.replace("#1", URIs.length);
         body = body.replace("#2", deviceName);
--- a/services/fxaccounts/FxAccountsCommands.js
+++ b/services/fxaccounts/FxAccountsCommands.js
@@ -33,72 +33,62 @@ class FxAccountsCommands {
     if (!sessionToken) {
       throw new Error("_send called without a session token.");
     }
     const client = this._fxAccounts.getAccountsClient();
     await client.invokeCommand(sessionToken, command, device.id, payload);
     log.info(`Payload sent to device ${device.id}.`);
   }
 
-  async consumeRemoteCommand(index) {
+  /**
+   * Poll and handle device commands for the current device.
+   * This method can be called either in response to a Push message,
+   * or by itself as a "commands recovery" mechanism.
+   *
+   * @param {Number} receivedIndex "Command received" push messages include
+   * the index of the command that triggered the message. We use it as a
+   * hint when we have no "last command index" stored.
+   */
+  async pollDeviceCommands(receivedIndex = 0) {
+    // Whether the call to `pollDeviceCommands` was initiated by a Push message from the FxA
+    // servers in response to a message being received or simply scheduled in order
+    // to fetch missed messages.
+    const scheduledFetch = receivedIndex == 0;
     if (!Services.prefs.getBoolPref("identity.fxaccounts.commands.enabled", true)) {
       return false;
     }
-    log.info(`Consuming command with index ${index}.`);
-    const {messages} = await this._fetchRemoteCommands(index, 1);
-    if (messages.length != 1) {
-      log.warn(`Should have retrieved 1 and only 1 message, got ${messages.length}.`);
-    }
-    return this._fxAccounts._withCurrentAccountState(async (getUserData, updateUserData) => {
-      const {device} = await getUserData(["device"]);
-      if (!device) {
-        throw new Error("No device registration.");
-      }
-      const handledCommands = (device.handledCommands || []).concat(messages.map(m => m.index));
-      await updateUserData({
-        device: {...device, handledCommands},
-      });
-      await this._handleCommands(messages);
-
-      // Once the handledCommands array length passes a threshold, check the
-      // potentially missed remote commands in order to clear it.
-      if (handledCommands.length > 20) {
-        await this.fetchMissedRemoteCommands();
-      }
-    });
-  }
-
-  async fetchMissedRemoteCommands() {
-    if (!Services.prefs.getBoolPref("identity.fxaccounts.commands.enabled", true)) {
-      return false;
-    }
-    log.info(`Consuming missed commands.`);
+    log.info(`Polling device commands.`);
     await this._fxAccounts._withCurrentAccountState(async (getUserData, updateUserData) => {
       const {device} = await getUserData(["device"]);
       if (!device) {
         throw new Error("No device registration.");
       }
-      const lastCommandIndex = device.lastCommandIndex || 0;
-      const handledCommands = device.handledCommands || [];
-      handledCommands.push(lastCommandIndex); // Because the server also returns this command.
-      const {index, messages} = await this._fetchRemoteCommands(lastCommandIndex);
-      const missedMessages = messages.filter(m => !handledCommands.includes(m.index));
-      await updateUserData({
-        device: {...device, lastCommandIndex: index, handledCommands: []},
-      });
-      if (missedMessages.length) {
-        log.info(`Handling ${missedMessages.length} missed messages`);
-        Services.telemetry.scalarAdd("identity.fxaccounts.missed_commands_fetched", missedMessages.length);
-        await this._handleCommands(missedMessages);
+      // We increment lastCommandIndex by 1 because the server response includes the current index.
+      // If we don't have a `lastCommandIndex` stored, we fall back on the index from the push message we just got.
+      const lastCommandIndex = (device.lastCommandIndex + 1) || receivedIndex;
+      // We have already received this message before.
+      if (receivedIndex > 0 && receivedIndex < lastCommandIndex) {
+        return;
+      }
+      const {index, messages} = await this._fetchDeviceCommands(lastCommandIndex);
+      if (messages.length) {
+        await updateUserData({
+          device: {...device, lastCommandIndex: index},
+        });
+        log.info(`Handling ${messages.length} messages`);
+        if (scheduledFetch) {
+          Services.telemetry.scalarAdd("identity.fxaccounts.missed_commands_fetched", messages.length);
+        }
+        await this._handleCommands(messages);
       }
     });
     return true;
   }
 
-  async _fetchRemoteCommands(index, limit = null) {
+  async _fetchDeviceCommands(index, limit = null) {
     const userData = await this._fxAccounts.getSignedInUser();
     if (!userData) {
       throw new Error("No user.");
     }
     const {sessionToken} = userData;
     if (!sessionToken) {
       throw new Error("No session token.");
     }
@@ -107,33 +97,41 @@ class FxAccountsCommands {
     if (limit != null) {
       opts.limit = limit;
     }
     return client.getCommands(sessionToken, opts);
   }
 
   async _handleCommands(messages) {
     const fxaDevices = await this._fxAccounts.getDeviceList();
+    // We debounce multiple incoming tabs so we show a single notification.
+    const tabsReceived = [];
     for (const {data} of messages) {
-      let {command, payload, sender} = data;
-      if (sender) {
-        sender = fxaDevices.find(d => d.id == sender);
+      const {command, payload, sender: senderId} = data;
+      const sender = senderId ? fxaDevices.find(d => d.id == senderId) : null;
+      if (!sender) {
+        log.warn("Incoming command is from an unknown device (maybe disconnected?)");
       }
       switch (command) {
         case COMMAND_SENDTAB:
           try {
-            await this.sendTab.handle(sender, payload);
+            const {title, uri} = await this.sendTab.handle(payload);
+            log.info(`Tab received with FxA commands: ${title} from ${sender ? sender.name : "Unknown device"}.`);
+            tabsReceived.push({title, uri, sender});
           } catch (e) {
             log.error(`Error while handling incoming Send Tab payload.`, e);
           }
           break;
         default:
           log.info(`Unknown command: ${command}.`);
       }
     }
+    if (tabsReceived.length) {
+      Observers.notify("fxaccounts:commands:open-uri", tabsReceived);
+    }
   }
 }
 
 /**
  * Send Tab is built on top of FxA commands.
  *
  * Devices exchange keys wrapped in kSync between themselves (getEncryptedKey)
  * during the device registration flow. The FxA server can theorically never
@@ -179,32 +177,27 @@ class SendTab {
 
   // Returns true if the target device is compatible with FxA Commands Send tab.
   isDeviceCompatible(device) {
     return Services.prefs.getBoolPref("identity.fxaccounts.commands.enabled", true) &&
            device.availableCommands && device.availableCommands[COMMAND_SENDTAB];
   }
 
   // Handle incoming send tab payload, called by FxAccountsCommands.
-  async handle(sender, {encrypted}) {
-    if (!sender) {
-      log.warn("Incoming tab is from an unknown device (maybe disconnected?)");
-    }
+  async handle({encrypted}) {
     const bytes = await this._decrypt(encrypted);
     const decoder = new TextDecoder("utf8");
     const data = JSON.parse(decoder.decode(bytes));
     const current = data.hasOwnProperty("current") ? data.current :
                                                      data.entries.length - 1;
-    const tabSender = {
-      id: sender ? sender.id : "",
-      name: sender ? sender.name : "",
+    const {title, url: uri} = data.entries[current];
+    return {
+      title,
+      uri,
     };
-    const {title, url: uri} = data.entries[current];
-    log.info(`Tab received with FxA commands: ${title} from ${tabSender.name}.`);
-    Observers.notify("fxaccounts:commands:open-uri", [{uri, title, sender: tabSender}]);
   }
 
   async _encrypt(bytes, device) {
     let bundle = device.availableCommands[COMMAND_SENDTAB];
     if (!bundle) {
       throw new Error(`Device ${device.id} does not have send tab keys.`);
     }
     const {kSync, kXCS: ourKid} = await this._fxAccounts.getKeys();
--- a/services/fxaccounts/FxAccountsPush.jsm
+++ b/services/fxaccounts/FxAccountsPush.jsm
@@ -1,13 +1,14 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 const {Services} = ChromeUtils.import("resource://gre/modules/Services.jsm");
+const {Async} = ChromeUtils.import("resource://services-common/async.js");
 const {FXA_PUSH_SCOPE_ACCOUNT_UPDATE, ONLOGOUT_NOTIFICATION, ON_ACCOUNT_DESTROYED_NOTIFICATION, ON_ACCOUNT_STATE_CHANGE_NOTIFICATION, ON_COLLECTION_CHANGED_NOTIFICATION, ON_COMMAND_RECEIVED_NOTIFICATION, ON_DEVICE_CONNECTED_NOTIFICATION, ON_DEVICE_DISCONNECTED_NOTIFICATION, ON_PASSWORD_CHANGED_NOTIFICATION, ON_PASSWORD_RESET_NOTIFICATION, ON_PROFILE_CHANGE_NOTIFICATION, ON_PROFILE_UPDATED_NOTIFICATION, ON_VERIFY_LOGIN_NOTIFICATION, log} = ChromeUtils.import("resource://gre/modules/FxAccountsCommon.js");
 
 /**
  * FxAccountsPushService manages Push notifications for Firefox Accounts in the browser
  *
  * @param [options]
  *        Object, custom options that used for testing
  * @constructor
@@ -66,20 +67,27 @@ FxAccountsPushService.prototype = {
 
     if (options.fxAccounts) {
       this.fxAccounts = options.fxAccounts;
     } else {
       ChromeUtils.defineModuleGetter(this, "fxAccounts",
         "resource://gre/modules/FxAccounts.jsm");
     }
 
-    // listen to new push messages, push changes and logout events
-    Services.obs.addObserver(this, this.pushService.pushTopic);
-    Services.obs.addObserver(this, this.pushService.subscriptionChangeTopic);
-    Services.obs.addObserver(this, ONLOGOUT_NOTIFICATION);
+    this.asyncObserver = Async.asyncObserver(this, this.log);
+    // We use an async observer because a device waking up can
+    // observe multiple "Send Tab received" push notifications at the same time.
+    // The way these notifications are handled is as follows:
+    // Read index from storage, make network request, update the index.
+    // You can imagine what happens when multiple calls race: we load
+    // the same index multiple times and receive the same exact tabs, multiple times.
+    // The async observer will ensure we make these network requests serially.
+    Services.obs.addObserver(this.asyncObserver, this.pushService.pushTopic);
+    Services.obs.addObserver(this.asyncObserver, this.pushService.subscriptionChangeTopic);
+    Services.obs.addObserver(this.asyncObserver, ONLOGOUT_NOTIFICATION);
 
     this.log.debug("FxAccountsPush initialized");
     return true;
   },
   /**
    * Registers a new endpoint with the Push Server
    *
    * @returns {Promise}
@@ -98,73 +106,72 @@ FxAccountsPushService.prototype = {
           } else {
             this.log.warn("FxAccountsPush failed to subscribe", result);
             resolve(null);
           }
         });
     });
   },
   /**
-   * Standard observer interface to listen to push messages, changes and logout.
+   * Async observer interface to listen to push messages, changes and logout.
    *
    * @param subject
    * @param topic
    * @param data
    * @returns {Promise}
    */
-  _observe(subject, topic, data) {
-    this.log.trace(`observed topic=${topic}, data=${data}, subject=${subject}`);
-    switch (topic) {
-      case this.pushService.pushTopic:
-        if (data === FXA_PUSH_SCOPE_ACCOUNT_UPDATE) {
-          let message = subject.QueryInterface(Ci.nsIPushMessage);
-          this._onPushMessage(message);
-        }
-        break;
-      case this.pushService.subscriptionChangeTopic:
-        if (data === FXA_PUSH_SCOPE_ACCOUNT_UPDATE) {
-          this._onPushSubscriptionChange();
-        }
-        break;
-      case ONLOGOUT_NOTIFICATION:
-        // user signed out, we need to stop polling the Push Server
-        this.unsubscribe().catch(err => {
-          this.log.error("Error during unsubscribe", err);
-        });
-      default:
-        break;
+  async observe(subject, topic, data) {
+    try {
+      this.log.trace(`observed topic=${topic}, data=${data}, subject=${subject}`);
+      switch (topic) {
+        case this.pushService.pushTopic:
+          if (data === FXA_PUSH_SCOPE_ACCOUNT_UPDATE) {
+            let message = subject.QueryInterface(Ci.nsIPushMessage);
+            await this._onPushMessage(message);
+          }
+          break;
+        case this.pushService.subscriptionChangeTopic:
+          if (data === FXA_PUSH_SCOPE_ACCOUNT_UPDATE) {
+            await this._onPushSubscriptionChange();
+          }
+          break;
+        case ONLOGOUT_NOTIFICATION:
+          // user signed out, we need to stop polling the Push Server
+          try {
+            await this.unsubscribe();
+          } catch (err) {
+            this.log.error("Error during unsubscribe", err);
+          }
+        default:
+          break;
+      }
+    } catch (err) {
+      this.log.error(err);
     }
   },
-  /**
-   * Wrapper around _observe that catches errors
-   */
-  observe(subject, topic, data) {
-    Promise.resolve()
-      .then(() => this._observe(subject, topic, data))
-      .catch(err => this.log.error(err));
-  },
+
   /**
    * Fired when the Push server sends a notification.
    *
    * @private
    * @returns {Promise}
    */
-  _onPushMessage(message) {
+  async _onPushMessage(message) {
     this.log.trace("FxAccountsPushService _onPushMessage");
     if (!message.data) {
       // Use the empty signal to check the verification state of the account right away
       this.log.debug("empty push message - checking account status");
       this.fxAccounts.checkVerificationStatus();
       return;
     }
     let payload = message.data.json();
     this.log.debug(`push command: ${payload.command}`);
     switch (payload.command) {
       case ON_COMMAND_RECEIVED_NOTIFICATION:
-        this.fxAccounts.commands.consumeRemoteCommand(payload.data.index);
+        await this.fxAccounts.commands.pollDeviceCommands(payload.data.index);
         break;
       case ON_DEVICE_CONNECTED_NOTIFICATION:
         Services.obs.notifyObservers(null, ON_DEVICE_CONNECTED_NOTIFICATION, payload.data.deviceName);
         break;
       case ON_DEVICE_DISCONNECTED_NOTIFICATION:
         this.fxAccounts.handleDeviceDisconnection(payload.data.id);
         return;
       case ON_PROFILE_UPDATED_NOTIFICATION:
--- a/services/fxaccounts/tests/xpcshell/test_commands.js
+++ b/services/fxaccounts/tests/xpcshell/test_commands.js
@@ -45,48 +45,205 @@ add_task(async function test_sendtab_sen
   Assert.equal(report.succeeded[0].name, "Device 3");
   Assert.equal(report.failed[0].device.name, "Device 1");
   Assert.equal(report.failed[0].error.message, "Invoke error!");
   Assert.equal(report.failed[1].device.name, "Device 2");
   Assert.equal(report.failed[1].error.message, "Encrypt error!");
   Assert.ok(commands.invoke.calledTwice);
 });
 
-add_task(async function test_commands_fetchMissedRemoteCommands() {
+add_task(async function test_commands_pollDeviceCommands_push() {
+  // Server state.
+  const remoteMessages = [
+    {
+      index: 11,
+      data: {},
+    },
+    {
+      index: 12,
+      data: {},
+    },
+  ];
+  const remoteIndex = 12;
+
+  // Local state.
+  const pushIndexReceived = 11;
   const accountState = {
     data: {
       device: {
-        handledCommands: [8, 9, 10, 11],
-        lastCommandIndex: 11,
+        lastCommandIndex: 10,
       },
     },
   };
+
+  const fxAccounts = {
+    async _withCurrentAccountState(cb) {
+      const get = () => accountState.data;
+      const set = (val) => { accountState.data = val; };
+      await cb(get, set);
+    },
+  };
+  const commands = new FxAccountsCommands(fxAccounts);
+  const mockCommands = sinon.mock(commands);
+  mockCommands.expects("_fetchDeviceCommands").once().withArgs(11).returns({
+    index: remoteIndex,
+    messages: remoteMessages,
+  });
+  mockCommands.expects("_handleCommands").once().withArgs(remoteMessages);
+  await commands.pollDeviceCommands(pushIndexReceived);
+
+  mockCommands.verify();
+  Assert.equal(accountState.data.device.lastCommandIndex, 12);
+});
+
+add_task(async function test_commands_pollDeviceCommands_push_already_fetched() {
+  // Local state.
+  const pushIndexReceived = 12;
+  const accountState = {
+    data: {
+      device: {
+        lastCommandIndex: 12,
+      },
+    },
+  };
+
   const fxAccounts = {
     async _withCurrentAccountState(cb) {
       const get = () => accountState.data;
       const set = (val) => { accountState.data = val; };
       await cb(get, set);
     },
   };
   const commands = new FxAccountsCommands(fxAccounts);
-  commands._fetchRemoteCommands = () => {
-    return {
+  const mockCommands = sinon.mock(commands);
+  mockCommands.expects("_fetchDeviceCommands").never();
+  mockCommands.expects("_handleCommands").never();
+  await commands.pollDeviceCommands(pushIndexReceived);
+
+  mockCommands.verify();
+  Assert.equal(accountState.data.device.lastCommandIndex, 12);
+});
+
+add_task(async function test_commands_pollDeviceCommands_push_local_state_empty() {
+  // Server state.
+  const remoteMessages = [
+    {
+      index: 11,
+      data: {},
+    },
+    {
+      index: 12,
+      data: {},
+    },
+  ];
+  const remoteIndex = 12;
+
+  // Local state.
+  const pushIndexReceived = 11;
+  const accountState = {
+    data: {
+      device: {},
+    },
+  };
+
+  const fxAccounts = {
+    async _withCurrentAccountState(cb) {
+      const get = () => accountState.data;
+      const set = (val) => { accountState.data = val; };
+      await cb(get, set);
+    },
+  };
+  const commands = new FxAccountsCommands(fxAccounts);
+  const mockCommands = sinon.mock(commands);
+  mockCommands.expects("_fetchDeviceCommands").once().withArgs(11).returns({
+    index: remoteIndex,
+    messages: remoteMessages,
+  });
+  mockCommands.expects("_handleCommands").once().withArgs(remoteMessages);
+  await commands.pollDeviceCommands(pushIndexReceived);
+
+  mockCommands.verify();
+  Assert.equal(accountState.data.device.lastCommandIndex, 12);
+});
+
+add_task(async function test_commands_pollDeviceCommands_scheduled_local() {
+  // Server state.
+  const remoteMessages = [
+    {
+      index: 11,
+      data: {},
+    },
+    {
       index: 12,
-      messages: [
-        {
-          index: 11,
-          data: {},
-        },
-        {
-          index: 12,
-          data: {},
-        },
-      ],
-    };
+      data: {},
+    },
+  ];
+  const remoteIndex = 12;
+
+  // Local state.
+  const accountState = {
+    data: {
+      device: {
+        lastCommandIndex: 10,
+      },
+    },
+  };
+
+  const fxAccounts = {
+    async _withCurrentAccountState(cb) {
+      const get = () => accountState.data;
+      const set = (val) => { accountState.data = val; };
+      await cb(get, set);
+    },
   };
-  commands._handleCommands = sinon.spy();
-  await commands.fetchMissedRemoteCommands();
+  const commands = new FxAccountsCommands(fxAccounts);
+  const mockCommands = sinon.mock(commands);
+  mockCommands.expects("_fetchDeviceCommands").once().withArgs(11).returns({
+    index: remoteIndex,
+    messages: remoteMessages,
+  });
+  mockCommands.expects("_handleCommands").once().withArgs(remoteMessages);
+  await commands.pollDeviceCommands();
+
+  mockCommands.verify();
+  Assert.equal(accountState.data.device.lastCommandIndex, 12);
+});
+
+add_task(async function test_commands_pollDeviceCommands_scheduled_local_state_empty() {
+  // Server state.
+  const remoteMessages = [
+    {
+      index: 11,
+      data: {},
+    },
+    {
+      index: 12,
+      data: {},
+    },
+  ];
+  const remoteIndex = 12;
 
-  Assert.equal(accountState.data.device.handledCommands.length, 0);
+  // Local state.
+  const accountState = {
+    data: {
+      device: {},
+    },
+  };
+
+  const fxAccounts = {
+    async _withCurrentAccountState(cb) {
+      const get = () => accountState.data;
+      const set = (val) => { accountState.data = val; };
+      await cb(get, set);
+    },
+  };
+  const commands = new FxAccountsCommands(fxAccounts);
+  const mockCommands = sinon.mock(commands);
+  mockCommands.expects("_fetchDeviceCommands").once().withArgs(0).returns({
+    index: remoteIndex,
+    messages: remoteMessages,
+  });
+  mockCommands.expects("_handleCommands").once().withArgs(remoteMessages);
+  await commands.pollDeviceCommands();
+
+  mockCommands.verify();
   Assert.equal(accountState.data.device.lastCommandIndex, 12);
-  const callArgs = commands._handleCommands.args[0][0];
-  Assert.equal(callArgs[0].index, 12);
 });
--- a/services/fxaccounts/tests/xpcshell/test_push_service.js
+++ b/services/fxaccounts/tests/xpcshell/test_push_service.js
@@ -421,17 +421,17 @@ add_task(async function commandReceived(
     QueryInterface() {
       return this;
     },
   };
 
   let fxAccountsMock = {};
   const promiseConsumeRemoteMessagesCalled = new Promise(res => {
     fxAccountsMock.commands = {
-      consumeRemoteCommand() {
+      pollDeviceCommands() {
         res();
       },
     };
   });
 
   let pushService = new FxAccountsPushService({
     pushService: mockPushService,
     fxAccounts: fxAccountsMock,
--- a/services/sync/modules/policies.js
+++ b/services/sync/modules/policies.js
@@ -545,17 +545,17 @@ SyncScheduler.prototype = {
       return;
     }
     Services.tm.dispatchToMainThread(() => {
       this.service.sync({engines, why});
       const now = Math.round(new Date().getTime() / 1000);
       // Only fetch missed messages in a "scheduled" sync so we don't race against
       // the Push service reconnecting on a network link change for example.
       if (why == "schedule" && now >= this.missedFxACommandsLastFetch + this.missedFxACommandsFetchInterval) {
-        fxAccounts.commands.fetchMissedRemoteCommands().then(() => {
+        fxAccounts.commands.pollDeviceCommands().then(() => {
           this.missedFxACommandsLastFetch = now;
         }).catch(e => {
           this._log.error("Fetching missed remote commands failed.", e);
         });
       }
     });
   },