Backed out 3 changesets (bug 1368209) for xpcshell failures in services/sync/tests/unit/test_syncengine_sync.js on CLOSED TREE
author NARCIS BELEUZU <nbeleuzu@mozilla.com>
Fri, 03 Nov 2017 11:40:06 +0200
changeset 443236 f7206a54b9aaa37e811b91d8bb07f0901280bac5
parent 443235 bbbd75cd12a945396d21f2000f5a9d4bc09602bb
child 443237 37ceb314b2b3ae5c629a028dd5ac46c222015a56
push id 1618
push user Callek@gmail.com
push date Thu, 11 Jan 2018 17:45:48 +0000
treeherder mozilla-release@882ca853e05a
bugs 1368209
milestone 58.0a1
backs out 1b868efa368f0e8f2caa4df8568fca04f756a00e
b88c681ccdc1614c3589730e72ac37ff342d22a7
4b8e56844ae98e4129f567c23491f34d4f6ce708
Backed out 3 changesets (bug 1368209) for xpcshell failures in services/sync/tests/unit/test_syncengine_sync.js on CLOSED TREE
Backed out changeset 1b868efa368f (bug 1368209)
Backed out changeset b88c681ccdc1 (bug 1368209)
Backed out changeset 4b8e56844ae9 (bug 1368209)
services/sync/modules/constants.js
services/sync/modules/engines.js
services/sync/modules/engines/forms.js
services/sync/modules/engines/history.js
services/sync/modules/engines/passwords.js
services/sync/modules/record.js
services/sync/tests/unit/head_http_server.js
services/sync/tests/unit/test_history_engine.js
services/sync/tests/unit/test_syncengine_sync.js
services/sync/tests/unit/xpcshell.ini
testing/modules/TestUtils.jsm
toolkit/components/places/PlacesSyncUtils.jsm
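The central change being backed out is the incoming-record batching strategy in services/sync/modules/engines.js: the restored code applies downloaded records in fixed-size chunks controlled by applyIncomingBatchSize, and _store.applyIncomingBatch(batch) resolves to the IDs that failed to apply so they can be retried on a later sync. A minimal standalone sketch of that contract follows; the applyInBatches helper and the store/records names are illustrative placeholders, not part of the patch below.

// Sketch of the batch-apply contract restored by this backout (illustrative only).
// `store.applyIncomingBatch(batch)` is assumed to resolve to an array of record
// IDs that failed to apply, matching the engines.js usage in the diff below.
const APPLY_INCOMING_BATCH_SIZE = 50; // e.g. HISTORY_STORE_BATCH_SIZE in constants.js

async function applyInBatches(store, records) {
  let failed = [];
  for (let i = 0; i < records.length; i += APPLY_INCOMING_BATCH_SIZE) {
    const batch = records.slice(i, i + APPLY_INCOMING_BATCH_SIZE);
    failed = failed.concat(await store.applyIncomingBatch(batch));
  }
  return failed; // persisted (as previousFailed) so these records are refetched on the next sync
}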
--- a/services/sync/modules/constants.js
+++ b/services/sync/modules/constants.js
@@ -49,16 +49,22 @@ HMAC_EVENT_INTERVAL:                   6
 MASTER_PASSWORD_LOCKED_RETRY_INTERVAL: 15 * 60 * 1000,   // 15 minutes
 
 // 50 is hardcoded here because of URL length restrictions.
 // (GUIDs can be up to 64 chars long.)
 // Individual engines can set different values for their limit if their
 // identifiers are shorter.
 DEFAULT_GUID_FETCH_BATCH_SIZE:         50,
 
+// Default batch size for applying incoming records.
+DEFAULT_STORE_BATCH_SIZE:              1,
+HISTORY_STORE_BATCH_SIZE:              50,
+FORMS_STORE_BATCH_SIZE:                50,
+PASSWORDS_STORE_BATCH_SIZE:            50,
+
 // Default batch size for download batching
 // (how many records are fetched at a time from the server when batching is used).
 DEFAULT_DOWNLOAD_BATCH_SIZE:           1000,
 
 // score thresholds for early syncs
 SINGLE_USER_THRESHOLD:                 1000,
 MULTI_DEVICE_THRESHOLD:                300,
 
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -19,21 +19,20 @@ Cu.import("resource://gre/modules/Log.js
 Cu.import("resource://services-common/async.js");
 Cu.import("resource://services-common/observers.js");
 Cu.import("resource://services-common/utils.js");
 Cu.import("resource://services-sync/constants.js");
 Cu.import("resource://services-sync/record.js");
 Cu.import("resource://services-sync/resource.js");
 Cu.import("resource://services-sync/util.js");
 
-XPCOMUtils.defineLazyModuleGetters(this, {
-  fxAccounts: "resource://gre/modules/FxAccounts.jsm",
-  OS: "resource://gre/modules/osfile.jsm",
-  PlacesSyncUtils: "resource://gre/modules/PlacesSyncUtils.jsm",
-});
+XPCOMUtils.defineLazyModuleGetter(this, "fxAccounts",
+  "resource://gre/modules/FxAccounts.jsm");
+XPCOMUtils.defineLazyModuleGetter(this, "OS",
+                                  "resource://gre/modules/osfile.jsm");
 
 /*
  * Trackers are associated with a single engine and deal with
  * listening for changes to their particular data type.
  *
  * There are two things they keep track of:
  * 1) A score, indicating how urgently the engine wants to sync
  * 2) A list of IDs for all the changed items that need to be synced
@@ -822,17 +821,18 @@ SyncEngine.prototype = {
   // How many records to pull in a single sync. This is primarily to avoid very
   // long first syncs against profiles with many history records.
   downloadLimit: null,
 
   // How many records to pull at one time when specifying IDs. This is to avoid
   // URI length limitations.
   guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE,
 
-  downloadBatchSize: DEFAULT_DOWNLOAD_BATCH_SIZE,
+  // How many records to process in a single batch.
+  applyIncomingBatchSize: DEFAULT_STORE_BATCH_SIZE,
 
   async initialize() {
     await this.loadToFetch();
     await this.loadPreviousFailed();
     this._log.debug("SyncEngine initialized", this.name);
   },
 
   get storageURL() {
@@ -1058,324 +1058,314 @@ SyncEngine.prototype = {
    * A tiny abstraction to make it easier to test incoming record
    * application.
    */
   itemSource() {
     return new Collection(this.engineURL, this._recordObj, this.service);
   },
 
   /**
-   * Download and apply remote records changed since the last sync. This
-   * happens in three stages.
-   *
-   * In the first stage, we fetch full records for all changed items, newest
-   * first, up to the download limit. The limit lets us make progress for large
-   * collections, where the sync is likely to be interrupted before we
-   * can fetch everything.
-   *
-   * In the second stage, we fetch the IDs of any remaining records changed
-   * since the last sync, add them to our backlog, and fast-forward our last
-   * sync time.
-   *
-   * In the third stage, we fetch and apply records for all backlogged IDs,
-   * as well as any records that failed to apply during the last sync. We
-   * request records for the IDs in chunks, to avoid exceeding URL length
-   * limits, then remove successfully applied records from the backlog, and
-   * record IDs of any records that failed to apply to retry on the next sync.
+   * Process incoming records.
+   * In the most awful and untestable way possible.
+   * This now accepts something that makes testing vaguely less impossible.
    */
-  async _processIncoming() {
+  async _processIncoming(newitems) {
     this._log.trace("Downloading & applying server changes");
 
-    let newitems = this.itemSource();
+    // Figure out how many total items to fetch this sync; do less on mobile.
+    let batchSize = this.downloadLimit || Infinity;
+
+    if (!newitems) {
+      newitems = this.itemSource();
+    }
+
+    if (this._defaultSort) {
+      newitems.sort = this._defaultSort;
+    }
 
     newitems.newer = this.lastSync;
     newitems.full  = true;
-
-    let downloadLimit = Infinity;
-    if (this.downloadLimit) {
-      // Fetch new records up to the download limit. Currently, only the history
-      // engine sets a limit, since the history collection has the highest volume
-      // of changed records between syncs. The other engines fetch all records
-      // changed since the last sync.
-      if (this._defaultSort) {
-        // A download limit with a sort order doesn't make sense: we won't know
-        // which records to backfill.
-        throw new Error("Can't specify download limit with default sort order");
-      }
-      newitems.sort = "newest";
-      downloadLimit = newitems.limit = this.downloadLimit;
-    } else if (this._defaultSort) {
-      // The bookmarks engine fetches records by sort index; other engines leave
-      // the order unspecified. We can remove `_defaultSort` entirely after bug
-      // 1305563: the sort index won't matter because we'll buffer all bookmarks
-      // before applying.
-      newitems.sort = this._defaultSort;
-    }
+    newitems.limit = batchSize;
 
     // applied    => number of items that should be applied.
     // failed     => number of items that failed in this sync.
     // newFailed  => number of items that failed for the first time in this sync.
     // reconciled => number of items that were reconciled.
     let count = {applied: 0, failed: 0, newFailed: 0, reconciled: 0};
-    let recordsToApply = [];
-    let failedInCurrentSync = [];
+    let handled = [];
+    let applyBatch = [];
+    let failed = [];
+    let failedInPreviousSync = this.previousFailed;
+    let fetchBatch = Utils.arrayUnion(this.toFetch, failedInPreviousSync);
+    // Reset previousFailed for each sync since previously failed items may not fail again.
+    this.previousFailed = [];
+
+    // Used (via exceptions) to allow the record handler/reconciliation/etc.
+    // methods to signal that they would like processing of incoming records to
+    // cease.
+    let aborting = undefined;
+
+    async function doApplyBatch() {
+      this._tracker.ignoreAll = true;
+      try {
+        failed = failed.concat((await this._store.applyIncomingBatch(applyBatch)));
+      } catch (ex) {
+        if (Async.isShutdownException(ex)) {
+          throw ex;
+        }
+        // Catch any error that escapes from applyIncomingBatch. At present
+        // those will all be abort events.
+        this._log.warn("Got exception, aborting processIncoming", ex);
+        aborting = ex;
+      }
+      this._tracker.ignoreAll = false;
+      applyBatch = [];
+    }
+
+    async function doApplyBatchAndPersistFailed() {
+      // Apply remaining batch.
+      if (applyBatch.length) {
+        await doApplyBatch.call(this);
+      }
+      // Persist failed items so we refetch them.
+      if (failed.length) {
+        this.previousFailed = Utils.arrayUnion(failed, this.previousFailed);
+        count.failed += failed.length;
+        this._log.debug("Records that failed to apply: " + failed);
+        failed = [];
+      }
+    }
+
+    let key = this.service.collectionKeys.keyForCollection(this.name);
+
+    // Not binding this method to 'this' for performance reasons. It gets
+    // called for every incoming record.
+    let self = this;
+
+    let recordHandler = async function(item) {
+      if (aborting) {
+        return;
+      }
+
+      // Grab a later last modified if possible
+      if (self.lastModified == null || item.modified > self.lastModified)
+        self.lastModified = item.modified;
+
+      // Track the collection for the WBO.
+      item.collection = self.name;
+
+      // Remember which records were processed
+      handled.push(item.id);
 
-    let oldestModified = this.lastModified;
-    let downloadedIDs = new Set();
+      try {
+        try {
+          await item.decrypt(key);
+        } catch (ex) {
+          if (!Utils.isHMACMismatch(ex)) {
+            throw ex;
+          }
+          let strategy = await self.handleHMACMismatch(item, true);
+          if (strategy == SyncEngine.kRecoveryStrategy.retry) {
+            // You only get one retry.
+            try {
+              // Try decrypting again, typically because we've got new keys.
+              self._log.info("Trying decrypt again...");
+              key = self.service.collectionKeys.keyForCollection(self.name);
+              await item.decrypt(key);
+              strategy = null;
+            } catch (ex) {
+              if (!Utils.isHMACMismatch(ex)) {
+                throw ex;
+              }
+              strategy = await self.handleHMACMismatch(item, false);
+            }
+          }
 
-    // Stage 1: Fetch new records from the server, up to the download limit.
+          switch (strategy) {
+            case null:
+              // Retry succeeded! No further handling.
+              break;
+            case SyncEngine.kRecoveryStrategy.retry:
+              self._log.debug("Ignoring second retry suggestion.");
+              // Fall through to error case.
+            case SyncEngine.kRecoveryStrategy.error:
+              self._log.warn("Error decrypting record", ex);
+              failed.push(item.id);
+              return;
+            case SyncEngine.kRecoveryStrategy.ignore:
+              self._log.debug("Ignoring record " + item.id +
+                              " with bad HMAC: already handled.");
+              return;
+          }
+        }
+      } catch (ex) {
+        if (Async.isShutdownException(ex)) {
+          throw ex;
+        }
+        self._log.warn("Error decrypting record", ex);
+        failed.push(item.id);
+        return;
+      }
+
+      if (self._shouldDeleteRemotely(item)) {
+        self._log.trace("Deleting item from server without applying", item);
+        self._deleteId(item.id);
+        return;
+      }
+
+      let shouldApply;
+      try {
+        shouldApply = await self._reconcile(item);
+      } catch (ex) {
+        if (ex.code == Engine.prototype.eEngineAbortApplyIncoming) {
+          self._log.warn("Reconciliation failed: aborting incoming processing.");
+          failed.push(item.id);
+          aborting = ex.cause;
+        } else if (!Async.isShutdownException(ex)) {
+          self._log.warn("Failed to reconcile incoming record " + item.id, ex);
+          failed.push(item.id);
+          return;
+        } else {
+          throw ex;
+        }
+      }
+
+      if (shouldApply) {
+        count.applied++;
+        applyBatch.push(item);
+      } else {
+        count.reconciled++;
+        self._log.trace("Skipping reconciled incoming item " + item.id);
+      }
+
+      if (applyBatch.length == self.applyIncomingBatchSize) {
+        await doApplyBatch.call(self);
+      }
+    };
+
+    // Only bother getting data from the server if there's new things
     if (this.lastModified == null || this.lastModified > this.lastSync) {
-      let { response, records } = await newitems.getBatched(this.downloadBatchSize);
+      let { response, records } = await newitems.getBatched();
       if (!response.success) {
         response.failureCode = ENGINE_DOWNLOAD_FAIL;
         throw response;
       }
 
       let maybeYield = Async.jankYielder();
       for (let record of records) {
         await maybeYield();
-        downloadedIDs.add(record.id);
-
-        if (record.modified < oldestModified) {
-          oldestModified = record.modified;
-        }
+        await recordHandler(record);
+      }
+      await doApplyBatchAndPersistFailed.call(this);
 
-        let { shouldApply, error } = await this._maybeReconcile(record);
-        if (error) {
-          failedInCurrentSync.push(record.id);
-          count.failed++;
-          continue;
-        }
-        if (!shouldApply) {
-          count.reconciled++;
-          continue;
-        }
-        recordsToApply.push(record);
+      if (aborting) {
+        throw aborting;
       }
-
-      let failedToApply = await this._applyRecords(recordsToApply);
-      failedInCurrentSync.push(...failedToApply);
-
-      // `applied` is a bit of a misnomer: it counts records that *should* be
-      // applied, so it also includes records that we tried to apply and failed.
-      // `recordsToApply.length - failedToApply.length` is the number of records
-      // that we *successfully* applied.
-      count.failed += failedToApply.length;
-      count.applied += recordsToApply.length;
     }
 
-    // Stage 2: If we reached our download limit, we might still have records
-    // on the server that changed since the last sync. Fetch the IDs for the
-    // remaining records, and add them to the backlog. Note that this stage
-    // only runs for engines that set a download limit.
-    if (downloadedIDs.size == downloadLimit) {
-      let guidColl = this.itemSource();
+    // History: check if we got the maximum that we requested; get the rest if so.
+    if (handled.length == newitems.limit) {
+      // XXX - this block appears to have no test coverage (eg, throwing here,
+      // or commenting the entire block causes no tests to fail.)
+      // See bug 1368951 comment 3 for some insightful analysis of why this
+      // might not be doing what we expect anyway, so it may be the case that
+      // this needs both fixing *and* tests.
+      let guidColl = new Collection(this.engineURL, null, this.service);
 
+      // Sort and limit so that we only get the last X records.
+      guidColl.limit = this.downloadLimit;
       guidColl.newer = this.lastSync;
-      guidColl.older = oldestModified;
-      guidColl.sort  = "oldest";
+
+      // index: Orders by the sortindex descending (highest weight first).
+      guidColl.sort  = "index";
 
       let guids = await guidColl.get();
       if (!guids.success)
         throw guids;
 
-      // Filtering out already downloaded IDs here isn't necessary. We only do
-      // that in case the Sync server doesn't support `older` (bug 1316110).
-      let remainingIDs = guids.obj.filter(id => !downloadedIDs.has(id));
-      if (remainingIDs.length > 0) {
-        this.toFetch = Utils.arrayUnion(this.toFetch, remainingIDs);
+      // Figure out which guids weren't just fetched then remove any guids that
+      // were already waiting and prepend the new ones
+      let extra = Utils.arraySub(guids.obj, handled);
+      if (extra.length > 0) {
+        fetchBatch = Utils.arrayUnion(extra, fetchBatch);
+        this.toFetch = Utils.arrayUnion(extra, this.toFetch);
       }
     }
 
-    // Fast-foward the lastSync timestamp since we have backlogged the
-    // remaining items.
+    // Fast-foward the lastSync timestamp since we have stored the
+    // remaining items in toFetch.
     if (this.lastSync < this.lastModified) {
       this.lastSync = this.lastModified;
     }
 
-    // Stage 3: Backfill records from the backlog, and those that failed to
-    // decrypt or apply during the last sync. We only backfill up to the
-    // download limit, to prevent a large backlog for one engine from blocking
-    // the others. We'll keep processing the backlog on subsequent engine syncs.
-    let failedInPreviousSync = this.previousFailed;
-    let idsToBackfill = Utils.arrayUnion(this.toFetch.slice(0, downloadLimit),
-      failedInPreviousSync);
+    // Process any backlog of GUIDs.
+    // At this point we impose an upper limit on the number of items to fetch
+    // in a single request, even for desktop, to avoid hitting URI limits.
+    batchSize = this.guidFetchBatchSize;
 
-    // Note that we intentionally overwrite the previously failed list here.
-    // Records that fail to decrypt or apply in two consecutive syncs are likely
-    // corrupt; we remove them from the list because retrying and failing on
-    // every subsequent sync just adds noise.
-    this.previousFailed = failedInCurrentSync;
-
-    let backfilledItems = this.itemSource();
+    while (fetchBatch.length && !aborting) {
+      // Reuse the original query, but get rid of the restricting params
+      // and batch remaining records.
+      newitems.limit = 0;
+      newitems.newer = 0;
+      newitems.ids = fetchBatch.slice(0, batchSize);
 
-    backfilledItems.sort = "newest";
-    backfilledItems.full = true;
-
-    // `get` includes the list of IDs as a query parameter, so we need to fetch
-    // records in chunks to avoid exceeding URI length limits.
-    for (let ids of PlacesSyncUtils.chunkArray(idsToBackfill, this.guidFetchBatchSize)) {
-      backfilledItems.ids = ids;
-
-      let resp = await backfilledItems.get();
+      let resp = await newitems.get();
       if (!resp.success) {
         resp.failureCode = ENGINE_DOWNLOAD_FAIL;
         throw resp;
       }
 
       let maybeYield = Async.jankYielder();
-      let backfilledRecordsToApply = [];
-      let failedInBackfill = [];
-
       for (let json of resp.obj) {
         await maybeYield();
         let record = new this._recordObj();
         record.deserialize(json);
+        await recordHandler(record);
+      }
 
-        let { shouldApply, error } = await this._maybeReconcile(record);
-        if (error) {
-          failedInBackfill.push(record.id);
-          count.failed++;
-          continue;
-        }
-        if (!shouldApply) {
-          count.reconciled++;
-          continue;
-        }
-        backfilledRecordsToApply.push(record);
+      // This batch was successfully applied. Not using
+      // doApplyBatchAndPersistFailed() here to avoid writing toFetch twice.
+      fetchBatch = fetchBatch.slice(batchSize);
+      this.toFetch = Utils.arraySub(this.toFetch, newitems.ids);
+      this.previousFailed = Utils.arrayUnion(this.previousFailed, failed);
+      if (failed.length) {
+        count.failed += failed.length;
+        this._log.debug("Records that failed to apply: " + failed);
       }
-      let failedToApply = await this._applyRecords(backfilledRecordsToApply);
-      failedInBackfill.push(...failedToApply);
+      failed = [];
 
-      count.failed += failedToApply.length;
-      count.applied += backfilledRecordsToApply.length;
-
-      this.toFetch = Utils.arraySub(this.toFetch, ids);
-      this.previousFailed = Utils.arrayUnion(this.previousFailed, failedInBackfill);
+      if (aborting) {
+        throw aborting;
+      }
 
       if (this.lastSync < this.lastModified) {
         this.lastSync = this.lastModified;
       }
     }
 
+    // Apply remaining items.
+    await doApplyBatchAndPersistFailed.call(this);
+
     count.newFailed = this.previousFailed.reduce((count, engine) => {
       if (failedInPreviousSync.indexOf(engine) == -1) {
         count++;
       }
       return count;
     }, 0);
     count.succeeded = Math.max(0, count.applied - count.failed);
     this._log.info(["Records:",
                     count.applied, "applied,",
                     count.succeeded, "successfully,",
                     count.failed, "failed to apply,",
                     count.newFailed, "newly failed to apply,",
                     count.reconciled, "reconciled."].join(" "));
     Observers.notify("weave:engine:sync:applied", count, this.name);
   },
 
-  async _maybeReconcile(item) {
-    let key = this.service.collectionKeys.keyForCollection(this.name);
-
-    // Grab a later last modified if possible
-    if (this.lastModified == null || item.modified > this.lastModified) {
-      this.lastModified = item.modified;
-    }
-
-    try {
-      try {
-        await item.decrypt(key);
-      } catch (ex) {
-        if (!Utils.isHMACMismatch(ex)) {
-          throw ex;
-        }
-        let strategy = await this.handleHMACMismatch(item, true);
-        if (strategy == SyncEngine.kRecoveryStrategy.retry) {
-          // You only get one retry.
-          try {
-            // Try decrypting again, typically because we've got new keys.
-            this._log.info("Trying decrypt again...");
-            key = this.service.collectionKeys.keyForCollection(this.name);
-            await item.decrypt(key);
-            strategy = null;
-          } catch (ex) {
-            if (!Utils.isHMACMismatch(ex)) {
-              throw ex;
-            }
-            strategy = await this.handleHMACMismatch(item, false);
-          }
-        }
-
-        switch (strategy) {
-          case null:
-            // Retry succeeded! No further handling.
-            break;
-          case SyncEngine.kRecoveryStrategy.retry:
-            this._log.debug("Ignoring second retry suggestion.");
-            // Fall through to error case.
-          case SyncEngine.kRecoveryStrategy.error:
-            this._log.warn("Error decrypting record", ex);
-            return { shouldApply: false, error: ex };
-          case SyncEngine.kRecoveryStrategy.ignore:
-            this._log.debug("Ignoring record " + item.id +
-                            " with bad HMAC: already handled.");
-            return { shouldApply: false, error: null };
-        }
-      }
-    } catch (ex) {
-      if (Async.isShutdownException(ex)) {
-        throw ex;
-      }
-      this._log.warn("Error decrypting record", ex);
-      return { shouldApply: false, error: ex };
-    }
-
-    if (this._shouldDeleteRemotely(item)) {
-      this._log.trace("Deleting item from server without applying", item);
-      this._deleteId(item.id);
-      return { shouldApply: false, error: null };
-    }
-
-    let shouldApply;
-    try {
-      shouldApply = await this._reconcile(item);
-    } catch (ex) {
-      if (ex.code == Engine.prototype.eEngineAbortApplyIncoming) {
-        this._log.warn("Reconciliation failed: aborting incoming processing.");
-        throw ex.cause;
-      } else if (!Async.isShutdownException(ex)) {
-        this._log.warn("Failed to reconcile incoming record " + item.id, ex);
-        return { shouldApply: false, error: ex };
-      } else {
-        throw ex;
-      }
-    }
-
-    if (!shouldApply) {
-      this._log.trace("Skipping reconciled incoming item " + item.id);
-    }
-
-    return { shouldApply, error: null };
-  },
-
-  async _applyRecords(records) {
-    this._tracker.ignoreAll = true;
-    try {
-      let failedIDs = await this._store.applyIncomingBatch(records);
-      return failedIDs;
-    } catch (ex) {
-      // Catch any error that escapes from applyIncomingBatch. At present
-      // those will all be abort events.
-      this._log.warn("Got exception, aborting processIncoming", ex);
-      throw ex;
-    } finally {
-      this._tracker.ignoreAll = false;
-    }
-  },
-
   // Indicates whether an incoming item should be deleted from the server at
   // the end of the sync. Engines can override this method to clean up records
   // that shouldn't be on the server.
   _shouldDeleteRemotely(remoteItem) {
     return false;
   },
 
   /**
--- a/services/sync/modules/engines/forms.js
+++ b/services/sync/modules/engines/forms.js
@@ -102,16 +102,17 @@ var FormWrapper = {
 this.FormEngine = function FormEngine(service) {
   SyncEngine.call(this, "Forms", service);
 };
 FormEngine.prototype = {
   __proto__: SyncEngine.prototype,
   _storeObj: FormStore,
   _trackerObj: FormTracker,
   _recordObj: FormRec,
+  applyIncomingBatchSize: FORMS_STORE_BATCH_SIZE,
 
   syncPriority: 6,
 
   get prefName() {
     return "history";
   },
 
   async _findDupe(item) {
--- a/services/sync/modules/engines/history.js
+++ b/services/sync/modules/engines/history.js
@@ -42,16 +42,17 @@ this.HistoryEngine = function HistoryEng
   SyncEngine.call(this, "History", service);
 };
 HistoryEngine.prototype = {
   __proto__: SyncEngine.prototype,
   _recordObj: HistoryRec,
   _storeObj: HistoryStore,
   _trackerObj: HistoryTracker,
   downloadLimit: MAX_HISTORY_DOWNLOAD,
+  applyIncomingBatchSize: HISTORY_STORE_BATCH_SIZE,
 
   syncPriority: 7,
 
   async _processIncoming(newitems) {
     // We want to notify history observers that a batch operation is underway
     // so they don't do lots of work for each incoming record.
     let observers = PlacesUtils.history.getObservers();
     function notifyHistoryObservers(notification) {
--- a/services/sync/modules/engines/passwords.js
+++ b/services/sync/modules/engines/passwords.js
@@ -72,16 +72,18 @@ this.PasswordEngine = function PasswordE
   SyncEngine.call(this, "Passwords", service);
 };
 PasswordEngine.prototype = {
   __proto__: SyncEngine.prototype,
   _storeObj: PasswordStore,
   _trackerObj: PasswordTracker,
   _recordObj: LoginRec,
 
+  applyIncomingBatchSize: PASSWORDS_STORE_BATCH_SIZE,
+
   syncPriority: 2,
 
   async _syncFinish() {
     await SyncEngine.prototype._syncFinish.call(this);
 
     // Delete the Weave credentials from the server once.
     if (!Svc.Prefs.get("deletePwdFxA", false)) {
       try {
--- a/services/sync/modules/record.js
+++ b/services/sync/modules/record.js
@@ -613,43 +613,35 @@ Collection.prototype = {
   __proto__: Resource.prototype,
   _logName: "Sync.Collection",
 
   _rebuildURL: function Coll__rebuildURL() {
     // XXX should consider what happens if it's not a URL...
     this.uri.QueryInterface(Ci.nsIURL);
 
     let args = [];
-    if (this.older) {
+    if (this.older)
       args.push("older=" + this.older);
-    }
-    if (this.newer) {
+    else if (this.newer) {
       args.push("newer=" + this.newer);
     }
-    if (this.full) {
+    if (this.full)
       args.push("full=1");
-    }
-    if (this.sort) {
+    if (this.sort)
       args.push("sort=" + this.sort);
-    }
-    if (this.ids != null) {
+    if (this.ids != null)
       args.push("ids=" + this.ids);
-    }
-    if (this.limit > 0 && this.limit != Infinity) {
+    if (this.limit > 0 && this.limit != Infinity)
       args.push("limit=" + this.limit);
-    }
-    if (this._batch) {
+    if (this._batch)
       args.push("batch=" + encodeURIComponent(this._batch));
-    }
-    if (this._commit) {
+    if (this._commit)
       args.push("commit=true");
-    }
-    if (this._offset) {
+    if (this._offset)
       args.push("offset=" + encodeURIComponent(this._offset));
-    }
 
     this.uri.query = (args.length > 0) ? "?" + args.join("&") : "";
   },
 
   // get full items
   get full() { return this._full; },
   set full(value) {
     this._full = value;
--- a/services/sync/tests/unit/head_http_server.js
+++ b/services/sync/tests/unit/head_http_server.js
@@ -2,17 +2,16 @@
 /* import-globals-from ../../../common/tests/unit/head_helpers.js */
 /* import-globals-from head_helpers.js */
 
 var Cm = Components.manager;
 
 // Shared logging for all HTTP server functions.
 Cu.import("resource://gre/modules/Log.jsm");
 Cu.import("resource://services-common/utils.js");
-Cu.import("resource://testing-common/TestUtils.jsm");
 const SYNC_HTTP_LOGGER = "Sync.Test.Server";
 
 // While the sync code itself uses 1.5, the tests hard-code 1.1,
 // so we're sticking with 1.1 here.
 const SYNC_API_VERSION = "1.1";
 
 // Use the same method that record.js does, which mirrors the server.
 // The server returns timestamps with 1/100 sec granularity. Note that this is
@@ -74,39 +73,36 @@ function ServerWBO(id, initialPayload, m
     return;
   }
 
   if (typeof initialPayload == "object") {
     initialPayload = JSON.stringify(initialPayload);
   }
   this.payload = initialPayload;
   this.modified = modified || new_timestamp();
-  this.sortindex = 0;
 }
 ServerWBO.prototype = {
 
   get data() {
     return JSON.parse(this.payload);
   },
 
   get() {
     return JSON.stringify(this, ["id", "modified", "payload"]);
   },
 
   put(input) {
     input = JSON.parse(input);
     this.payload = input.payload;
     this.modified = new_timestamp();
-    this.sortindex = input.sortindex || 0;
   },
 
   delete() {
     delete this.payload;
     delete this.modified;
-    delete this.sortindex;
   },
 
   // This handler sets `newModified` on the response body if the collection
   // timestamp has changed. This allows wrapper handlers to extract information
   // that otherwise would exist only in the body stream.
   handler() {
     let self = this;
 
@@ -283,94 +279,79 @@ ServerCollection.prototype = {
    */
   remove: function remove(id) {
     delete this._wbos[id];
   },
 
   _inResultSet(wbo, options) {
     return wbo.payload
            && (!options.ids || (options.ids.indexOf(wbo.id) != -1))
-           && (!options.newer || (wbo.modified > options.newer))
-           && (!options.older || (wbo.modified < options.older));
+           && (!options.newer || (wbo.modified > options.newer));
   },
 
   count(options) {
     options = options || {};
     let c = 0;
     for (let wbo of Object.values(this._wbos)) {
       if (wbo.modified && this._inResultSet(wbo, options)) {
         c++;
       }
     }
     return c;
   },
 
   get(options, request) {
-    let data = [];
-    for (let wbo of Object.values(this._wbos)) {
-      if (wbo.modified && this._inResultSet(wbo, options)) {
-        data.push(wbo);
+    let result;
+    if (options.full) {
+      let data = [];
+      for (let wbo of Object.values(this._wbos)) {
+        // Drop deleted.
+        if (wbo.modified && this._inResultSet(wbo, options)) {
+          data.push(wbo.get());
+        }
       }
-    }
-    switch (options.sort) {
-      case "newest":
-        data.sort((a, b) => b.modified - a.modified);
-        break;
-
-      case "oldest":
-        data.sort((a, b) => a.modified - b.modified);
-        break;
-
-      case "index":
-        data.sort((a, b) => b.sortindex - a.sortindex);
-        break;
-
-      default:
-        if (options.sort) {
-          this._log.error("Error: client requesting unknown sort order",
-                          options.sort);
-          throw new Error("Unknown sort order");
-        }
-        // If the client didn't request a sort order, shuffle the records
-        // to ensure that we don't accidentally depend on the default order.
-        TestUtils.shuffle(data);
-    }
-    if (options.full) {
-      data = data.map(wbo => wbo.get());
       let start = options.offset || 0;
       if (options.limit) {
         let numItemsPastOffset = data.length - start;
         data = data.slice(start, start + options.limit);
         // use options as a backchannel to set x-weave-next-offset
         if (numItemsPastOffset > options.limit) {
           options.nextOffset = start + options.limit;
         }
       } else if (start) {
         data = data.slice(start);
       }
 
       if (request && request.getHeader("accept") == "application/newlines") {
         this._log.error("Error: client requesting application/newlines content");
         throw new Error("This server should not serve application/newlines content");
+      } else {
+        result = JSON.stringify(data);
       }
 
       // Use options as a backchannel to report count.
       options.recordCount = data.length;
     } else {
-      data = data.map(wbo => wbo.id);
+      let data = [];
+      for (let [id, wbo] of Object.entries(this._wbos)) {
+        if (this._inResultSet(wbo, options)) {
+          data.push(id);
+        }
+      }
       let start = options.offset || 0;
       if (options.limit) {
         data = data.slice(start, start + options.limit);
         options.nextOffset = start + options.limit;
       } else if (start) {
         data = data.slice(start);
       }
+      result = JSON.stringify(data);
       options.recordCount = data.length;
     }
-    return JSON.stringify(data);
+    return result;
   },
 
   post(input) {
     input = JSON.parse(input);
     let success = [];
     let failed = {};
 
     // This will count records where we have an existing ServerWBO
@@ -382,17 +363,16 @@ ServerCollection.prototype = {
         this._log.debug("Creating WBO " + JSON.stringify(record.id) +
                         " on the fly.");
         wbo = new ServerWBO(record.id);
         this.insertWBO(wbo);
       }
       if (wbo) {
         wbo.payload = record.payload;
         wbo.modified = new_timestamp();
-        wbo.sortindex = record.sortindex || 0;
         success.push(record.id);
       } else {
         failed[record.id] = "no wbo configured";
       }
     }
     return {modified: new_timestamp(),
             success,
             failed};
@@ -441,19 +421,16 @@ ServerCollection.prototype = {
           response.bodyOutputStream.write(body, body.length);
           return;
         }
         options.ids = options.ids.split(",");
       }
       if (options.newer) {
         options.newer = parseFloat(options.newer);
       }
-      if (options.older) {
-        options.older = parseFloat(options.older);
-      }
       if (options.limit) {
         options.limit = parseInt(options.limit, 10);
       }
       if (options.offset) {
         options.offset = parseInt(options.offset, 10);
       }
 
       switch (request.method) {
deleted file mode 100644
--- a/services/sync/tests/unit/test_history_engine.js
+++ /dev/null
@@ -1,111 +0,0 @@
-/* Any copyright is dedicated to the Public Domain.
-   http://creativecommons.org/publicdomain/zero/1.0/ */
-
-Cu.import("resource://services-sync/service.js");
-Cu.import("resource://services-sync/engines/history.js");
-Cu.import("resource://testing-common/services/sync/utils.js");
-
-add_task(async function setup() {
-  initTestLogging("Trace");
-});
-
-add_task(async function test_history_download_limit() {
-  let engine = new HistoryEngine(Service);
-  await engine.initialize();
-
-  let server = await serverForFoo(engine);
-  await SyncTestingInfrastructure(server);
-
-  let lastSync = Date.now() / 1000;
-
-  let collection = server.user("foo").collection("history");
-  for (let i = 0; i < 15; i++) {
-    let id = "place" + ("0".repeat(7) + i).slice(-7);
-    let wbo = new ServerWBO(id, encryptPayload({
-      id,
-      histUri: "http://example.com/" + i,
-      title: "Page " + i,
-      visits: [{
-        date: Date.now() * 1000,
-        type: PlacesUtils.history.TRANSITIONS.TYPED,
-      }, {
-        date: Date.now() * 1000,
-        type: PlacesUtils.history.TRANSITIONS.LINK,
-      }],
-    }), lastSync + 1 + i);
-    wbo.sortindex = 15 - i;
-    collection.insertWBO(wbo);
-  }
-
-  // We have 15 records on the server since the last sync, but our download
-  // limit is 5 records at a time. We should eventually fetch all 15.
-  engine.lastSync = lastSync;
-  engine.downloadBatchSize = 4;
-  engine.downloadLimit = 5;
-
-  // Don't actually fetch any backlogged records, so that we can inspect
-  // the backlog between syncs.
-  engine.guidFetchBatchSize = 0;
-
-  let ping = await sync_engine_and_validate_telem(engine, false);
-  deepEqual(ping.engines[0].incoming, { applied: 5 });
-
-  let backlogAfterFirstSync = engine.toFetch.slice(0);
-  deepEqual(backlogAfterFirstSync, ["place0000000", "place0000001",
-    "place0000002", "place0000003", "place0000004", "place0000005",
-    "place0000006", "place0000007", "place0000008", "place0000009"]);
-
-  // We should have fast-forwarded the last sync time.
-  equal(engine.lastSync, lastSync + 15);
-
-  engine.lastModified = collection.modified;
-  ping = await sync_engine_and_validate_telem(engine, false);
-  ok(!ping.engines[0].incoming);
-
-  // After the second sync, our backlog still contains the same GUIDs: we
-  // weren't able to make progress on fetching them, since our
-  // `guidFetchBatchSize` is 0.
-  let backlogAfterSecondSync = engine.toFetch.slice(0);
-  deepEqual(backlogAfterFirstSync, backlogAfterSecondSync);
-
-  // Now add a newer record to the server.
-  let newWBO = new ServerWBO("placeAAAAAAA", encryptPayload({
-    id: "placeAAAAAAA",
-    histUri: "http://example.com/a",
-    title: "New Page A",
-    visits: [{
-      date: Date.now() * 1000,
-      type: PlacesUtils.history.TRANSITIONS.TYPED,
-    }],
-  }), lastSync + 20);
-  newWBO.sortindex = -1;
-  collection.insertWBO(newWBO);
-
-  engine.lastModified = collection.modified;
-  ping = await sync_engine_and_validate_telem(engine, false);
-  deepEqual(ping.engines[0].incoming, { applied: 1 });
-
-  // Our backlog should remain the same.
-  let backlogAfterThirdSync = engine.toFetch.slice(0);
-  deepEqual(backlogAfterSecondSync, backlogAfterThirdSync);
-
-  equal(engine.lastSync, lastSync + 20);
-
-  // Bump the fetch batch size to let the backlog make progress. We should
-  // make 3 requests to fetch 5 backlogged GUIDs.
-  engine.guidFetchBatchSize = 2;
-
-  engine.lastModified = collection.modified;
-  ping = await sync_engine_and_validate_telem(engine, false);
-  deepEqual(ping.engines[0].incoming, { applied: 5 });
-
-  deepEqual(engine.toFetch, ["place0000005", "place0000006", "place0000007",
-    "place0000008", "place0000009"]);
-
-  // Sync again to clear out the backlog.
-  engine.lastModified = collection.modified;
-  ping = await sync_engine_and_validate_telem(engine, false);
-  deepEqual(ping.engines[0].incoming, { applied: 5 });
-
-  deepEqual(engine.toFetch, []);
-});
--- a/services/sync/tests/unit/test_syncengine_sync.js
+++ b/services/sync/tests/unit/test_syncengine_sync.js
@@ -660,39 +660,135 @@ add_task(async function test_processInco
     do_check_eq(engine._store.items.failed2, "Record No. 2");
     do_check_eq(engine.previousFailed.length, 0);
   } finally {
     await cleanAndGo(engine, server);
   }
 });
 
 
+add_task(async function test_processIncoming_applyIncomingBatchSize_smaller() {
+  _("Ensure that a number of incoming items less than applyIncomingBatchSize is still applied.");
+
+  // Engine that doesn't like the first and last record it's given.
+  const APPLY_BATCH_SIZE = 10;
+  let engine = makeRotaryEngine();
+  engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
+  engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
+  engine._store.applyIncomingBatch = async function(records) {
+    let failed1 = records.shift();
+    let failed2 = records.pop();
+    await this._applyIncomingBatch(records);
+    return [failed1.id, failed2.id];
+  };
+
+  // Let's create less than a batch worth of server side records.
+  let collection = new ServerCollection();
+  for (let i = 0; i < APPLY_BATCH_SIZE - 1; i++) {
+    let id = "record-no-" + i;
+    let payload = encryptPayload({id, denomination: "Record No. " + id});
+    collection.insert(id, payload);
+  }
+
+  let server = sync_httpd_setup({
+      "/1.1/foo/storage/rotary": collection.handler()
+  });
+
+  await SyncTestingInfrastructure(server);
+
+  let meta_global = Service.recordManager.set(engine.metaURL,
+                                              new WBORecord(engine.metaURL));
+  meta_global.payload.engines = {rotary: {version: engine.version,
+                                         syncID: engine.syncID}};
+  try {
+
+    // Confirm initial environment
+    do_check_empty(engine._store.items);
+
+    await engine._syncStartup();
+    await engine._processIncoming();
+
+    // Records have been applied and the expected failures have failed.
+    do_check_attribute_count(engine._store.items, APPLY_BATCH_SIZE - 1 - 2);
+    do_check_eq(engine.toFetch.length, 0);
+    do_check_eq(engine.previousFailed.length, 2);
+    do_check_eq(engine.previousFailed[0], "record-no-0");
+    do_check_eq(engine.previousFailed[1], "record-no-8");
+
+  } finally {
+    await cleanAndGo(engine, server);
+  }
+});
+
+
+add_task(async function test_processIncoming_applyIncomingBatchSize_multiple() {
+  _("Ensure that incoming items are applied according to applyIncomingBatchSize.");
+
+  const APPLY_BATCH_SIZE = 10;
+
+  // Engine that applies records in batches.
+  let engine = makeRotaryEngine();
+  engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
+  let batchCalls = 0;
+  engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
+  engine._store.applyIncomingBatch = async function(records) {
+    batchCalls += 1;
+    do_check_eq(records.length, APPLY_BATCH_SIZE);
+    await this._applyIncomingBatch.apply(this, arguments);
+  };
+
+  // Let's create three batches worth of server side records.
+  let collection = new ServerCollection();
+  for (let i = 0; i < APPLY_BATCH_SIZE * 3; i++) {
+    let id = "record-no-" + i;
+    let payload = encryptPayload({id, denomination: "Record No. " + id});
+    collection.insert(id, payload);
+  }
+
+  let server = sync_httpd_setup({
+      "/1.1/foo/storage/rotary": collection.handler()
+  });
+
+  await SyncTestingInfrastructure(server);
+
+  let meta_global = Service.recordManager.set(engine.metaURL,
+                                              new WBORecord(engine.metaURL));
+  meta_global.payload.engines = {rotary: {version: engine.version,
+                                         syncID: engine.syncID}};
+  try {
+
+    // Confirm initial environment
+    do_check_empty(engine._store.items);
+
+    await engine._syncStartup();
+    await engine._processIncoming();
+
+    // Records have been applied in 3 batches.
+    do_check_eq(batchCalls, 3);
+    do_check_attribute_count(engine._store.items, APPLY_BATCH_SIZE * 3);
+
+  } finally {
+    await cleanAndGo(engine, server);
+  }
+});
+
+
 add_task(async function test_processIncoming_notify_count() {
   _("Ensure that failed records are reported only once.");
 
+  const APPLY_BATCH_SIZE = 5;
   const NUMBER_OF_RECORDS = 15;
 
   // Engine that fails the first record.
   let engine = makeRotaryEngine();
+  engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
   engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
   engine._store.applyIncomingBatch = async function(records) {
-    let recordsToApply = [], recordsToFail = [];
-    for (let record of records) {
-      if (record.id == "record-no-0") {
-        recordsToFail.push(record);
-      } else if (records.length == NUMBER_OF_RECORDS &&
-                 (record.id == "record-no-5" || record.id == "record-no-10")) {
-        // Fail records 5 and 10 during the first sync.
-        recordsToFail.push(record);
-      } else {
-        recordsToApply.push(record);
-      }
-    }
-    await engine._store._applyIncomingBatch(recordsToApply);
-    return recordsToFail.map(record => record.id);
+    await engine._store._applyIncomingBatch(records.slice(1));
+    return [records[0].id];
   };
 
   // Create a batch of server side records.
   let collection = new ServerCollection();
   for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
     let id = "record-no-" + i;
     let payload = encryptPayload({id, denomination: "Record No. " + id});
     collection.insert(id, payload);
@@ -760,38 +856,28 @@ add_task(async function test_processInco
   } finally {
     await cleanAndGo(engine, server);
   }
 });
 
 
 add_task(async function test_processIncoming_previousFailed() {
   _("Ensure that failed records are retried.");
+  Svc.Prefs.set("client.type", "mobile");
 
+  const APPLY_BATCH_SIZE = 4;
   const NUMBER_OF_RECORDS = 14;
 
   // Engine that fails the first 2 records.
   let engine = makeRotaryEngine();
+  engine.mobileGUIDFetchBatchSize = engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
   engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
   engine._store.applyIncomingBatch = async function(records) {
-    let recordsToApply = [], recordsToFail = [];
-    for (let record of records) {
-      if (record.id == "record-no-0" || record.id == "record-no-1" ||
-          record.id == "record-no-8" || record.id == "record-no-9") {
-        recordsToFail.push(record);
-      } else if (records.length == NUMBER_OF_RECORDS &&
-                 (record.id == "record-no-4" || record.id == "record-no-5" ||
-                  record.id == "record-no-12" || record.id == "record-no-13")) {
-        recordsToFail.push(record);
-      } else {
-        recordsToApply.push(record);
-      }
-    }
-    await engine._store._applyIncomingBatch(recordsToApply);
-    return recordsToFail.map(record => record.id);
+    await engine._store._applyIncomingBatch(records.slice(2));
+    return [records[0].id, records[1].id];
   };
 
   // Create a batch of server side records.
   let collection = new ServerCollection();
   for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
     let id = "record-no-" + i;
     let payload = encryptPayload({id, denomination: "Record No. " + i});
     collection.insert(id, payload);
@@ -880,16 +966,17 @@ add_task(async function test_processInco
                          "record-no-" + 23,
                          "record-no-" + (42 + APPLY_BATCH_SIZE),
                          "record-no-" + (23 + APPLY_BATCH_SIZE),
                          "record-no-" + (42 + APPLY_BATCH_SIZE * 2),
                          "record-no-" + (23 + APPLY_BATCH_SIZE * 2),
                          "record-no-" + (2 + APPLY_BATCH_SIZE * 3),
                          "record-no-" + (1 + APPLY_BATCH_SIZE * 3)];
   let engine = makeRotaryEngine();
+  engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
 
   engine.__reconcile = engine._reconcile;
   engine._reconcile = async function _reconcile(record) {
     if (BOGUS_RECORDS.indexOf(record.id) % 2 == 0) {
       throw new Error("I don't like this record! Baaaaaah!");
     }
     return this.__reconcile.apply(this, arguments);
   };
--- a/services/sync/tests/unit/xpcshell.ini
+++ b/services/sync/tests/unit/xpcshell.ini
@@ -166,17 +166,16 @@ run-sequentially = Frequent timeouts, bu
 [test_doctor.js]
 [test_extension_storage_engine.js]
 [test_extension_storage_tracker.js]
 [test_forms_store.js]
 [test_forms_tracker.js]
 # Too many intermittent "ASSERTION: thread pool wasn't shutdown: '!mPool'" (bug 804479)
 skip-if = debug
 [test_form_validator.js]
-[test_history_engine.js]
 [test_history_store.js]
 [test_history_tracker.js]
 # Too many intermittent "ASSERTION: thread pool wasn't shutdown: '!mPool'" (bug 804479)
 skip-if = debug
 [test_places_guid_downgrade.js]
 [test_password_engine.js]
 [test_password_store.js]
 [test_password_validator.js]
--- a/testing/modules/TestUtils.jsm
+++ b/testing/modules/TestUtils.jsm
@@ -137,19 +137,9 @@ this.TestUtils = {
         if (conditionPassed) {
           clearInterval(intervalID);
           resolve();
         }
         tries++;
       }, interval);
     });
   },
-
-  shuffle(array) {
-    let results = [];
-    for (let i = 0; i < array.length; ++i) {
-      let randomIndex = Math.floor(Math.random() * (i + 1));
-      results[i] = results[randomIndex];
-      results[randomIndex] = array[i];
-    }
-    return results;
-  },
 };
--- a/toolkit/components/places/PlacesSyncUtils.jsm
+++ b/toolkit/components/places/PlacesSyncUtils.jsm
@@ -19,37 +19,17 @@ XPCOMUtils.defineLazyModuleGetter(this, 
                                   "resource://gre/modules/PlacesUtils.jsm");
 
 /**
  * This module exports functions for Sync to use when applying remote
  * records. The calls are similar to those in `Bookmarks.jsm` and
  * `nsINavBookmarksService`, with special handling for smart bookmarks,
  * tags, keywords, synced annotations, and missing parents.
  */
-var PlacesSyncUtils = {
-  /**
-   * Auxiliary generator function that yields an array in chunks
-   *
-   * @param  array
-   * @param  chunkLength
-   * @yields {Array}
-   *         New Array with the next chunkLength elements of array.
-   *         If the array has less than chunkLength elements, yields all of them
-   */
-  * chunkArray(array, chunkLength) {
-    if (!array.length || chunkLength <= 0) {
-      return;
-    }
-    let startIndex = 0;
-    while (startIndex < array.length) {
-      yield array.slice(startIndex, startIndex + chunkLength);
-      startIndex += chunkLength;
-    }
-  },
-};
+var PlacesSyncUtils = {};
 
 const { SOURCE_SYNC } = Ci.nsINavBookmarksService;
 
 const MICROSECONDS_PER_SECOND = 1000000;
 const SQLITE_MAX_VARIABLE_NUMBER = 999;
 
 const ORGANIZER_QUERY_ANNO = "PlacesOrganizer/OrganizerQuery";
 const ORGANIZER_ALL_BOOKMARKS_ANNO_VALUE = "AllBookmarks";
@@ -75,16 +55,30 @@ XPCOMUtils.defineLazyGetter(this, "ROOT_
   [PlacesUtils.bookmarks.unfiledGuid]: "unfiled",
   [PlacesUtils.bookmarks.mobileGuid]: "mobile",
 }));
 
 XPCOMUtils.defineLazyGetter(this, "ROOTS", () =>
   Object.keys(ROOT_SYNC_ID_TO_GUID)
 );
 
+/**
+ * Auxiliary generator function that yields an array in chunks
+ *
+ * @param array
+ * @param chunkLength
+ * @yields {Array} New Array with the next chunkLength elements of array. If the array has less than chunkLength elements, yields all of them
+ */
+function* chunkArray(array, chunkLength) {
+  let startIndex = 0;
+  while (startIndex < array.length) {
+    yield array.slice(startIndex, startIndex += chunkLength);
+  }
+}
+
 const HistorySyncUtils = PlacesSyncUtils.history = Object.freeze({
   /**
    * Clamps a history visit date between the current date and the earliest
    * sensible date.
    *
    * @param {Date} visitDate
    *        The visit date.
    * @return {Date} The clamped visit date.
@@ -130,17 +124,17 @@ const HistorySyncUtils = PlacesSyncUtils
    */
   async determineNonSyncableGuids(guids) {
     // Filter out hidden pages and `TRANSITION_FRAMED_LINK` visits. These are
     // excluded when rendering the history menu, so we use the same constraints
     // for Sync. We also don't want to sync `TRANSITION_EMBED` visits, but those
     // aren't stored in the database.
     let db = await PlacesUtils.promiseDBConnection();
     let nonSyncableGuids = [];
-    for (let chunk of PlacesSyncUtils.chunkArray(guids, SQLITE_MAX_VARIABLE_NUMBER)) {
+    for (let chunk of chunkArray(guids, SQLITE_MAX_VARIABLE_NUMBER)) {
       let rows = await db.execute(`
         SELECT DISTINCT p.guid FROM moz_places p
         JOIN moz_historyvisits v ON p.id = v.place_id
         WHERE p.guid IN (${new Array(chunk.length).fill("?").join(",")}) AND
             (p.hidden = 1 OR v.visit_type IN (0,
               ${PlacesUtils.history.TRANSITION_FRAMED_LINK}))
       `, chunk);
       nonSyncableGuids = nonSyncableGuids.concat(rows.map(row => row.getResultByName("guid")));