Merge places and mozilla-central. a=blockers
authorPhilipp von Weitershausen <philipp@weitershausen.de>
Sat, 29 Jan 2011 15:51:29 -0800
changeset 61611 aecb4a72e8d81d07fa00bb1e2e3ec4eafaf5f84b
parent 61601 dd5e9bfd0f6aade836e51f6a7242927441106d9d (current diff)
parent 61610 2a7a03e4d6d80b1bb7029cabb79aa9f9072e5430 (diff)
child 61612 b0f17cbbfd20be2ed628aec00384f6651492c293
push id1
push userroot
push dateTue, 10 Dec 2013 15:46:25 +0000
reviewersblockers
milestone2.0b11pre
Merge places and mozilla-central. a=blockers
--- a/services/sync/modules/constants.js
+++ b/services/sync/modules/constants.js
@@ -83,16 +83,20 @@ HMAC_EVENT_INTERVAL:                   6
 
 // How long to wait between sync attempts if the Master Password is locked.
 MASTER_PASSWORD_LOCKED_RETRY_INTERVAL: 15 * 60 * 1000,   // 15 minutes
 
 // 50 is hardcoded here because of URL length restrictions.
 // (GUIDs can be up to 64 chars long)
 MOBILE_BATCH_SIZE:                     50,
 
+// Default batch size for applying incoming records.
+DEFAULT_STORE_BATCH_SIZE:              1,
+HISTORY_STORE_BATCH_SIZE:              50, // same as MOBILE_BATCH_SIZE
+
 // score thresholds for early syncs
 SINGLE_USER_THRESHOLD:                 1000,
 MULTI_DESKTOP_THRESHOLD:               500,
 MULTI_MOBILE_THRESHOLD:                100,
 
 // File IO Flags
 MODE_RDONLY:                           0x01,
 MODE_WRONLY:                           0x02,
@@ -146,16 +150,17 @@ NO_SYNC_NODE_FOUND:                    "
 OVER_QUOTA:                            "error.sync.reason.over_quota",
 
 RESPONSE_OVER_QUOTA:                   "14",
 
 // engine failure status codes
 ENGINE_UPLOAD_FAIL:                    "error.engine.reason.record_upload_fail",
 ENGINE_DOWNLOAD_FAIL:                  "error.engine.reason.record_download_fail",
 ENGINE_UNKNOWN_FAIL:                   "error.engine.reason.unknown_fail",
+ENGINE_APPLY_FAIL:                     "error.engine.reason.apply_fail",
 ENGINE_METARECORD_DOWNLOAD_FAIL:       "error.engine.reason.metarecord_download_fail",
 ENGINE_METARECORD_UPLOAD_FAIL:         "error.engine.reason.metarecord_upload_fail",
 
 JPAKE_ERROR_CHANNEL:                   "jpake.error.channel",
 JPAKE_ERROR_NETWORK:                   "jpake.error.network",
 JPAKE_ERROR_SERVER:                    "jpake.error.server",
 JPAKE_ERROR_TIMEOUT:                   "jpake.error.timeout",
 JPAKE_ERROR_INTERNAL:                  "jpake.error.internal",
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -190,16 +190,31 @@ function Store(name) {
   name = name || "Unnamed";
   this.name = name.toLowerCase();
 
   this._log = Log4Moz.repository.getLogger("Store." + name);
   let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
   this._log.level = Log4Moz.Level[level];
 }
 Store.prototype = {
+
+  applyIncomingBatch: function applyIncomingBatch(records) {
+    let failed = [];
+    records.forEach(function (record) {
+      try {
+        this.applyIncoming(record);
+      } catch (ex) {
+        this._log.warn("Failed to apply incoming record " + record.id);
+        this._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
+        failed.push(record.id);
+      }
+    }, this);
+    return failed;
+  },
+
   applyIncoming: function Store_applyIncoming(record) {
     if (record.deleted)
       this.remove(record);
     else if (!this.itemExists(record.id))
       this.create(record);
     else
       this.update(record);
   },
@@ -316,17 +331,16 @@ EngineManagerSvc.prototype = {
       name = val.name;
     delete this._engines[name];
   }
 };
 
 function Engine(name) {
   this.Name = name || "Unnamed";
   this.name = name.toLowerCase();
-  this.downloadLimit = null;
 
   this._notify = Utils.notify("weave:engine:");
   this._log = Log4Moz.repository.getLogger("Engine." + this.Name);
   let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
   this._log.level = Log4Moz.Level[level];
 
   this._tracker; // initialize tracker to load previously changed IDs
   this._log.debug("Engine initialized");
@@ -470,16 +484,18 @@ Engine.prototype = {
 function SyncEngine(name) {
   Engine.call(this, name || "SyncEngine");
   this.loadToFetch();
 }
 SyncEngine.prototype = {
   __proto__: Engine.prototype,
   _recordObj: CryptoWrapper,
   version: 1,
+  downloadLimit: null,
+  applyIncomingBatchSize: DEFAULT_STORE_BATCH_SIZE,
 
   get storageURL() Svc.Prefs.get("clusterURL") + Svc.Prefs.get("storageAPI") +
     "/" + ID.get("WeaveID").username + "/storage/",
 
   get engineURL() this.storageURL + this.name,
 
   get cryptoKeysURL() this.storageURL + "crypto/keys",
 
@@ -644,18 +660,43 @@ SyncEngine.prototype = {
     let newitems = new Collection(this.engineURL, this._recordObj);
     if (Svc.Prefs.get("client.type") == "mobile") {
       batchSize = MOBILE_BATCH_SIZE;
     }
     newitems.newer = this.lastSync;
     newitems.full = true;
     newitems.limit = batchSize;
 
-    let count = {applied: 0, reconciled: 0};
+    let count = {applied: 0, failed: 0, reconciled: 0};
     let handled = [];
+    let applyBatch = [];
+    let failed = [];
+    let fetchBatch = this.toFetch;
+
+    function doApplyBatch() {
+      this._tracker.ignoreAll = true;
+      failed = failed.concat(this._store.applyIncomingBatch(applyBatch));
+      this._tracker.ignoreAll = false;
+      applyBatch = [];
+    }
+
+    function doApplyBatchAndPersistFailed() {
+      // Apply remaining batch.
+      if (applyBatch.length) {
+        doApplyBatch.call(this);
+      }
+      // Persist failed items so we refetch them.
+      if (failed.length) {
+        this.toFetch = Utils.arrayUnion(failed, this.toFetch);
+        count.failed += failed.length;
+        this._log.debug("Records that failed to apply: " + failed);
+        failed = [];
+      }
+    }
+
     newitems.recordHandler = Utils.bind2(this, function(item) {
       // Grab a later last modified if possible
       if (this.lastModified == null || item.modified > this.lastModified)
         this.lastModified = item.modified;
 
       // Track the collection for the WBO.
       item.collection = this.name;
       
@@ -667,40 +708,51 @@ SyncEngine.prototype = {
           item.decrypt();
         } catch (ex if (Utils.isHMACMismatch(ex) &&
                         this.handleHMACMismatch())) {
           // Let's try handling it.
           // If the callback returns true, try decrypting again, because
           // we've got new keys.
           this._log.info("Trying decrypt again...");
           item.decrypt();
-        }
-       
-        if (this._reconcile(item)) {
-          count.applied++;
-          this._tracker.ignoreAll = true;
-          this._store.applyIncoming(item);
-        } else {
-          count.reconciled++;
-          this._log.trace("Skipping reconciled incoming item " + item.id);
-        }
-      } catch (ex if (Utils.isHMACMismatch(ex))) {
-        this._log.warn("Error processing record: " + Utils.exceptionStr(ex));
+        }       
+      } catch (ex) {
+        this._log.warn("Error decrypting record: " + Utils.exceptionStr(ex));
+        failed.push(item.id);
+        return;
+      }
 
-        // Upload a new record to replace the bad one if we have it
-        if (this._store.itemExists(item.id))
-          this._modified[item.id] = 0;
+      let shouldApply;
+      try {
+        shouldApply = this._reconcile(item);
+      } catch (ex) {
+        this._log.warn("Failed to reconcile incoming record " + item.id);
+        this._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
+        failed.push(item.id);
+        return;
       }
-      this._tracker.ignoreAll = false;
+
+      if (shouldApply) {
+        count.applied++;
+        applyBatch.push(item);
+      } else {
+        count.reconciled++;
+        this._log.trace("Skipping reconciled incoming item " + item.id);
+      }
+
+      if (applyBatch.length == this.applyIncomingBatchSize) {
+        doApplyBatch.call(this);
+      }
       Sync.sleep(0);
     });
 
     // Only bother getting data from the server if there's new things
     if (this.lastModified == null || this.lastModified > this.lastSync) {
       let resp = newitems.get();
+      doApplyBatchAndPersistFailed.call(this);
       if (!resp.success) {
         resp.failureCode = ENGINE_DOWNLOAD_FAIL;
         throw resp;
       }
     }
 
     // Mobile: check if we got the maximum that we requested; get the rest if so.
     if (handled.length == newitems.limit) {
@@ -715,50 +767,68 @@ SyncEngine.prototype = {
 
       let guids = guidColl.get();
       if (!guids.success)
         throw guids;
 
       // Figure out which guids weren't just fetched then remove any guids that
       // were already waiting and prepend the new ones
       let extra = Utils.arraySub(guids.obj, handled);
-      if (extra.length > 0)
-        this.toFetch = extra.concat(Utils.arraySub(this.toFetch, extra));
+      if (extra.length > 0) {
+        fetchBatch = Utils.arrayUnion(extra, fetchBatch);
+        this.toFetch = Utils.arrayUnion(extra, this.toFetch);
+      }
     }
 
     // Fast-foward the lastSync timestamp since we have stored the
     // remaining items in toFetch.
     if (this.lastSync < this.lastModified) {
       this.lastSync = this.lastModified;
     }
 
     // Mobile: process any backlog of GUIDs
-    while (this.toFetch.length) {
+    while (fetchBatch.length) {
       // Reuse the original query, but get rid of the restricting params
       // and batch remaining records.
       newitems.limit = 0;
       newitems.newer = 0;
-      newitems.ids = this.toFetch.slice(0, batchSize);
+      newitems.ids = fetchBatch.slice(0, batchSize);
 
       // Reuse the existing record handler set earlier
       let resp = newitems.get();
       if (!resp.success) {
         resp.failureCode = ENGINE_DOWNLOAD_FAIL;
         throw resp;
       }
 
-      // This batch was successfully applied.
-      this.toFetch = this.toFetch.slice(batchSize);
+      // This batch was successfully applied. Not using
+      // doApplyBatchAndPersistFailed() here to avoid writing toFetch twice.
+      fetchBatch = fetchBatch.slice(batchSize);
+      let newToFetch = Utils.arraySub(this.toFetch, newitems.ids);
+      this.toFetch = Utils.arrayUnion(newToFetch, failed);
+      count.failed += failed.length;
+      this._log.debug("Records that failed to apply: " + failed);
+      failed = [];
       if (this.lastSync < this.lastModified) {
         this.lastSync = this.lastModified;
       }
     }
 
-    this._log.info(["Records:", count.applied, "applied,", count.reconciled,
-      "reconciled."].join(" "));
+    // Apply remaining items.
+    doApplyBatchAndPersistFailed.call(this);
+
+    if (count.failed) {
+      // Notify observers if records failed to apply. Pass the count object
+      // along so that they can make an informed decision on what to do.
+      Observers.notify("weave:engine:sync:apply-failed", count, this.name);
+    }
+    this._log.info(["Records:",
+                    count.applied, "applied,",
+                    count.failed, "failed to apply,",
+                    count.reconciled, "reconciled."].join(" "));
   },
 
   /**
    * Find a GUID of an item that is a duplicate of the incoming item but happens
    * to have a different GUID
    *
    * @return GUID of the similar item; falsy otherwise
    */
--- a/services/sync/modules/engines/bookmarks.js
+++ b/services/sync/modules/engines/bookmarks.js
@@ -262,18 +262,28 @@ BookmarksEngine.prototype = {
 
       let lazyMap = {};
       for (let guid in this._store.getAllIDs()) {
         // Figure out what key to store the mapping
         let key;
         let id = this._store.idForGUID(guid);
         switch (Svc.Bookmark.getItemType(id)) {
           case Svc.Bookmark.TYPE_BOOKMARK:
-            key = "b" + Svc.Bookmark.getBookmarkURI(id).spec + ":" +
-              Svc.Bookmark.getItemTitle(id);
+
+            // Smart bookmarks map to their annotation value.
+            let queryId;
+            try {
+              queryId = Utils.anno(id, SMART_BOOKMARKS_ANNO);
+            } catch(ex) {}
+            
+            if (queryId)
+              key = "q" + queryId;
+            else
+              key = "b" + Svc.Bookmark.getBookmarkURI(id).spec + ":" +
+                    Svc.Bookmark.getItemTitle(id);
             break;
           case Svc.Bookmark.TYPE_FOLDER:
             key = "f" + Svc.Bookmark.getItemTitle(id);
             break;
           case Svc.Bookmark.TYPE_SEPARATOR:
             key = "s" + Svc.Bookmark.getItemIndex(id);
             break;
           default:
@@ -297,19 +307,29 @@ BookmarksEngine.prototype = {
         lazyMap[parentName][key] = entry;
         this._log.trace("Mapped: " + [parentName, key, entry, entry.hasDupe]);
       }
 
       // Expose a helper function to get a dupe guid for an item
       return this._lazyMap = function(item) {
         // Figure out if we have something to key with
         let key;
+        let altKey;
         switch (item.type) {
+          case "query":
+            // Prior to Bug 610501, records didn't carry their Smart Bookmark
+            // anno, so we won't be able to dupe them correctly. This altKey
+            // hack should get them to dupe correctly.
+            if (item.queryId) {
+              key = "q" + item.queryId;
+              altKey = "b" + item.bmkUri + ":" + item.title;
+              break;
+            }
+            // No queryId? Fall through to the regular bookmark case.
           case "bookmark":
-          case "query":
           case "microsummary":
             key = "b" + item.bmkUri + ":" + item.title;
             break;
           case "folder":
           case "livemark":
             key = "f" + item.title;
             break;
           case "separator":
@@ -317,19 +337,39 @@ BookmarksEngine.prototype = {
             break;
           default:
             return;
         }
 
         // Give the guid if we have the matching pair
         this._log.trace("Finding mapping: " + item.parentName + ", " + key);
         let parent = lazyMap[item.parentName];
-        let dupe = parent && parent[key];
-        this._log.trace("Mapped dupe: " + dupe);
-        return dupe;
+        
+        if (!parent) {
+          this._log.trace("No parent => no dupe.");
+          return undefined;
+        }
+          
+        let dupe = parent[key];
+        
+        if (dupe) {
+          this._log.trace("Mapped dupe: " + dupe);
+          return dupe;
+        }
+        
+        if (altKey) {
+          dupe = parent[altKey];
+          if (dupe) {
+            this._log.trace("Mapped dupe using altKey " + altKey + ": " + dupe);
+            return dupe;
+          }
+        }
+        
+        this._log.trace("No dupe found for key " + key + "/" + altKey + ".");
+        return undefined;
       };
     });
 
     this._store._childrenToOrder = {};
   },
 
   _processIncoming: function _processIncoming() {
     try {
@@ -441,63 +481,87 @@ BookmarksStore.prototype = {
                   getService(Ci.nsITaggingService);
     return this.__ts;
   },
 
 
   itemExists: function BStore_itemExists(id) {
     return this.idForGUID(id, true) > 0;
   },
+  
+  /*
+   * If the record is a tag query, rewrite it to refer to the local tag ID.
+   * 
+   * Otherwise, just return.
+   */
+  preprocessTagQuery: function preprocessTagQuery(record) {
+    if (record.type != "query" ||
+        record.bmkUri == null ||
+        record.folderName == null)
+      return;
+    
+    // queryStringToQueries() accepts the full URI, "place:" prefix included.
+    let uri           = record.bmkUri
+    let queriesRef    = {};
+    let queryCountRef = {};
+    let optionsRef    = {};
+    Svc.History.queryStringToQueries(uri, queriesRef, queryCountRef, optionsRef);
+    
+    // We only process tag URIs.
+    if (optionsRef.value.resultType != optionsRef.value.RESULTS_AS_TAG_CONTENTS)
+      return;
+    
+    // Tag something to ensure that the tag exists.
+    let tag = record.folderName;
+    let dummyURI = Utils.makeURI("about:weave#BStore_preprocess");
+    this._ts.tagURI(dummyURI, [tag]);
 
+    // Look for the id of the tag, which might just have been added.
+    let tags = this._getNode(this._bms.tagsFolder);
+    if (!(tags instanceof Ci.nsINavHistoryQueryResultNode)) {
+      this._log.debug("tags isn't an nsINavHistoryQueryResultNode; aborting.");
+      return;
+    }
+
+    tags.containerOpen = true;
+    for (let i = 0; i < tags.childCount; i++) {
+      let child = tags.getChild(i);
+      if (child.title == tag) {
+        // Found the tag, so fix up the query to use the right id.
+        this._log.debug("Tag query folder: " + tag + " = " + child.itemId);
+        
+        this._log.trace("Replacing folders in: " + uri);
+        for each (let q in queriesRef.value)
+          q.setFolders([child.itemId], 1);
+        
+        record.bmkUri = Svc.History.queriesToQueryString(queriesRef.value,
+                                                         queryCountRef.value,
+                                                         optionsRef.value);
+        return;
+      }
+    }
+  },
+  
   applyIncoming: function BStore_applyIncoming(record) {
     // Don't bother with pre and post-processing for deletions.
     if (record.deleted) {
       Store.prototype.applyIncoming.apply(this, arguments);
       return;
     }
 
     // For special folders we're only interested in child ordering.
     if ((record.id in kSpecialIds) && record.children) {
       this._log.debug("Processing special node: " + record.id);
       // Reorder children later
       this._childrenToOrder[record.id] = record.children;
       return;
     }
 
-    // Preprocess the record before doing the normal apply
-    switch (record.type) {
-      case "query": {
-        // Convert the query uri if necessary
-        if (record.bmkUri == null || record.folderName == null)
-          break;
-
-        // Tag something so that the tag exists
-        let tag = record.folderName;
-        let dummyURI = Utils.makeURI("about:weave#BStore_preprocess");
-        this._ts.tagURI(dummyURI, [tag]);
-
-        // Look for the id of the tag (that might have just been added)
-        let tags = this._getNode(this._bms.tagsFolder);
-        if (!(tags instanceof Ci.nsINavHistoryQueryResultNode))
-          break;
-
-        tags.containerOpen = true;
-        for (let i = 0; i < tags.childCount; i++) {
-          let child = tags.getChild(i);
-          // Found the tag, so fix up the query to use the right id
-          if (child.title == tag) {
-            this._log.debug("query folder: " + tag + " = " + child.itemId);
-            record.bmkUri = record.bmkUri.replace(/([:&]folder=)\d+/, "$1" +
-              child.itemId);
-            break;
-          }
-        }
-        break;
-      }
-    }
+    // Preprocess the record before doing the normal apply.
+    this.preprocessTagQuery(record);
 
     // Figure out the local id of the parent GUID if available
     let parentGUID = record.parentid;
     if (!parentGUID) {
       throw "Record " + record.id + " has invalid parentid: " + parentGUID;
     }
 
     let parentId = this.idForGUID(parentGUID);
--- a/services/sync/modules/engines/history.js
+++ b/services/sync/modules/engines/history.js
@@ -41,46 +41,50 @@ const EXPORTED_SYMBOLS = ['HistoryEngine
 
 const Cc = Components.classes;
 const Ci = Components.interfaces;
 const Cu = Components.utils;
 const Cr = Components.results;
 
 const GUID_ANNO = "sync/guid";
 const HISTORY_TTL = 5184000; // 60 days
+const TOPIC_UPDATEPLACES_COMPLETE = "places-updatePlaces-complete";
 
 Cu.import("resource://gre/modules/XPCOMUtils.jsm");
 Cu.import("resource://services-sync/constants.js");
 Cu.import("resource://services-sync/engines.js");
 Cu.import("resource://services-sync/record.js");
 Cu.import("resource://services-sync/util.js");
 Cu.import("resource://services-sync/log4moz.js");
+Cu.import("resource://services-sync/ext/Sync.js");
 
 function HistoryRec(collection, id) {
   CryptoWrapper.call(this, collection, id);
 }
 HistoryRec.prototype = {
   __proto__: CryptoWrapper.prototype,
   _logName: "Record.History",
   ttl: HISTORY_TTL
 };
 
 Utils.deferGetSet(HistoryRec, "cleartext", ["histUri", "title", "visits"]);
 
 
 function HistoryEngine() {
   SyncEngine.call(this, "History");
-  this.downloadLimit = MAX_HISTORY_DOWNLOAD;
 }
 HistoryEngine.prototype = {
   __proto__: SyncEngine.prototype,
   _recordObj: HistoryRec,
   _storeObj: HistoryStore,
   _trackerObj: HistoryTracker,
+  downloadLimit: MAX_HISTORY_DOWNLOAD,
+  applyIncomingBatchSize: HISTORY_STORE_BATCH_SIZE,
 
+  // For Gecko <2.0
   _sync: Utils.batchSync("History", SyncEngine),
 
   _findDupe: function _findDupe(item) {
     return this._store.GUIDForUri(item.histUri);
   }
 };
 
 function HistoryStore(name) {
@@ -103,16 +107,25 @@ HistoryStore.prototype = {
       this.__hsvc = Cc["@mozilla.org/browser/nav-history-service;1"].
                     getService(Ci.nsINavHistoryService).
                     QueryInterface(Ci.nsIGlobalHistory2).
                     QueryInterface(Ci.nsIBrowserHistory).
                     QueryInterface(Ci.nsPIPlacesDatabase);
     return this.__hsvc;
   },
 
+  __asyncHistory: null,
+  get _asyncHistory() {
+    if (!this.__asyncHistory && "mozIAsyncHistory" in Components.interfaces) {
+      this.__asyncHistory = Cc["@mozilla.org/browser/history;1"]
+                              .getService(Ci.mozIAsyncHistory);
+    }
+    return this.__asyncHistory;
+  },
+
   get _db() {
     return this._hsvc.DBConnection;
   },
 
   _stmts: {},
   _getStmt: function(query) {
     if (query in this._stmts)
       return this._stmts[query];
@@ -400,16 +413,139 @@ HistoryStore.prototype = {
     let urls = Utils.queryAsync(this._allUrlStm, "url");
     let self = this;
     return urls.reduce(function(ids, item) {
       ids[self.GUIDForUri(item.url, true)] = item.url;
       return ids;
     }, {});
   },
 
+  applyIncomingBatch: function applyIncomingBatch(records) {
+    // Gecko <2.0
+    if (!this._asyncHistory) {
+      return Store.prototype.applyIncomingBatch.apply(this, arguments);
+    }
+
+    // Gecko 2.0
+    let failed = [];
+
+    // Convert incoming records to mozIPlaceInfo objects.
+    let placeInfos = records.map(function (record) {
+      // This is still synchronous I/O for now.
+      if (record.deleted) {
+        try {
+          // Consider using nsIBrowserHistory::removePages() here.
+          this.remove(record);
+        } catch (ex) {
+          this._log.warn("Failed to delete record " + record.id);
+          failed.push(record.id);
+        }
+        return null;
+      }
+      try {
+        return this._recordToPlaceInfo(record);
+      } catch(ex) {
+        failed.push(record.id);
+        return null;
+      }
+    }, this);
+
+    // Filter out the places that can't be added (they're null)
+    function identity(obj) {
+      return obj;
+    }
+    placeInfos = placeInfos.filter(identity);
+
+    // Nothing to do.
+    if (!placeInfos.length) {
+      return failed;
+    }
+
+    let [updatePlaces, cb] = Sync.withCb(this._asyncHistory.updatePlaces,
+                                         this._asyncHistory);
+    let onPlace = function onPlace(result, placeInfo) {
+      if (!Components.isSuccessCode(result)) {
+        failed.push(placeInfo.guid);
+      }
+    };
+    let onComplete = function onComplete(subject, topic, data) {
+      Svc.Obs.remove(TOPIC_UPDATEPLACES_COMPLETE, onComplete);
+      cb();
+    };
+    Svc.Obs.add(TOPIC_UPDATEPLACES_COMPLETE, onComplete);
+    updatePlaces(placeInfos, onPlace);
+    return failed;
+  },
+
+  /**
+   * Converts a Sync history record to a mozIPlaceInfo.
+   * 
+   * Throws if an invalid record is encountered (invalid URI, etc.)
+   * and returns null if the record is to be ignored (no visits to add, etc.)
+   */
+  _recordToPlaceInfo: function _recordToPlaceInfo(record) {
+    // Sort out invalid URIs and ones Places just simply doesn't want.
+    let uri = Utils.makeURI(record.histUri);
+    if (!uri) {
+      this._log.warn("Attempted to process invalid URI, skipping.");
+      throw "Invalid URI in record";
+    }
+
+    if (!Utils.checkGUID(record.id)) {
+      this._log.warn("Encountered record with invalid GUID: " + record.id);
+      return null;
+    }
+
+    if (!this._hsvc.canAddURI(uri)) {
+      this._log.trace("Ignoring record " + record.id +
+                      " with URI " + uri.spec + ": can't add this URI.");
+      return null;
+    }
+
+    // We dupe visits by date and type. So an incoming visit that has
+    // the same timestamp and type as a local one won't get applied.
+    let curVisitsByDate = {};
+    for each (let {date, type} in this._getVisits(record.histUri)) {
+      curVisitsByDate[date] = type;
+    }
+    let visits = record.visits.filter(function (visit) {
+      if (!visit.date || typeof visit.date != "number") {
+        this._log.warn("Encountered record with invalid visit date: "
+                       + visit.date);
+        throw "Visit has no date!";
+      }
+      // TRANSITION_FRAMED_LINK = TRANSITION_DOWNLOAD + 1 is new in Gecko 2.0
+      if (!visit.type || !(visit.type >= Svc.History.TRANSITION_LINK &&
+                           visit.type <= Svc.History.TRANSITION_DOWNLOAD + 1)) {
+        this._log.warn("Encountered record with invalid visit type: "
+                       + visit.type);
+        throw "Invalid visit type!";
+      }
+      // Dates need to be integers
+      visit.date = Math.round(visit.date);
+      return curVisitsByDate[visit.date] != visit.type;
+    });
+
+    // No update if there aren't any visits to apply.
+    // mozIAsyncHistory::updatePlaces() wants at least one visit.
+    // In any case, the only thing we could change would be the title
+    // and that shouldn't change without a visit.
+    if (!visits.length) {
+      this._log.trace("Ignoring record " + record.id +
+                      " with URI " + uri.spec + ": no visits to add.");
+      return null;
+    }
+
+    return {uri: uri,
+            guid: record.id,
+            title: record.title,
+            visits: [{visitDate: visit.date, transitionType: visit.type}
+                      for each (visit in visits)]};
+  },
+
   create: function HistStore_create(record) {
     // Add the url and set the GUID
     this.update(record);
     this.setGUID(record.histUri, record.id);
   },
 
   remove: function HistStore_remove(record) {
     let page = this._findURLByGUID(record.id);
@@ -421,33 +557,29 @@ HistoryStore.prototype = {
     let uri = Utils.makeURI(page.url);
     Svc.History.removePage(uri);
     this._log.trace("Removed page: " + [record.id, page.url, page.title]);
   },
 
   update: function HistStore_update(record) {
     this._log.trace("  -> processing history entry: " + record.histUri);
 
-    let uri = Utils.makeURI(record.histUri);
-    if (!uri) {
-      this._log.warn("Attempted to process invalid URI, skipping");
-      throw "invalid URI in record";
+    let placeInfo = this._recordToPlaceInfo(record);
+    if (!placeInfo) {
+      return;
     }
-    let curvisits = [];
-    if (this.urlExists(uri))
-      curvisits = this._getVisits(record.histUri);
 
-    // Add visits if there's no local visit with the same date
-    for each (let {date, type} in record.visits)
-      if (curvisits.every(function(cur) cur.date != date))
-        Svc.History.addVisit(uri, date, null, type, type == 5 || type == 6, 0);
+    for each (let {visitDate, transitionType} in placeInfo.visits) {
+      Svc.History.addVisit(placeInfo.uri, visitDate, null, transitionType,
+                           transitionType == 5 || transitionType == 6, 0);
+    }
 
     if (record.title) {
       try {
-        this._hsvc.setPageTitle(uri, record.title);
+        this._hsvc.setPageTitle(placeInfo.uri, record.title);
       } catch (ex if ex.result == Cr.NS_ERROR_NOT_AVAILABLE) {
         // There's no entry for the given URI, either because it's a
         // URI that Places ignores (e.g. javascript:) or there were no
         // visits.  We can just ignore those cases.
       }
     }
   },
 
--- a/services/sync/modules/service.js
+++ b/services/sync/modules/service.js
@@ -396,16 +396,17 @@ WeaveSvc.prototype = {
     }
 
     Svc.Obs.add("weave:service:setup-complete", this);
     Svc.Obs.add("network:offline-status-changed", this);
     Svc.Obs.add("weave:service:sync:finish", this);
     Svc.Obs.add("weave:service:sync:error", this);
     Svc.Obs.add("weave:service:backoff:interval", this);
     Svc.Obs.add("weave:engine:score:updated", this);
+    Svc.Obs.add("weave:engine:sync:apply-failed", this);
     Svc.Obs.add("weave:resource:status:401", this);
     Svc.Prefs.observe("engine.", this);
 
     if (!this.enabled)
       this._log.info("Weave Sync disabled");
 
     // Create Weave identities (for logging in, and for encryption)
     let id = ID.get("WeaveID");
@@ -554,16 +555,24 @@ WeaveSvc.prototype = {
       case "weave:service:backoff:interval":
         let interval = (data + Math.random() * data * 0.25) * 1000; // required backoff + up to 25%
         Status.backoffInterval = interval;
         Status.minimumNextSync = Date.now() + data;
         break;
       case "weave:engine:score:updated":
         this._handleScoreUpdate();
         break;
+      case "weave:engine:sync:apply-failed":
+        // An engine isn't able to apply one or more incoming records.
+        // We don't fail hard on this, but it usually indicates a bug,
+        // so for now treat it as sync error (cf. Service._syncEngine())
+        Status.engines = [data, ENGINE_APPLY_FAIL];
+        this._syncError = true;
+        this._log.debug(data + " failed to apply some records.");
+        break;
       case "weave:resource:status:401":
         this._handleResource401(subject);
         break;
       case "idle":
         this._log.trace("Idle time hit, trying to sync");
         Svc.Idle.removeIdleObserver(this, this._idleTime);
         this._idleTime = 0;
         Utils.delay(function() this.sync(false), 0, this);
--- a/services/sync/modules/util.js
+++ b/services/sync/modules/util.js
@@ -259,16 +259,21 @@ let Utils = {
   /**
    * GUIDs are 9 random bytes encoded with base64url (RFC 4648).
    * That makes them 12 characters long with 72 bits of entropy.
    */
   makeGUID: function makeGUID() {
     return Utils.encodeBase64url(Utils.generateRandomBytes(9));
   },
 
+  _base64url_regex: /^[-abcdefghijklmnopqrstuvwxyz0123456789_]{12}$/i,
+  checkGUID: function checkGUID(guid) {
+    return !!guid && this._base64url_regex.test(guid);
+  },
+
   anno: function anno(id, anno, val, expire) {
     // Figure out if we have a bookmark or page
     let annoFunc = (typeof id == "number" ? "Item" : "Page") + "Annotation";
 
     // Convert to a nsIURI if necessary
     if (typeof id == "string")
       id = Utils.makeURI(id);
 
@@ -1446,16 +1451,23 @@ let Utils = {
 
   /**
    * Create an array like the first but without elements of the second
    */
   arraySub: function arraySub(minuend, subtrahend) {
     return minuend.filter(function(i) subtrahend.indexOf(i) == -1);
   },
 
+  /**
+   * Build the union of two arrays.
+   */
+  arrayUnion: function arrayUnion(foo, bar) {
+    return foo.concat(Utils.arraySub(bar, foo));
+  },
+
   bind2: function Async_bind2(object, method) {
     return function innerBind() { return method.apply(object, arguments); };
   },
 
   mpLocked: function mpLocked() {
     let modules = Cc["@mozilla.org/security/pkcs11moduledb;1"].
                   getService(Ci.nsIPKCS11ModuleDB);
     let sdrSlot = modules.findSlotByName("");
--- a/services/sync/tests/unit/head_helpers.js
+++ b/services/sync/tests/unit/head_helpers.js
@@ -1,8 +1,9 @@
+Cu.import("resource://services-sync/util.js");
 var btoa;
 
 // initialize nss
 let ch = Cc["@mozilla.org/security/hash;1"].
          createInstance(Ci.nsICryptoHash);
 
 let ds = Cc["@mozilla.org/file/directory_service;1"]
   .getService(Ci.nsIProperties);
@@ -43,18 +44,16 @@ function loadInSandbox(aUri) {
   request.overrideMimeType("application/javascript");
   request.send(null);
   Components.utils.evalInSandbox(request.responseText, sandbox, "1.8");
 
   return sandbox;
 }
 
 function FakeTimerService() {
-  Cu.import("resource://services-sync/util.js");
-
   this.callbackQueue = [];
 
   var self = this;
 
   this.__proto__ = {
     makeTimerForCall: function FTS_makeTimerForCall(cb) {
       // Just add the callback to our queue and we'll call it later, so
       // as to simulate a real nsITimer.
@@ -355,16 +354,20 @@ function ensureThrows(func) {
     try {
       func.apply(this, arguments);
     } catch (ex) {
       do_throw(ex);
     }
   };
 }
 
+function asyncChainTests() {
+  return Utils.asyncChain.apply(this, Array.map(arguments, ensureThrows));
+}
+
 
 /**
  * Print some debug message to the console. All arguments will be printed,
  * separated by spaces.
  *
  * @param [arg0, arg1, arg2, ...]
  *        Any number of arguments to print out
  * @usage _("Hello World") -> prints "Hello World"
--- a/services/sync/tests/unit/test_bookmark_engine.js
+++ b/services/sync/tests/unit/test_bookmark_engine.js
@@ -69,17 +69,16 @@ function test_processIncoming_error_orde
   let global = new ServerWBO('global',
                              {engines: {bookmarks: {version: engine.version,
                                                     syncID: engine.syncID}}});
   let server = httpd_setup({
     "/1.0/foo/storage/meta/global": global.handler(),
     "/1.0/foo/storage/bookmarks": collection.handler()
   });
 
-
   try {
 
     let folder1_id = Svc.Bookmark.createFolder(
       Svc.Bookmark.toolbarFolder, "Folder 1", 0);
     let folder1_guid = store.GUIDForId(folder1_id);
 
     let fxuri = Utils.makeURI("http://getfirefox.com/");
     let tburi = Utils.makeURI("http://getthunderbird.com/");
@@ -93,27 +92,29 @@ function test_processIncoming_error_orde
 
     // Create a server record for folder1 where we flip the order of
     // the children.
     let folder1_payload = store.createRecord(folder1_guid).cleartext;
     folder1_payload.children.reverse();
     collection.wbos[folder1_guid] = new ServerWBO(
       folder1_guid, encryptPayload(folder1_payload));
 
-    // Also create a bogus server record (no parent) to provoke an exception.
+    // Create a bogus record that when synced down will provoke a
+    // network error which in turn provokes an exception in _processIncoming.
     const BOGUS_GUID = "zzzzzzzzzzzz";
-    collection.wbos[BOGUS_GUID] = new ServerWBO(
-      BOGUS_GUID, encryptPayload({
-        id: BOGUS_GUID,
-        type: "folder",
-        title: "Bogus Folder",
-        parentid: null,
-        parentName: null,
-        children: []
-    }));
+    let bogus_record = collection.wbos[BOGUS_GUID]
+      = new ServerWBO(BOGUS_GUID, "I'm a bogus record!");
+    bogus_record.get = function get() {
+      throw "Sync this!";
+    };
+
+    // Make the record 10 minutes old so it will only be synced in the toFetch phase.
+    bogus_record.modified = Date.now() / 1000 - 60 * 10;
+    engine.lastSync = Date.now() / 1000 - 60;
+    engine.toFetch = [BOGUS_GUID];
 
     let error;
     try {
       engine.sync();
     } catch(ex) {
       error = ex;
     }
     do_check_true(!!error);
new file mode 100644
--- /dev/null
+++ b/services/sync/tests/unit/test_bookmark_places_query_rewriting.js
@@ -0,0 +1,45 @@
+_("Rewrite place: URIs.");
+Cu.import("resource://services-sync/engines/bookmarks.js");
+Cu.import("resource://services-sync/util.js");
+
+let engine = new BookmarksEngine();
+let store = engine._store;
+
+function run_test() {
+  initTestLogging("Trace");
+  Log4Moz.repository.getLogger("Engine.Bookmarks").level = Log4Moz.Level.Trace;
+  Log4Moz.repository.getLogger("Store.Bookmarks").level = Log4Moz.Level.Trace;
+
+  let tagRecord = new BookmarkQuery("bookmarks", "abcdefabcdef");
+  let uri = "place:folder=499&type=7&queryType=1";
+  tagRecord.queryId = "MagicTags";
+  tagRecord.parentName = "Bookmarks Toolbar";
+  tagRecord.bmkUri = uri;
+  tagRecord.title = "tagtag";
+  tagRecord.folderName = "bar";
+
+  _("Type: " + tagRecord.type);
+  _("Folder name: " + tagRecord.folderName);
+  store.preprocessTagQuery(tagRecord);
+  
+  _("Verify that the URI has been rewritten.");
+  do_check_neq(tagRecord.bmkUri, uri);
+  
+  let tags = store._getNode(store._bms.tagsFolder);
+  tags.containerOpen = true;
+  let tagID;
+  for (let i = 0; i < tags.childCount; ++i) {
+    let child = tags.getChild(i);
+    if (child.title == "bar")
+      tagID = child.itemId;
+  }
+      
+  _("Tag ID: " + tagID);
+  do_check_eq(tagRecord.bmkUri, uri.replace("499", tagID));
+  
+  _("... but not if the type is wrong.");
+  let wrongTypeURI = "place:folder=499&type=2&queryType=1";
+  tagRecord.bmkUri = wrongTypeURI;
+  store.preprocessTagQuery(tagRecord);
+  do_check_eq(tagRecord.bmkUri, wrongTypeURI);
+}
--- a/services/sync/tests/unit/test_bookmark_smart_bookmarks.js
+++ b/services/sync/tests/unit/test_bookmark_smart_bookmarks.js
@@ -173,16 +173,79 @@ function test_annotation_uploaded() {
     // Clean up.
     store.wipe();
     server.stop(do_test_finished);
     Svc.Prefs.resetBranch("");
     Records.clearCache();
   }
 }
 
+function test_smart_bookmarks_duped() {
+  let parent = PlacesUtils.toolbarFolderId;
+  let uri =
+    Utils.makeURI("place:redirectsMode=" +
+                  Ci.nsINavHistoryQueryOptions.REDIRECTS_MODE_TARGET +
+                  "&sort=" +
+                  Ci.nsINavHistoryQueryOptions.SORT_BY_VISITCOUNT_DESCENDING +
+                  "&maxResults=10");
+  let title = "Most Visited";
+  let mostVisitedID = newSmartBookmark(parent, uri, -1, title, "MostVisited");
+  let mostVisitedGUID = store.GUIDForId(mostVisitedID);
+  
+  let record = store.createRecord(mostVisitedGUID);
+  
+  _("Prepare sync.");
+  Svc.Prefs.set("username", "foo");
+  Service.serverURL = "http://localhost:8080/";
+  Service.clusterURL = "http://localhost:8080/";
+
+  let collection = new ServerCollection({}, true);
+  let global = new ServerWBO('global',
+                             {engines: {bookmarks: {version: engine.version,
+                                                    syncID: engine.syncID}}});
+  let server = httpd_setup({
+    "/1.0/foo/storage/meta/global": global.handler(),
+    "/1.0/foo/storage/bookmarks": collection.handler()
+  });
+  
+  try {
+    engine._syncStartup();
+    
+    _("Verify that lazyMap uses the anno, discovering a dupe regardless of URI.");
+    do_check_eq(mostVisitedGUID, engine._lazyMap(record));
+    
+    record.bmkUri = "http://foo/";
+    do_check_eq(mostVisitedGUID, engine._lazyMap(record));
+    do_check_neq(Svc.Bookmark.getBookmarkURI(mostVisitedID).spec, record.bmkUri);
+    
+    _("Verify that different annos don't dupe.");
+    let other = new BookmarkQuery("bookmarks", "abcdefabcdef");
+    other.queryId = "LeastVisited";
+    other.parentName = "Bookmarks Toolbar";
+    other.bmkUri = "place:foo";
+    other.title = "";
+    do_check_eq(undefined, engine._findDupe(other));
+    
+    _("Handle records without a queryId entry.");
+    record.bmkUri = uri;
+    delete record.queryId;
+    do_check_eq(mostVisitedGUID, engine._lazyMap(record));
+    
+    engine._syncFinish();
+
+  } finally {
+    // Clean up.
+    store.wipe();
+    server.stop(do_test_finished);
+    Svc.Prefs.resetBranch("");
+    Records.clearCache();
+  }
+}
+
 function run_test() {
   initTestLogging("Trace");
   Log4Moz.repository.getLogger("Engine.Bookmarks").level = Log4Moz.Level.Trace;
 
   CollectionKeys.generateNewKeys();
 
   test_annotation_uploaded();
+  test_smart_bookmarks_duped();
 }
--- a/services/sync/tests/unit/test_corrupt_keys.js
+++ b/services/sync/tests/unit/test_corrupt_keys.js
@@ -97,17 +97,17 @@ function test_locally_changed_keys() {
     m.payload = {"syncID": "foooooooooooooooooooooooooo",
                  "storageVersion": STORAGE_VERSION};
     m.upload(Weave.Service.metaURL);
     
     _("New meta/global: " + JSON.stringify(meta_global));
     
     // Upload keys.
     CollectionKeys.generateNewKeys();
-    serverKeys = CollectionKeys.asWBO("crypto", "keys");
+    let serverKeys = CollectionKeys.asWBO("crypto", "keys");
     serverKeys.encrypt(Weave.Service.syncKeyBundle);
     do_check_true(serverKeys.upload(Weave.Service.cryptoKeysURL).success);
     
     // Check that login works.
     do_check_true(Weave.Service.login("johndoe", "ilovejane", passphrase));
     do_check_true(Weave.Service.isLoggedIn);
     
     // Sync should upload records.
@@ -118,121 +118,119 @@ function test_locally_changed_keys() {
     do_check_true(!!collections.tabs);
     do_check_true(collections.tabs > 0);
     
     let coll_modified = CollectionKeys._lastModified;
     
     // Let's create some server side history records.
     let liveKeys = CollectionKeys.keyForCollection("history");
     _("Keys now: " + liveKeys.keyPair);
-    let nextHistory = {}
     let visitType = Ci.nsINavHistoryService.TRANSITION_LINK;
     for (var i = 0; i < 5; i++) {
-      let id = 'record-no-' + i;
+      let id = 'record-no--' + i;
       let modified = Date.now()/1000 - 60*(i+10);
       
       let w = new CryptoWrapper("history", "id");
       w.cleartext = {
         id: id,
         histUri: "http://foo/bar?" + id,
         title: id,
         sortindex: i,
-        visits: [{date: (modified - 5), type: visitType}],
+        visits: [{date: (modified - 5) * 1000000, type: visitType}],
         deleted: false};
       w.encrypt();
       
       let wbo = new ServerWBO(id, {ciphertext: w.ciphertext,
                                    IV: w.IV,
                                    hmac: w.hmac});
       wbo.modified = modified;
       history.wbos[id] = wbo;
-      server.registerPathHandler("/1.0/johndoe/storage/history/record-no-" + i, upd("history", wbo.handler()));
+      server.registerPathHandler(
+        "/1.0/johndoe/storage/history/record-no--" + i,
+        upd("history", wbo.handler()));
     }
     
     collections.history = Date.now()/1000;
     let old_key_time = collections.crypto;
     _("Old key time: " + old_key_time);
     
     // Check that we can decrypt one.
-    let rec = new CryptoWrapper("history", "record-no-0");
-    rec.fetch(Weave.Service.storageURL + "history/record-no-0");
+    let rec = new CryptoWrapper("history", "record-no--0");
+    rec.fetch(Weave.Service.storageURL + "history/record-no--0");
     _(JSON.stringify(rec));
     do_check_true(!!rec.decrypt());
     
     do_check_eq(hmacErrorCount, 0);
     
     // Fill local key cache with bad data.
     corrupt_local_keys();
     _("Keys now: " + CollectionKeys.keyForCollection("history").keyPair);
     
     do_check_eq(hmacErrorCount, 0);
     
-    // Add some data.
-    for (let k in nextHistory) {
-      nextHistory[k].modified += 1000;
-      history.wbos[k] = nextHistory[k];
-    }
-    
     _("HMAC error count: " + hmacErrorCount);
     // Now syncing should succeed, after one HMAC error.
     Weave.Service.sync();
     do_check_eq(hmacErrorCount, 1);
     _("Keys now: " + CollectionKeys.keyForCollection("history").keyPair);
     
     // And look! We downloaded history!
-    do_check_true(Engines.get("history")._store.urlExists("http://foo/bar?record-no-0"));
-    do_check_true(Engines.get("history")._store.urlExists("http://foo/bar?record-no-1"));
-    do_check_true(Engines.get("history")._store.urlExists("http://foo/bar?record-no-2"));
-    do_check_true(Engines.get("history")._store.urlExists("http://foo/bar?record-no-3"));
-    do_check_true(Engines.get("history")._store.urlExists("http://foo/bar?record-no-4"));
+    let store = Engines.get("history")._store;
+    do_check_true(store.urlExists("http://foo/bar?record-no--0"));
+    do_check_true(store.urlExists("http://foo/bar?record-no--1"));
+    do_check_true(store.urlExists("http://foo/bar?record-no--2"));
+    do_check_true(store.urlExists("http://foo/bar?record-no--3"));
+    do_check_true(store.urlExists("http://foo/bar?record-no--4"));
     do_check_eq(hmacErrorCount, 1);
     
     _("Busting some new server values.");
     // Now what happens if we corrupt the HMAC on the server?
     for (var i = 5; i < 10; i++) {
-      let id = 'record-no-' + i;
+      let id = 'record-no--' + i;
       let modified = 1 + (Date.now()/1000);
       
       let w = new CryptoWrapper("history", "id");
       w.cleartext = {
         id: id,
         histUri: "http://foo/bar?" + id,
         title: id,
         sortindex: i,
-        visits: [{date: (modified - 5), type: visitType}],
+        visits: [{date: (modified - 5 ) * 1000000, type: visitType}],
         deleted: false};
       w.encrypt();
       w.hmac = w.hmac.toUpperCase();
       
       let wbo = new ServerWBO(id, {ciphertext: w.ciphertext,
                                    IV: w.IV,
                                    hmac: w.hmac});
       wbo.modified = modified;
       history.wbos[id] = wbo;
-      server.registerPathHandler("/1.0/johndoe/storage/history/record-no-" + i, upd("history", wbo.handler()));
+      server.registerPathHandler(
+        "/1.0/johndoe/storage/history/record-no--" + i,
+        upd("history", wbo.handler()));
     }
     collections.history = Date.now()/1000;
     
     _("Server key time hasn't changed.");
     do_check_eq(collections.crypto, old_key_time);
     
     _("Resetting HMAC error timer.");
     Weave.Service.lastHMACEvent = 0;
     
     _("Syncing...");
     Weave.Service.sync();
     _("Keys now: " + CollectionKeys.keyForCollection("history").keyPair);
     _("Server keys have been updated, and we skipped over 5 more HMAC errors without adjusting history.");
     do_check_true(collections.crypto > old_key_time);
     do_check_eq(hmacErrorCount, 6);
-    do_check_false(Engines.get("history")._store.urlExists("http://foo/bar?record-no-5"));
-    do_check_false(Engines.get("history")._store.urlExists("http://foo/bar?record-no-6"));
-    do_check_false(Engines.get("history")._store.urlExists("http://foo/bar?record-no-7"));
-    do_check_false(Engines.get("history")._store.urlExists("http://foo/bar?record-no-8"));
-    do_check_false(Engines.get("history")._store.urlExists("http://foo/bar?record-no-9"));
+    do_check_false(store.urlExists("http://foo/bar?record-no--5"));
+    do_check_false(store.urlExists("http://foo/bar?record-no--6"));
+    do_check_false(store.urlExists("http://foo/bar?record-no--7"));
+    do_check_false(store.urlExists("http://foo/bar?record-no--8"));
+    do_check_false(store.urlExists("http://foo/bar?record-no--9"));
     
     // Clean up.
     Weave.Service.startOver();
     
   } finally {
     Weave.Svc.Prefs.resetBranch("");
     server.stop(do_test_finished);
   }
--- a/services/sync/tests/unit/test_history_engine.js
+++ b/services/sync/tests/unit/test_history_engine.js
@@ -30,24 +30,24 @@ function test_processIncoming_mobile_his
     this.get_log.push(options);
     return this._get(options);
   };
 
   // Let's create some 234 server side history records. They're all at least
   // 10 minutes old.
   let visitType = Ci.nsINavHistoryService.TRANSITION_LINK;
   for (var i = 0; i < 234; i++) {
-    let id = 'record-no-' + i;
+    let id = 'record-no' + ("00" + i).slice(-3);
     let modified = Date.now()/1000 - 60*(i+10);
     let payload = encryptPayload({
       id: id,
       histUri: "http://foo/bar?" + id,
         title: id,
         sortindex: i,
-        visits: [{date: (modified - 5), type: visitType}],
+        visits: [{date: (modified - 5) * 1000000, type: visitType}],
         deleted: false});
     
     let wbo = new ServerWBO(id, payload);
     wbo.modified = modified;
     collection.wbos[id] = wbo;
   }
   
   let server = sync_httpd_setup({
@@ -123,16 +123,17 @@ function test_processIncoming_mobile_his
       do_check_eq(collection.get_log[j].limit, undefined);
       if (i < Math.floor((234 - 50) / MOBILE_BATCH_SIZE))
         do_check_eq(collection.get_log[j].ids.length, MOBILE_BATCH_SIZE);
       else
         do_check_eq(collection.get_log[j].ids.length, 234 % MOBILE_BATCH_SIZE);
     }
 
   } finally {
+    Svc.History.removeAllPages();
     server.stop(do_test_finished);
     Svc.Prefs.resetBranch("");
     Records.clearCache();
   }
 }
 
 function run_test() {
   CollectionKeys.generateNewKeys();
--- a/services/sync/tests/unit/test_history_store.js
+++ b/services/sync/tests/unit/test_history_store.js
@@ -58,18 +58,24 @@ function ensureThrows(func) {
     } catch (ex) {
       Svc.History.removeAllPages();
       do_throw(ex);
     }
   };
 }
 
 function run_test() {
+  initTestLogging("Trace");
+
+  let store = new HistoryEngine()._store;
+  function applyEnsureNoFailures(records) {
+    do_check_eq(store.applyIncomingBatch(records).length, 0);
+  }
+
   _("Verify that we've got an empty store to work with.");
-  let store = new HistoryEngine()._store;
   do_check_eq([id for (id in store.getAllIDs())].length, 0);
 
   let fxuri, fxguid, tburi, tbguid;
   do_test_pending();
   Utils.asyncChain(function (next) {
 
     _("Let's create an entry in the database.");
     fxuri = Utils.makeURI("http://getfirefox.com/");
@@ -100,49 +106,56 @@ function run_test() {
       let queryres = queryHistoryVisits(fxuri);
       do_check_eq(queryres.length, 2);
       do_check_eq(queryres[0].time, TIMESTAMP1);
       do_check_eq(queryres[0].title, "Hol Dir Firefox!");
       do_check_eq(queryres[1].time, TIMESTAMP2);
       do_check_eq(queryres[1].title, "Hol Dir Firefox!");
       next();
     }));
-    store.update({histUri: record.histUri,
-                  title: "Hol Dir Firefox!",
-                  visits: [record.visits[0], secondvisit]});
+    applyEnsureNoFailures([
+      {id: fxguid,
+       histUri: record.histUri,
+       title: "Hol Dir Firefox!",
+       visits: [record.visits[0], secondvisit]}
+    ]);
 
   }, function (next) {
 
     _("Create a brand new record through the store.");
     tbguid = Utils.makeGUID();
     tburi = Utils.makeURI("http://getthunderbird.com");
     onNextTitleChanged(ensureThrows(function() {
       do_check_eq([id for (id in store.getAllIDs())].length, 2);
       let queryres = queryHistoryVisits(tburi);
       do_check_eq(queryres.length, 1);
       do_check_eq(queryres[0].time, TIMESTAMP3);
       do_check_eq(queryres[0].title, "The bird is the word!");
       next();
     }));
-    store.create({id: tbguid,
-                  histUri: tburi.spec,
-                  title: "The bird is the word!",
-                  visits: [{date: TIMESTAMP3,
-                            type: Ci.nsINavHistoryService.TRANSITION_TYPED}]});
+    applyEnsureNoFailures([
+      {id: tbguid,
+       histUri: tburi.spec,
+       title: "The bird is the word!",
+       visits: [{date: TIMESTAMP3,
+                 type: Ci.nsINavHistoryService.TRANSITION_TYPED}]}
+    ]);
 
   }, function (next) {
 
     _("Make sure we handle a null title gracefully (it can happen in some cases, e.g. for resource:// URLs)");
     let resguid = Utils.makeGUID();
     let resuri = Utils.makeURI("unknown://title");
-    store.create({id: resguid,
-                  histUri: resuri.spec,
-                  title: null,
-                  visits: [{date: TIMESTAMP3,
-                            type: Ci.nsINavHistoryService.TRANSITION_TYPED}]});
+    applyEnsureNoFailures([
+      {id: resguid,
+       histUri: resuri.spec,
+       title: null,
+       visits: [{date: TIMESTAMP3,
+                 type: Ci.nsINavHistoryService.TRANSITION_TYPED}]}
+    ]);
     do_check_eq([id for (id in store.getAllIDs())].length, 3);
     let queryres = queryHistoryVisits(resuri);
     do_check_eq(queryres.length, 1);
     do_check_eq(queryres[0].time, TIMESTAMP3);
     next();
 
   }, function (next) {
 
@@ -150,31 +163,91 @@ function run_test() {
     let table = store._haveTempTables ? "moz_places_temp" : "moz_places";
     let query = "INSERT INTO " + table + " "
       + "(url, title, rev_host, visit_count, last_visit_date) "
       + "VALUES ('invalid-uri', 'Invalid URI', '.', 1, " + TIMESTAMP3 + ")";
     let stmt = Utils.createStatement(Svc.History.DBConnection, query);
     let result = Utils.queryAsync(stmt);    
     do_check_eq([id for (id in store.getAllIDs())].length, 4);
 
+    _("Make sure we report records with invalid URIs.");
+    let invalid_uri_guid = Utils.makeGUID();
+    let failed = store.applyIncomingBatch([{
+      id: invalid_uri_guid,
+      histUri: ":::::::::::::::",
+      title: "Doesn't have a valid URI",
+      visits: [{date: TIMESTAMP3,
+                type: Ci.nsINavHistoryService.TRANSITION_EMBED}]}
+    ]);
+    do_check_eq(failed.length, 1);
+    do_check_eq(failed[0], invalid_uri_guid);
+
+    _("Make sure we handle records with invalid GUIDs gracefully (ignore).");
+    applyEnsureNoFailures([
+      {id: "invalid",
+       histUri: "http://invalid.guid/",
+       title: "Doesn't have a valid GUID",
+       visits: [{date: TIMESTAMP3,
+                 type: Ci.nsINavHistoryService.TRANSITION_EMBED}]}
+    ]);
+
+    _("Make sure we report records with invalid visits, gracefully handle non-integer dates.");
+    let no_date_visit_guid = Utils.makeGUID();
+    let no_type_visit_guid = Utils.makeGUID();
+    let invalid_type_visit_guid = Utils.makeGUID();
+    let non_integer_visit_guid = Utils.makeGUID();
+    failed = store.applyIncomingBatch([
+      {id: no_date_visit_guid,
+       histUri: "http://no.date.visit/",
+       title: "Visit has no date",
+       visits: [{date: TIMESTAMP3}]},
+      {id: no_type_visit_guid,
+       histUri: "http://no.type.visit/",
+       title: "Visit has no type",
+       visits: [{type: Ci.nsINavHistoryService.TRANSITION_EMBED}]},
+      {id: invalid_type_visit_guid,
+       histUri: "http://invalid.type.visit/",
+       title: "Visit has invalid type",
+       visits: [{date: TIMESTAMP3,
+                 type: Ci.nsINavHistoryService.TRANSITION_LINK - 1}]},
+      {id: non_integer_visit_guid,
+       histUri: "http://non.integer.visit/",
+       title: "Visit has non-integer date",
+       visits: [{date: 1234.567,
+                 type: Ci.nsINavHistoryService.TRANSITION_EMBED}]}
+    ]);
+    do_check_eq(failed.length, 3);
+    failed.sort();
+    let expected = [no_date_visit_guid,
+                    no_type_visit_guid,
+                    invalid_type_visit_guid].sort();
+    for (let i = 0; i < expected.length; i++) {
+      do_check_eq(failed[i], expected[i]);
+    }
+
     _("Make sure we handle records with javascript: URLs gracefully.");
-    store.create({id: Utils.makeGUID(),
-                  histUri: "javascript:''",
-                  title: "javascript:''",
-                  visits: [{date: TIMESTAMP3,
-                            type: Ci.nsINavHistoryService.TRANSITION_EMBED}]});
+    applyEnsureNoFailures([
+      {id: Utils.makeGUID(),
+       histUri: "javascript:''",
+       title: "javascript:''",
+       visits: [{date: TIMESTAMP3,
+                 type: Ci.nsINavHistoryService.TRANSITION_EMBED}]}
+    ]);
 
     _("Make sure we handle records without any visits gracefully.");
-    store.create({id: Utils.makeGUID(),
-                  histUri: "http://getfirebug.com",
-                  title: "Get Firebug!",
-                  visits: []});
+    applyEnsureNoFailures([
+      {id: Utils.makeGUID(),
+       histUri: "http://getfirebug.com",
+       title: "Get Firebug!",
+       visits: []}
+    ]);
 
-    _("Remove a record from the store.");
-    store.remove({id: fxguid});
+    _("Remove an existent record and a non-existent from the store.");
+    applyEnsureNoFailures([{id: fxguid, deleted: true},
+                           {id: Utils.makeGUID(), deleted: true}]);
     do_check_false(store.itemExists(fxguid));
     let queryres = queryHistoryVisits(fxuri);
     do_check_eq(queryres.length, 0);
 
     _("Make sure wipe works.");
     store.wipe();
     do_check_eq([id for (id in store.getAllIDs())].length, 0);
     queryres = queryHistoryVisits(fxuri);
--- a/services/sync/tests/unit/test_service_sync_checkServerError.js
+++ b/services/sync/tests/unit/test_service_sync_checkServerError.js
@@ -40,20 +40,19 @@ function sync_httpd_setup() {
 function setUp() {
   Service.username = "johndoe";
   Service.password = "ilovejane";
   Service.passphrase = "aabcdeabcdeabcdeabcdeabcde";
   Service.clusterURL = "http://localhost:8080/";
   new FakeCryptoService();
 }
 
-function test_backoff500() {
+function test_backoff500(next) {
   _("Test: HTTP 500 sets backoff status.");
   let server = sync_httpd_setup();
-  do_test_pending();
   setUp();
 
   Engines.register(CatapultEngine);
   let engine = Engines.get("catapult");
   engine.enabled = true;
   engine.exception = {status: 500};
 
   try {
@@ -62,27 +61,26 @@ function test_backoff500() {
     // Forcibly create and upload keys here -- otherwise we don't get to the 500!
     CollectionKeys.generateNewKeys();
     do_check_true(CollectionKeys.asWBO().upload("http://localhost:8080/1.0/johndoe/storage/crypto/keys").success);
     
     Service.login();
     Service.sync();
     do_check_true(Status.enforceBackoff);
   } finally {
-    server.stop(do_test_finished);
     Engines.unregister("catapult");
     Status.resetBackoff();
     Service.startOver();
+    server.stop(next);
   }
 }
 
-function test_backoff503() {
+function test_backoff503(next) {
   _("Test: HTTP 503 with Retry-After header leads to backoff notification and sets backoff status.");
   let server = sync_httpd_setup();
-  do_test_pending();
   setUp();
 
   const BACKOFF = 42;
   Engines.register(CatapultEngine);
   let engine = Engines.get("catapult");
   engine.enabled = true;
   engine.exception = {status: 503,
                       headers: {"retry-after": BACKOFF}};
@@ -96,50 +94,80 @@ function test_backoff503() {
     do_check_false(Status.enforceBackoff);
 
     Service.login();
     Service.sync();
 
     do_check_true(Status.enforceBackoff);
     do_check_eq(backoffInterval, BACKOFF);
   } finally {
-    server.stop(do_test_finished);
     Engines.unregister("catapult");
     Status.resetBackoff();
     Service.startOver();
+    server.stop(next);
   }
 }
 
-function test_overQuota() {
+function test_overQuota(next) {
   _("Test: HTTP 400 with body error code 14 means over quota.");
   let server = sync_httpd_setup();
-  do_test_pending();
   setUp();
 
   Engines.register(CatapultEngine);
   let engine = Engines.get("catapult");
   engine.enabled = true;
   engine.exception = {status: 400,
                       toString: function() "14"};
 
   try {
     do_check_eq(Status.sync, SYNC_SUCCEEDED);
 
     Service.login();
     Service.sync();
 
     do_check_eq(Status.sync, OVER_QUOTA);
   } finally {
-    server.stop(do_test_finished);
     Engines.unregister("catapult");
     Status.resetSync();
     Service.startOver();
+    server.stop(next);
+  }
+}
+
+// Slightly misplaced test as it doesn't actually test checkServerError,
+// but the observer for "weave:engine:sync:apply-failed".
+function test_engine_applyFailed(next) {
+  let server = sync_httpd_setup();
+  setUp();
+
+  Engines.register(CatapultEngine);
+  let engine = Engines.get("catapult");
+  engine.enabled = true;
+  engine.sync = function sync() {
+    Svc.Obs.notify("weave:engine:sync:apply-failed", {}, "steam");
+  };
+
+  try {
+    do_check_eq(Status.engines["steam"], undefined);
+
+    Service.login();
+    Service.sync();
+
+    do_check_eq(Status.engines["steam"], ENGINE_APPLY_FAIL);
+  } finally {
+    Engines.unregister("catapult");
+    Status.resetSync();
+    Service.startOver();
+    server.stop(next);
   }
 }
 
 function run_test() {
   if (DISABLE_TESTS_BUG_604565)
     return;
 
-  test_backoff500();
-  test_backoff503();
-  test_overQuota();
+  do_test_pending();
+  asyncChainTests(test_backoff500,
+                  test_backoff503,
+                  test_overQuota,
+                  test_engine_applyFailed,
+                  do_test_finished)();
 }
--- a/services/sync/tests/unit/test_syncengine_sync.js
+++ b/services/sync/tests/unit/test_syncengine_sync.js
@@ -654,16 +654,308 @@ function test_processIncoming_resume_toF
     server.stop(do_test_finished);
     Svc.Prefs.resetBranch("");
     Records.clearCache();
     syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
   }
 }
 
 
+function test_processIncoming_applyIncomingBatchSize_smaller() {
+  _("Ensure that a number of incoming items less than applyIncomingBatchSize is still applied.");
+  Svc.Prefs.set("clusterURL", "http://localhost:8080/");
+  Svc.Prefs.set("username", "foo");
+
+  // Engine that doesn't like the first and last record it's given.
+  const APPLY_BATCH_SIZE = 10;
+  let engine = makeSteamEngine();
+  engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
+  engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
+  engine._store.applyIncomingBatch = function (records) {
+    let failed1 = records.shift();
+    let failed2 = records.pop();
+    this._applyIncomingBatch(records);
+    return [failed1.id, failed2.id];
+  };
+
+  // Let's create less than a batch worth of server side records.
+  let collection = new ServerCollection();
+  for (let i = 0; i < APPLY_BATCH_SIZE - 1; i++) {
+    let id = 'record-no-' + i;
+    let payload = encryptPayload({id: id, denomination: "Record No. " + id});
+    collection.wbos[id] = new ServerWBO(id, payload);
+  }
+
+  let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
+  meta_global.payload.engines = {steam: {version: engine.version,
+                                         syncID: engine.syncID}};
+  let server = sync_httpd_setup({
+      "/1.0/foo/storage/steam": collection.handler()
+  });
+  do_test_pending();
+
+  try {
+
+    // Confirm initial environment
+    do_check_eq([id for (id in engine._store.items)].length, 0);
+
+    engine._syncStartup();
+    engine._processIncoming();
+
+    // Records have been applied.
+    do_check_eq([id for (id in engine._store.items)].length,
+                APPLY_BATCH_SIZE - 1 - 2);
+    do_check_eq(engine.toFetch.length, 2);
+    do_check_eq(engine.toFetch[0], "record-no-0");
+    do_check_eq(engine.toFetch[1], "record-no-8");
+
+  } finally {
+    server.stop(do_test_finished);
+    Svc.Prefs.resetBranch("");
+    Records.clearCache();
+    syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
+  }
+}
+
+
+function test_processIncoming_applyIncomingBatchSize_multiple() {
+  _("Ensure that incoming items are applied according to applyIncomingBatchSize.");
+  Svc.Prefs.set("clusterURL", "http://localhost:8080/");
+  Svc.Prefs.set("username", "foo");
+
+  const APPLY_BATCH_SIZE = 10;
+
+  // Engine that applies records in batches.
+  let engine = makeSteamEngine();
+  engine.applyIncomingBatchSize = APPLY_BATCH_SIZE;
+  let batchCalls = 0;
+  engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
+  engine._store.applyIncomingBatch = function (records) {
+    batchCalls += 1;
+    do_check_eq(records.length, APPLY_BATCH_SIZE);
+    this._applyIncomingBatch.apply(this, arguments);
+  };
+
+  // Let's create three batches worth of server side records.
+  let collection = new ServerCollection();
+  for (let i = 0; i < APPLY_BATCH_SIZE * 3; i++) {
+    let id = 'record-no-' + i;
+    let payload = encryptPayload({id: id, denomination: "Record No. " + id});
+    collection.wbos[id] = new ServerWBO(id, payload);
+  }
+
+  let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
+  meta_global.payload.engines = {steam: {version: engine.version,
+                                         syncID: engine.syncID}};
+  let server = sync_httpd_setup({
+      "/1.0/foo/storage/steam": collection.handler()
+  });
+  do_test_pending();
+
+  try {
+
+    // Confirm initial environment
+    do_check_eq([id for (id in engine._store.items)].length, 0);
+
+    engine._syncStartup();
+    engine._processIncoming();
+
+    // Records have been applied in 3 batches.
+    do_check_eq(batchCalls, 3);
+    do_check_eq([id for (id in engine._store.items)].length,
+                APPLY_BATCH_SIZE * 3);
+
+  } finally {
+    server.stop(do_test_finished);
+    Svc.Prefs.resetBranch("");
+    Records.clearCache();
+    syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
+  }
+}
+
+
+function test_processIncoming_failed_records() {
+  _("Ensure that failed records from _reconcile and applyIncomingBatch are refetched.");
+  Svc.Prefs.set("clusterURL", "http://localhost:8080/");
+  Svc.Prefs.set("username", "foo");
+
+  // Pretend to be a mobile client so we can test failed record handling
+  // while batching GETs.
+  Svc.Prefs.set("client.type", "mobile");
+
+  // Let's create three and a bit batches worth of server side records.
+  let collection = new ServerCollection();
+  const NUMBER_OF_RECORDS = MOBILE_BATCH_SIZE * 3 + 5;
+  for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
+    let id = 'record-no-' + i;
+    let payload = encryptPayload({id: id, denomination: "Record No. " + id});
+    let wbo = new ServerWBO(id, payload);
+    wbo.modified = Date.now()/1000 + 60 * (i - MOBILE_BATCH_SIZE * 3);
+    collection.wbos[id] = wbo;
+  }
+
+  // Engine that batches but likes to throw on a couple of records,
+  // two in each batch: the even ones fail in reconcile, the odd ones
+  // in applyIncoming.
+  const BOGUS_RECORDS = ["record-no-" + 42,
+                         "record-no-" + 23,
+                         "record-no-" + (42 + MOBILE_BATCH_SIZE),
+                         "record-no-" + (23 + MOBILE_BATCH_SIZE),
+                         "record-no-" + (42 + MOBILE_BATCH_SIZE * 2),
+                         "record-no-" + (23 + MOBILE_BATCH_SIZE * 2),
+                         "record-no-" + (2 + MOBILE_BATCH_SIZE * 3),
+                         "record-no-" + (1 + MOBILE_BATCH_SIZE * 3)];
+  let engine = makeSteamEngine();
+  engine.applyIncomingBatchSize = MOBILE_BATCH_SIZE;
+
+  engine.__reconcile = engine._reconcile;
+  engine._reconcile = function _reconcile(record) {
+    if (BOGUS_RECORDS.indexOf(record.id) % 2 == 0) {
+      throw "I don't like this record! Baaaaaah!";
+    }
+    return this.__reconcile.apply(this, arguments);
+  };
+  engine._store._applyIncoming = engine._store.applyIncoming;
+  engine._store.applyIncoming = function (record) {
+    if (BOGUS_RECORDS.indexOf(record.id) % 2 == 1) {
+      throw "I don't like this record! Baaaaaah!";
+    }
+    return this._applyIncoming.apply(this, arguments);
+  };
+
+  let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
+  meta_global.payload.engines = {steam: {version: engine.version,
+                                         syncID: engine.syncID}};
+  let server = sync_httpd_setup({
+      "/1.0/foo/storage/steam": collection.handler()
+  });
+  do_test_pending();
+
+  try {
+
+    // Confirm initial environment
+    do_check_eq(engine.lastSync, 0);
+    do_check_eq(engine.toFetch.length, 0);
+    do_check_eq([id for (id in engine._store.items)].length, 0);
+
+    let observerSubject;
+    let observerData;
+    Svc.Obs.add("weave:engine:sync:apply-failed",
+                function onApplyFailed(subject, data) {
+      Svc.Obs.remove("weave:engine:sync:apply-failed", onApplyFailed);
+      observerSubject = subject;
+      observerData = data;
+    });
+
+    engine._syncStartup();
+    engine._processIncoming();
+
+    // Ensure that all records but the bogus 4 have been applied.
+    do_check_eq([id for (id in engine._store.items)].length,
+                NUMBER_OF_RECORDS - BOGUS_RECORDS.length);
+
+    // Ensure that the bogus records will be fetched again on the next sync.
+    do_check_eq(engine.toFetch.length, BOGUS_RECORDS.length);
+    engine.toFetch.sort();
+    BOGUS_RECORDS.sort();
+    for (let i = 0; i < engine.toFetch.length; i++) {
+      do_check_eq(engine.toFetch[i], BOGUS_RECORDS[i]);
+    }
+
+    // Ensure the observer was notified
+    do_check_eq(observerData, engine.name);
+    do_check_eq(observerSubject.failed, BOGUS_RECORDS.length);
+  } finally {
+    server.stop(do_test_finished);
+    Svc.Prefs.resetBranch("");
+    Records.clearCache();
+    syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
+  }
+}
+
+
+function test_processIncoming_decrypt_failed() {
+  _("Ensure that records failing to decrypt are either replaced or refetched.");
+  Svc.Prefs.set("clusterURL", "http://localhost:8080/");
+  Svc.Prefs.set("username", "foo");
+
+  // Some good and some bogus records. One doesn't contain valid JSON,
+  // the other will throw during decrypt.
+  let collection = new ServerCollection();
+  collection.wbos.flying = new ServerWBO(
+      'flying', encryptPayload({id: 'flying',
+                                denomination: "LNER Class A3 4472"}));
+  collection.wbos.nojson = new ServerWBO("nojson", "This is invalid JSON");
+  collection.wbos.nojson2 = new ServerWBO("nojson2", "This is invalid JSON");
+  collection.wbos.scotsman = new ServerWBO(
+      'scotsman', encryptPayload({id: 'scotsman',
+                                  denomination: "Flying Scotsman"}));
+  collection.wbos.nodecrypt = new ServerWBO("nodecrypt", "Decrypt this!");
+  collection.wbos.nodecrypt2 = new ServerWBO("nodecrypt2", "Decrypt this!");
+
+  // Patch the fake crypto service to throw on the record above.
+  Svc.Crypto._decrypt = Svc.Crypto.decrypt;
+  Svc.Crypto.decrypt = function (ciphertext) {
+    if (ciphertext == "Decrypt this!") {
+      throw "Derp! Cipher finalized failed. Im ur crypto destroyin ur recordz.";
+    }
+    return this._decrypt.apply(this, arguments);
+  };
+
+  // Some broken records also exist locally.
+  let engine = makeSteamEngine();
+  engine.enabled = true;
+  engine._store.items = {nojson: "Valid JSON",
+                         nodecrypt: "Valid ciphertext"};
+
+  let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
+  meta_global.payload.engines = {steam: {version: engine.version,
+                                         syncID: engine.syncID}};
+  let server = sync_httpd_setup({
+      "/1.0/foo/storage/steam": collection.handler()
+  });
+  do_test_pending();
+
+  try {
+
+    // Confirm initial state
+    do_check_eq(engine.toFetch.length, 0);
+
+    let observerSubject;
+    let observerData;
+    Svc.Obs.add("weave:engine:sync:apply-failed",
+                function onApplyFailed(subject, data) {
+      Svc.Obs.remove("weave:engine:sync:apply-failed", onApplyFailed);
+      observerSubject = subject;
+      observerData = data;
+    });
+
+    engine.lastSync = collection.wbos.nojson.modified - 1;
+    engine.sync();
+
+    do_check_eq(engine.toFetch.length, 4);
+    do_check_eq(engine.toFetch[0], "nojson");
+    do_check_eq(engine.toFetch[1], "nojson2");
+    do_check_eq(engine.toFetch[2], "nodecrypt");
+    do_check_eq(engine.toFetch[3], "nodecrypt2");
+
+    // Ensure the observer was notified
+    do_check_eq(observerData, engine.name);
+    do_check_eq(observerSubject.applied, 2);
+    do_check_eq(observerSubject.failed, 4);
+
+  } finally {
+    server.stop(do_test_finished);
+    Svc.Prefs.resetBranch("");
+    Records.clearCache();
+    syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
+  }
+}
+
+
 function test_uploadOutgoing_toEmptyServer() {
   _("SyncEngine._uploadOutgoing uploads new records to server");
 
   Svc.Prefs.set("clusterURL", "http://localhost:8080/");
   Svc.Prefs.set("username", "foo");
   let collection = new ServerCollection();
   collection.wbos.flying = new ServerWBO('flying');
   collection.wbos.scotsman = new ServerWBO('scotsman');
@@ -1123,16 +1415,20 @@ function run_test() {
   test_syncStartup_serverHasNewerVersion();
   test_syncStartup_syncIDMismatchResetsClient();
   test_processIncoming_emptyServer();
   test_processIncoming_createFromServer();
   test_processIncoming_reconcile();
   test_processIncoming_mobile_batchSize();
   test_processIncoming_store_toFetch();
   test_processIncoming_resume_toFetch();
+  test_processIncoming_applyIncomingBatchSize_smaller();
+  test_processIncoming_applyIncomingBatchSize_multiple();
+  test_processIncoming_failed_records();
+  test_processIncoming_decrypt_failed();
   test_uploadOutgoing_toEmptyServer();
   test_uploadOutgoing_failed();
   test_uploadOutgoing_MAX_UPLOAD_RECORDS();
   test_syncFinish_noDelete();
   test_syncFinish_deleteByIds();
   test_syncFinish_deleteLotsInBatches();
   test_sync_partialUpload();
   test_canDecrypt_noCryptoKeys();
--- a/services/sync/tests/unit/test_utils_makeGUID.js
+++ b/services/sync/tests/unit/test_utils_makeGUID.js
@@ -1,29 +1,40 @@
-_("Make sure makeGUID makes guids of the right length/characters");
 Cu.import("resource://services-sync/util.js");
 
 const base64url =
   "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_";
 
 function run_test() {
+  _("Make sure makeGUID makes guids of the right length/characters");
   _("Create a bunch of guids to make sure they don't conflict");
   let guids = [];
   for (let i = 0; i < 1000; i++) {
     let newGuid = Utils.makeGUID();
     _("Generated " + newGuid);
 
     // Verify that the GUID's length is correct, even when it's URL encoded.
     do_check_eq(newGuid.length, 12);
     do_check_eq(encodeURIComponent(newGuid).length, 12);
 
     // Verify that the GUID only contains base64url characters
     do_check_true(Array.every(newGuid, function(chr) {
       return base64url.indexOf(chr) != -1;
     }));
 
+    // Verify that Utils.checkGUID() correctly identifies them as valid.
+    do_check_true(Utils.checkGUID(newGuid));
+
     // Verify uniqueness within our sample of 1000. This could cause random
     // failures, but they should be extremely rare. Otherwise we'd have a
     // problem with GUID collisions.
     do_check_true(guids.every(function(g) { return g != newGuid; }));
     guids.push(newGuid);
   }
+
+  _("Make sure checkGUID fails for invalid GUIDs");
+  do_check_false(Utils.checkGUID(undefined));
+  do_check_false(Utils.checkGUID(null));
+  do_check_false(Utils.checkGUID(""));
+  do_check_false(Utils.checkGUID("blergh"));
+  do_check_false(Utils.checkGUID("ThisGUIDisWayTooLong"));
+  do_check_false(Utils.checkGUID("Invalid!!!!!"));
 }