Bug 1337978 - Unify the multiple notions of 'weak upload' in sync. r=kitcambridge,markh
authorThom Chiovoloni <tchiovoloni@mozilla.com>
Tue, 18 Jul 2017 14:18:04 -0400
changeset 421785 e631feaa72e9c2b94b1e2f10fa2298a4ec0f79c3
parent 421784 7e0990dca8d163e844a2253f0c57fcf32b053440
child 421786 52b49a63629c78042d7683691fc46fb072a81fcb
push id1517
push userjlorenzo@mozilla.com
push dateThu, 14 Sep 2017 16:50:54 +0000
treeherdermozilla-release@3b41fd564418 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerskitcambridge, markh
bugs1337978
milestone56.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1337978 - Unify the multiple notions of 'weak upload' in sync. r=kitcambridge,markh MozReview-Commit-ID: 5vTCAhUfMzm
services/sync/modules/bookmark_repair.js
services/sync/modules/engines.js
services/sync/modules/engines/bookmarks.js
services/sync/tests/unit/test_bookmark_engine.js
services/sync/tests/unit/test_bookmark_repair_responder.js
services/sync/tests/unit/test_bookmark_tracker.js
toolkit/components/places/PlacesSyncUtils.jsm
--- a/services/sync/modules/bookmark_repair.js
+++ b/services/sync/modules/bookmark_repair.js
@@ -181,17 +181,17 @@ class BookmarkRepairRequestor extends Co
   }
 
   tryServerOnlyRepairs(validationInfo) {
     if (this._countServerOnlyFixableProblems(validationInfo) == 0) {
       return false;
     }
     let engine = this.service.engineManager.get("bookmarks");
     for (let id of validationInfo.problems.serverMissing) {
-      engine._modified.setWeak(id, { tombstone: false });
+      engine.addForWeakUpload(id);
     }
     let toFetch = engine.toFetch.concat(validationInfo.problems.clientMissing,
                                         validationInfo.problems.serverDeleted);
     engine.toFetch = Array.from(new Set(toFetch));
     return true;
   }
 
   /* See if the repairer is willing and able to begin a repair process given
@@ -569,34 +569,35 @@ class BookmarkRepairResponder extends Co
     // progress as we don't do anything too smart that could cause problems,
     // but just upload items. If we get any smarter we should re-think this
     // (but when we do, note that checking this._currentState isn't enough as
     // this responder is not a singleton)
 
     this._currentState = {
       request,
       rawCommand,
+      processedCommand: false,
       ids: [],
     }
 
     try {
       let engine = this.service.engineManager.get("bookmarks");
       let { toUpload, toDelete } = await this._fetchItemsToUpload(request);
 
       if (toUpload.size || toDelete.size) {
         log.debug(`repair request will upload ${toUpload.size} items and delete ${toDelete.size} items`);
         // whew - now add these items to the tracker "weakly" (ie, they will not
         // persist in the case of a restart, but that's OK - we'll then end up here
         // again) and also record them in the response we send back.
         for (let id of toUpload) {
-          engine._modified.setWeak(id, { tombstone: false });
+          engine.addForWeakUpload(id);
           this._currentState.ids.push(id);
         }
         for (let id of toDelete) {
-          engine._modified.setWeak(id, { tombstone: true });
+          engine.addForWeakUpload(id, { forceTombstone: true });
           this._currentState.ids.push(id);
         }
 
         // We have arranged for stuff to be uploaded, so wait until that's done.
         Svc.Obs.add("weave:engine:sync:uploaded", this.onUploaded, this);
         // and record in telemetry that we got this far - just incase we never
         // end up doing the upload for some obscure reason.
         let eventExtra = {
@@ -685,30 +686,33 @@ class BookmarkRepairResponder extends Co
     return { toUpload, toDelete };
   }
 
   onUploaded(subject, data) {
     if (data != "bookmarks") {
       return;
     }
     Svc.Obs.remove("weave:engine:sync:uploaded", this.onUploaded, this);
-    log.debug(`bookmarks engine has uploaded stuff - creating a repair response`);
+    if (subject.failed) {
+      return;
+    }
+    log.debug(`bookmarks engine has uploaded stuff - creating a repair response`, subject);
     Async.promiseSpinningly(this._finishRepair());
   }
 
   async _finishRepair() {
     let clientsEngine = this.service.clientsEngine;
     let flowID = this._currentState.request.flowID;
     let response = {
       request: this._currentState.request.request,
       collection: "bookmarks",
       clientID: clientsEngine.localID,
       flowID,
       ids: this._currentState.ids,
-    }
+    };
     let clientID = this._currentState.request.requestor;
     await clientsEngine.sendCommand("repairResponse", [response], clientID, { flowID });
     // and nuke the request from our client.
     await clientsEngine.removeLocalCommand(this._currentState.rawCommand);
     let eventExtra = {
       flowID,
       numIDs: response.ids.length.toString(),
     }
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -760,32 +760,43 @@ Engine.prototype = {
   },
 };
 
 this.SyncEngine = function SyncEngine(name, service) {
   Engine.call(this, name || "SyncEngine", service);
 
   // Async initializations can be made in the initialize() method.
 
-  // The set of records needing a weak reupload.
-  // The difference between this and a "normal" reupload is that these records
-  // are only tracked in memory, and if the reupload attempt fails (shutdown,
-  // 412, etc), we abort uploading the "weak" set.
+  // The map of ids => metadata for records needing a weak upload.
+  //
+  // Currently the "metadata" fields are:
+  //
+  // - forceTombstone: whether or not we should ignore the local information
+  //   about the record, and write a tombstone for it anyway -- e.g. in the case
+  //   of records that should exist locally, but should never be uploaded to the
+  //   server (note that not all sync engines support tombstones)
+  //
+  // The difference between this and a "normal" upload is that these records
+  // are only tracked in memory, and if the upload attempt fails (shutdown,
+  // 412, etc), we abort uploading the "weak" set (by clearing the map).
   //
   // The rationale here is for the cases where we receive a record from the
   // server that we know is wrong in some (small) way. For example, the
   // dateAdded field on bookmarks -- maybe we have a better date, or the server
   // record is entirely missing the date, etc.
   //
   // In these cases, we fix our local copy of the record, and mark it for
-  // weak reupload. A normal ("strong") reupload is problematic here because
+  // weak upload. A normal ("strong") upload is problematic here because
   // in the case of a conflict from the server, there's a window where our
   // record would be marked as modified more recently than a change that occurs
   // on another device change, and we lose data from the user.
-  this._needWeakReupload = new Set();
+  //
+  // Additionally, we use this as the set of items to upload for bookmark
+  // repair reponse, which has similar constraints.
+  this._needWeakUpload = new Map();
 }
 
 // Enumeration to define approaches to handling bad records.
 // Attached to the constructor to allow use as a kind of static enumeration.
 SyncEngine.kRecoveryStrategy = {
   ignore: "ignore",
   retry:  "retry",
   error:  "error"
@@ -961,16 +972,20 @@ SyncEngine.prototype = {
   _createTombstone(id) {
     let tombstone = new this._recordObj(this.name, id);
     tombstone.id = id;
     tombstone.collection = this.name;
     tombstone.deleted = true;
     return tombstone;
   },
 
+  addForWeakUpload(id, { forceTombstone = false } = {}) {
+    this._needWeakUpload.set(id, { forceTombstone });
+  },
+
   // Any setup that needs to happen at the beginning of each sync.
   async _syncStartup() {
 
     // Determine if we need to wipe on outdated versions
     let metaGlobal = await this.service.recordManager.get(this.metaURL);
     let engines = metaGlobal.payload.engines || {};
     let engineData = engines[this.name] || {};
 
@@ -1029,17 +1044,16 @@ SyncEngine.prototype = {
     // to upload back.
     this._tracker.clearChangedIDs();
 
     this._log.info(this._modified.count() +
                    " outgoing items pre-reconciliation");
 
     // Keep track of what to delete at the end of sync
     this._delete = {};
-    this._needWeakReupload.clear();
   },
 
   /**
    * A tiny abstraction to make it easier to test incoming record
    * application.
    */
   itemSource() {
     return new Collection(this.engineURL, this._recordObj, this.service);
@@ -1579,23 +1593,26 @@ SyncEngine.prototype = {
   },
 
   // Upload outgoing records.
   async _uploadOutgoing() {
     this._log.trace("Uploading local changes to server.");
 
     // collection we'll upload
     let up = new Collection(this.engineURL, null, this.service);
-    let modifiedIDs = this._modified.ids();
+    let modifiedIDs = new Set(this._modified.ids());
+    for (let id of this._needWeakUpload.keys()) {
+      modifiedIDs.add(id);
+    }
     let counts = { failed: 0, sent: 0 };
-    if (modifiedIDs.length) {
-      this._log.trace("Preparing " + modifiedIDs.length +
+    if (modifiedIDs.size) {
+      this._log.trace("Preparing " + modifiedIDs.size +
                       " outgoing records");
 
-      counts.sent = modifiedIDs.length;
+      counts.sent = modifiedIDs.size;
 
       let failed = [];
       let successful = [];
       let handleResponse = async (resp, batchOngoing = false) => {
         // Note: We don't want to update this.lastSync, or this._modified until
         // the batch is complete, however we want to remember success/failure
         // indicators for when that happens.
         if (!resp.success) {
@@ -1637,116 +1654,68 @@ SyncEngine.prototype = {
       };
 
       let postQueue = up.newPostQueue(this._log, this.lastSync, handleResponse);
 
       for (let id of modifiedIDs) {
         let out;
         let ok = false;
         try {
-          out = await this._createRecord(id);
+          let { forceTombstone = false } = this._needWeakUpload.get(id) || {};
+          if (forceTombstone) {
+            out = await this._createTombstone(id);
+          } else {
+            out = await this._createRecord(id);
+          }
           if (this._log.level <= Log.Level.Trace)
             this._log.trace("Outgoing: " + out);
 
           out.encrypt(this.service.collectionKeys.keyForCollection(this.name));
           let payloadLength = JSON.stringify(out.payload).length;
           if (payloadLength > this.maxRecordPayloadBytes) {
             if (this.allowSkippedRecord) {
               this._modified.delete(id); // Do not attempt to sync that record again
             }
             throw new Error(`Payload too big: ${payloadLength} bytes`);
           }
           ok = true;
         } catch (ex) {
           this._log.warn("Error creating record", ex);
           ++counts.failed;
           if (Async.isShutdownException(ex) || !this.allowSkippedRecord) {
-            Observers.notify("weave:engine:sync:uploaded", counts, this.name);
+            if (!this.allowSkippedRecord) {
+              // Don't bother for shutdown errors
+              Observers.notify("weave:engine:sync:uploaded", counts, this.name);
+            }
             throw ex;
           }
         }
-        this._needWeakReupload.delete(id);
         if (ok) {
           let { enqueued, error } = await postQueue.enqueue(out);
           if (!enqueued) {
             ++counts.failed;
             if (!this.allowSkippedRecord) {
+              Observers.notify("weave:engine:sync:uploaded", counts, this.name);
               throw error;
             }
             this._modified.delete(id);
             this._log.warn(`Failed to enqueue record "${id}" (skipping)`, error);
           }
         }
         await Async.promiseYield();
       }
       await postQueue.flush(true);
     }
+    this._needWeakUpload.clear();
 
-    if (this._needWeakReupload.size) {
-      try {
-        const { sent, failed } = await this._weakReupload(up);
-        counts.sent += sent;
-        counts.failed += failed;
-      } catch (e) {
-        if (Async.isShutdownException(e)) {
-          throw e;
-        }
-        this._log.warn("Weak reupload failed", e);
-      }
-    }
     if (counts.sent || counts.failed) {
       Observers.notify("weave:engine:sync:uploaded", counts, this.name);
     }
   },
 
-  async _weakReupload(collection) {
-    const counts = { sent: 0, failed: 0 };
-    let pendingSent = 0;
-    let postQueue = collection.newPostQueue(this._log, this.lastSync, (resp, batchOngoing = false) => {
-      if (!resp.success) {
-        this._needWeakReupload.clear();
-        this._log.warn("Uploading records (weak) failed: " + resp);
-        resp.failureCode = resp.status == 412 ? ENGINE_BATCH_INTERRUPTED : ENGINE_UPLOAD_FAIL;
-        throw resp;
-      }
-      if (!batchOngoing) {
-        counts.sent += pendingSent;
-        pendingSent = 0;
-      }
-    });
-
-    let pendingWeakReupload = await this.buildWeakReuploadMap(this._needWeakReupload);
-    for (let [id, encodedRecord] of pendingWeakReupload) {
-      try {
-        this._log.trace("Outgoing (weak)", encodedRecord);
-        encodedRecord.encrypt(this.service.collectionKeys.keyForCollection(this.name));
-      } catch (ex) {
-        if (Async.isShutdownException(ex)) {
-          throw ex;
-        }
-        this._log.warn(`Failed to encrypt record "${id}" during weak reupload`, ex);
-        ++counts.failed;
-        continue;
-      }
-      // Note that general errors (network error, 412, etc.) will throw here.
-      // `enqueued` is only false if the specific item failed to enqueue, but
-      // other items should be/are fine. For example, enqueued will be false if
-      // it is larger than the max post or WBO size.
-      let { enqueued } = await postQueue.enqueue(encodedRecord);
-      if (!enqueued) {
-        ++counts.failed;
-      } else {
-        ++pendingSent;
-      }
-      await Async.promiseYield();
-    }
-    await postQueue.flush(true);
-    return counts;
-  },
-
   async _onRecordsWritten(succeeded, failed) {
     // Implement this method to take specific actions against successfully
     // uploaded records and failed records.
   },
 
   // Any cleanup necessary.
   // Save the current snapshot so as to calculate changes at next sync
   async _syncFinish() {
@@ -1772,17 +1741,17 @@ SyncEngine.prototype = {
           await doDelete(key, val.slice(0, 100));
           val = val.slice(100);
         }
       }
     }
   },
 
   async _syncCleanup() {
-    this._needWeakReupload.clear();
+    this._needWeakUpload.clear();
     if (!this._modified) {
       return;
     }
 
     try {
       // Mark failed WBOs as changed again so they are reuploaded next time.
       await this.trackRemainingChanges();
     } finally {
@@ -1832,16 +1801,17 @@ SyncEngine.prototype = {
 
     return canDecrypt;
   },
 
   async _resetClient() {
     this.resetLastSync();
     this.previousFailed = [];
     this.toFetch = [];
+    this._needWeakUpload.clear();
   },
 
   async wipeServer() {
     let response = await this.service.resource(this.engineURL).delete();
     if (response.status != 200 && response.status != 404) {
       throw response;
     }
     await this._resetClient();
@@ -1910,40 +1880,16 @@ SyncEngine.prototype = {
    * items that failed to upload. This method is called at the end of each sync.
    *
    */
   async trackRemainingChanges() {
     for (let [id, change] of this._modified.entries()) {
       this._tracker.addChangedID(id, change);
     }
   },
-
-  /**
-   * Returns a map of (id, unencrypted record) that will be used to perform
-   * the weak reupload. Subclasses may override this to filter out items we
-   * shouldn't upload as part of a weak reupload (items that have changed,
-   * for example).
-   */
-  async buildWeakReuploadMap(idSet) {
-    let result = new Map();
-    let maybeYield = Async.jankYielder();
-    for (let id of idSet) {
-      await maybeYield();
-      try {
-        let record = await this._createRecord(id);
-        result.set(id, record);
-      } catch (ex) {
-        if (Async.isShutdownException(ex)) {
-          throw ex;
-        }
-        this._log.warn("createRecord failed during weak reupload", ex);
-      }
-    }
-    return result;
-  }
 };
 
 /**
  * A changeset is created for each sync in `Engine::get{Changed, All}IDs`,
  * and stores opaque change data for tracked IDs. The default implementation
  * only records timestamps, though engines can extend this to store additional
  * data for each entry.
  */
--- a/services/sync/modules/engines/bookmarks.js
+++ b/services/sync/modules/engines/bookmarks.js
@@ -546,43 +546,16 @@ BookmarksEngine.prototype = {
       // Make sure deleted items are marked as tombstones. We do this here
       // in addition to the `isTombstone` call above because it's possible
       // a changed bookmark might be deleted during a sync (bug 1313967).
       this._modified.setTombstone(record.id);
     }
     return record;
   },
 
-  async buildWeakReuploadMap(idSet) {
-    // We want to avoid uploading records which have changed, since that could
-    // cause an inconsistent state on the server.
-    //
-    // Strictly speaking, it would be correct to just call getChangedIds() after
-    // building the initial weak reupload map, however this is quite slow, since
-    // we might end up doing createRecord() (which runs at least one, and
-    // sometimes multiple database queries) for a potentially large number of
-    // items.
-    //
-    // Since the call to getChangedIds is relatively cheap, we do it once before
-    // building the weakReuploadMap (which is where the calls to createRecord()
-    // occur) as an optimization, and once after for correctness, to handle the
-    // unlikely case that a record was modified while we were building the map.
-    let initialChanges = await PlacesSyncUtils.bookmarks.getChangedIds();
-    for (let changed of initialChanges) {
-      idSet.delete(changed);
-    }
-
-    let map = await SyncEngine.prototype.buildWeakReuploadMap.call(this, idSet);
-    let changes = await PlacesSyncUtils.bookmarks.getChangedIds();
-    for (let id of changes) {
-      map.delete(id);
-    }
-    return map;
-  },
-
   async _findDupe(item) {
     this._log.trace("Finding dupe for " + item.id +
                     " (already duped: " + item.hasDupe + ").");
 
     // Don't bother finding a dupe if the incoming item has duplicates.
     if (item.hasDupe) {
       this._log.trace(item.id + " already a dupe: not finding one.");
       return null;
@@ -721,34 +694,34 @@ BookmarksStore.prototype = {
     // This can throw if we're inserting an invalid or incomplete bookmark.
     // That's fine; the exception will be caught by `applyIncomingBatch`
     // without aborting further processing.
     let item = await PlacesSyncUtils.bookmarks.insert(info);
     if (item) {
       this._log.trace(`Created ${item.kind} ${item.syncId} under ${
         item.parentSyncId}`, item);
       if (item.dateAdded != record.dateAdded) {
-        this.engine._needWeakReupload.add(item.syncId);
+        this.engine.addForWeakUpload(item.syncId);
       }
     }
   },
 
   async remove(record) {
     this._log.trace(`Buffering removal of item "${record.id}".`);
     this._itemsToDelete.add(record.id);
   },
 
   async update(record) {
     let info = record.toSyncBookmark();
     let item = await PlacesSyncUtils.bookmarks.update(info);
     if (item) {
       this._log.trace(`Updated ${item.kind} ${item.syncId} under ${
         item.parentSyncId}`, item);
       if (item.dateAdded != record.dateAdded) {
-        this.engine._needWeakReupload.add(item.syncId);
+        this.engine.addForWeakUpload(item.syncId);
       }
     }
   },
 
   async _orderChildren() {
     for (let syncID in this._childrenToOrder) {
       let children = this._childrenToOrder[syncID];
       try {
@@ -1139,109 +1112,69 @@ BookmarksTracker.prototype = {
       this.score += SCORE_INCREMENT_XLARGE;
       this._batchSawScoreIncrement = false;
     }
   },
   onItemVisited() {}
 };
 
 class BookmarksChangeset extends Changeset {
-  constructor() {
-    super();
-    // Weak changes are part of the changeset, but don't bump the change
-    // counter, and aren't persisted anywhere.
-    this.weakChanges = {};
-  }
 
   getStatus(id) {
     let change = this.changes[id];
     if (!change) {
       return PlacesUtils.bookmarks.SYNC_STATUS.UNKNOWN;
     }
     return change.status;
   }
 
   getModifiedTimestamp(id) {
     let change = this.changes[id];
     if (change) {
       // Pretend the change doesn't exist if we've already synced or
       // reconciled it.
       return change.synced ? Number.NaN : change.modified;
     }
-    if (this.weakChanges[id]) {
-      // For weak changes, we use a timestamp from long ago to ensure we always
-      // prefer the remote version in case of conflicts.
-      return 0;
-    }
     return Number.NaN;
   }
 
-  setWeak(id, { tombstone = false } = {}) {
-    this.weakChanges[id] = { tombstone };
-  }
-
   has(id) {
     let change = this.changes[id];
     if (change) {
       return !change.synced;
     }
-    return !!this.weakChanges[id];
+    return false;
   }
 
   setTombstone(id) {
     let change = this.changes[id];
     if (change) {
       change.tombstone = true;
     }
-    let weakChange = this.weakChanges[id];
-    if (weakChange) {
-      // Not strictly necessary, since we never persist weak changes, but may
-      // be useful for bookkeeping.
-      weakChange.tombstone = true;
-    }
   }
 
   delete(id) {
     let change = this.changes[id];
     if (change) {
       // Mark the change as synced without removing it from the set. We do this
       // so that we can update Places in `trackRemainingChanges`.
       change.synced = true;
     }
-    delete this.weakChanges[id];
-  }
-
-  changeID(oldID, newID) {
-    super.changeID(oldID, newID);
-    this.weakChanges[newID] = this.weakChanges[oldID];
-    delete this.weakChanges[oldID];
   }
 
   ids() {
     let results = new Set();
     for (let id in this.changes) {
       if (!this.changes[id].synced) {
         results.add(id);
       }
     }
-    for (let id in this.weakChanges) {
-      results.add(id);
-    }
     return [...results];
   }
 
-  clear() {
-    super.clear();
-    this.weakChanges = {};
-  }
-
   isTombstone(id) {
     let change = this.changes[id];
     if (change) {
       return change.tombstone;
     }
-    let weakChange = this.weakChanges[id];
-    if (weakChange) {
-      return weakChange.tombstone;
-    }
     return false;
   }
 }
--- a/services/sync/tests/unit/test_bookmark_engine.js
+++ b/services/sync/tests/unit/test_bookmark_engine.js
@@ -639,23 +639,16 @@ add_task(async function test_sync_dateAd
     let item6 = new Bookmark("bookmarks", item6GUID);
     item6.bmkUri = "https://example.com/6";
     item6.title = "asdf6";
     item6.parentName = "Bookmarks Toolbar";
     item6.parentid = "toolbar";
     const item6LastModified = (now - oneYearMS) / 1000;
     collection.insert(item6GUID, encryptPayload(item6.cleartext), item6LastModified);
 
-    let origBuildWeakReuploadMap = engine.buildWeakReuploadMap;
-    engine.buildWeakReuploadMap = async (set) => {
-      let fullMap = await origBuildWeakReuploadMap.call(engine, set);
-      fullMap.delete(item6GUID);
-      return fullMap;
-    };
-
     await sync_engine_and_validate_telem(engine, false);
 
     let record1 = await store.createRecord(item1GUID);
     let record2 = await store.createRecord(item2GUID);
 
     equal(item1.dateAdded, record1.dateAdded, "dateAdded in past should be synced");
     equal(record2.dateAdded, item2LastModified * 1000, "dateAdded in future should be ignored in favor of last modified");
 
@@ -670,25 +663,16 @@ add_task(async function test_sync_dateAd
     let record4 = await store.createRecord(item4GUID);
     equal(record4.dateAdded, item4LastModified * 1000,
           "If no dateAdded is provided, lastModified should be used");
 
     let record5 = await store.createRecord(item5GUID);
     equal(record5.dateAdded, item5LastModified * 1000,
           "If no dateAdded is provided, lastModified should be used (even if it's in the future)");
 
-    let item6WBO = JSON.parse(JSON.parse(collection._wbos[item6GUID].payload).ciphertext);
-    ok(!item6WBO.dateAdded,
-       "If we think an item has been modified locally, we don't upload it to the server");
-
-    let record6 = await store.createRecord(item6GUID);
-    equal(record6.dateAdded, item6LastModified * 1000,
-       "We still remember the more accurate dateAdded if we don't upload a record due to local changes");
-    engine.buildWeakReuploadMap = origBuildWeakReuploadMap;
-
     // Update item2 and try resyncing it.
     item2.dateAdded = now - 100000;
     collection.insert(item2GUID, encryptPayload(item2.cleartext), now / 1000 - 50);
 
 
     // Also, add a local bookmark and make sure it's date added makes it up to the server
     let bzid = PlacesUtils.bookmarks.insertBookmark(
       PlacesUtils.bookmarksMenuFolderId, Utils.makeURI("https://bugzilla.mozilla.org/"),
--- a/services/sync/tests/unit/test_bookmark_repair_responder.js
+++ b/services/sync/tests/unit/test_bookmark_repair_responder.js
@@ -520,11 +520,96 @@ add_task(async function test_aborts_unkn
       extra: { flowID: request.flowID,
                reason: "Don't understand request type 'not-upload'",
              },
     },
   ]);
   await cleanup(server);
 });
 
+add_task(async function test_upload_fail() {
+  let server = await makeServer();
+
+  // Pretend we've already synced this bookmark, so that we can ensure it's
+  // uploaded in response to our repair request.
+  let bm = await PlacesUtils.bookmarks.insert({ parentGuid: PlacesUtils.bookmarks.unfiledGuid,
+                                                title: "Get Firefox",
+                                                url: "http://getfirefox.com/",
+                                                source: PlacesUtils.bookmarks.SOURCES.SYNC });
+
+  await Service.sync();
+  let request = {
+    request: "upload",
+    ids: [bm.guid],
+    flowID: Utils.makeGUID(),
+  }
+  let responder = new BookmarkRepairResponder();
+  await responder.repair(request, null);
+
+  checkRecordedEvents([
+    { object: "repairResponse",
+      method: "uploading",
+      value: undefined,
+      extra: {flowID: request.flowID, numIDs: "1"},
+    },
+  ]);
+
+  // This sync would normally upload the item - arrange for it to fail.
+  let engine = Service.engineManager.get("bookmarks");
+  let oldCreateRecord = engine._createRecord;
+  engine._createRecord = async function(id) {
+    return "anything"; // doesn't have an "encrypt"
+  }
+
+  let numFailures = 0;
+  let numSuccesses = 0;
+  function onUploaded(subject, data) {
+    if (data != "bookmarks") {
+      return;
+    }
+    if (subject.failed) {
+      numFailures += 1;
+    } else {
+      numSuccesses += 1;
+    }
+  }
+  Svc.Obs.add("weave:engine:sync:uploaded", onUploaded, this);
+
+  await Service.sync();
+
+  equal(numFailures, 1);
+  equal(numSuccesses, 0);
+
+  // should be no recorded events
+  checkRecordedEvents([]);
+
+  // restore the error injection so next sync succeeds - the repair should
+  // restart
+  engine._createRecord = oldCreateRecord;
+  await responder.repair(request, null);
+
+  checkRecordedEvents([
+    { object: "repairResponse",
+      method: "uploading",
+      value: undefined,
+      extra: {flowID: request.flowID, numIDs: "1"},
+    },
+  ]);
+
+  await Service.sync();
+  checkRecordedEvents([
+    { object: "repairResponse",
+      method: "finished",
+      value: undefined,
+      extra: {flowID: request.flowID, numIDs: "1"},
+    },
+  ])
+
+  equal(numFailures, 1);
+  equal(numSuccesses, 1);
+
+  Svc.Obs.remove("weave:engine:sync:uploaded", onUploaded, this);
+  await cleanup(server);
+});
+
 add_task(async function teardown() {
   Svc.Prefs.reset("engine.bookmarks.validation.enabled");
 });
--- a/services/sync/tests/unit/test_bookmark_tracker.js
+++ b/services/sync/tests/unit/test_bookmark_tracker.js
@@ -40,17 +40,17 @@ async function verifyTrackerEmpty() {
 
 async function resetTracker() {
   await PlacesTestUtils.markBookmarksAsSynced();
   tracker.resetScore();
 }
 
 async function cleanup() {
   engine.lastSync = 0;
-  engine._needWeakReupload.clear()
+  engine._needWeakUpload.clear()
   await store.wipe();
   await resetTracker();
   await stopTracking();
 }
 
 // startTracking is a signal that the test wants to notice things that happen
 // after this is called (ie, things already tracked should be discarded.)
 async function startTracking() {
--- a/toolkit/components/places/PlacesSyncUtils.jsm
+++ b/toolkit/components/places/PlacesSyncUtils.jsm
@@ -109,21 +109,18 @@ const BookmarkSyncUtils = PlacesSyncUtil
   },
 
   /**
    * Resolves to an array of the syncIDs of bookmarks that have a nonzero change
    * counter
    */
   async getChangedIds() {
     let db = await PlacesUtils.promiseDBConnection();
-    let result = await db.executeCached(`
-      SELECT guid FROM moz_bookmarks
-      WHERE syncChangeCounter >= 1`);
-    return result.map(row =>
-      BookmarkSyncUtils.guidToSyncId(row.getResultByName("guid")));
+    let changes = await pullSyncChanges(db);
+    return Object.keys(changes);
   },
 
   /**
    * Fetches the sync IDs for a folder's children, ordered by their position
    * within the folder.
    */
   async fetchChildSyncIds(parentSyncId) {
     PlacesUtils.SYNC_BOOKMARK_VALIDATORS.syncId(parentSyncId);
@@ -1728,19 +1725,18 @@ function addRowToChangeRecords(row, chan
     counter: row.getResultByName("syncChangeCounter"),
     status: row.getResultByName("syncStatus"),
     tombstone: !!row.getResultByName("tombstone"),
     synced: false,
   };
 }
 
 /**
- * Queries the database for synced bookmarks and tombstones, updates the sync
- * status of all "NEW" bookmarks to "NORMAL", and returns a changeset for the
- * Sync bookmarks engine.
+ * Queries the database for synced bookmarks and tombstones, and returns a
+ * changeset for the Sync bookmarks engine.
  *
  * @param db
  *        The Sqlite.jsm connection handle.
  * @return {Promise} resolved once all items have been fetched.
  * @resolves to an object containing records for changed bookmarks, keyed by
  *           the sync ID.
  */
 var pullSyncChanges = async function(db) {