Bug 1291821 - Track incomplete stages and re-sync them r=rnewman draft
authorGrisha Kruglov <gkruglov@mozilla.com>
Tue, 29 Nov 2016 20:38:17 -0800
changeset 445751 c49abe8e93423cd3c52de26a6cb8ab3419e0aed0
parent 445750 b5365e77e4af402bd45f646e2df34387d1fff2d2
child 445752 7fb9f3be33549ee0f78ed0661e280702a2afb792
push id37599
push usergkruglov@mozilla.com
push dateWed, 30 Nov 2016 06:33:43 +0000
reviewersrnewman
bugs1291821
milestone53.0a1
Bug 1291821 - Track incomplete stages and re-sync them r=rnewman Stage re-sync is requested if: - We hit a 412 either during batching download or batching upload - We hit a sync deadline either during batching download or when merging records from the buffer SessionStoreDelegate interface was expanded with onStoreFailed, indicating that not just a particular record failed, but the whole operation did. onFetchFailed is used to inform delegates of 412/deadline failures during downloads. Three new exception types were added, to facilitated messaging between different layers. MozReview-Commit-ID: Ltdi5noEvdV
mobile/android/base/android-services.mozbuild
mobile/android/services/src/main/java/org/mozilla/gecko/fxa/sync/FxAccountSyncAdapter.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/CollectionConcurrentModificationException.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/GlobalSession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/ReflowIsNecessaryException.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/SyncDeadlineReachedException.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/delegates/GlobalSessionCallback.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ConcurrentRecordConsumer.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsConsumerDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ServerLocalSynchronizerSession.java
mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/TestBookmarks.java
mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/TestStoreTracking.java
mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java
mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/SynchronizerHelpers.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestRecordsChannel.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer15RepositorySession.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServerLocalSynchronizer.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/MockGlobalSessionCallback.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
--- a/mobile/android/base/android-services.mozbuild
+++ b/mobile/android/base/android-services.mozbuild
@@ -861,16 +861,17 @@ sync_java_files = [TOPSRCDIR + '/mobile/
     'fxa/SyncStatusListener.java',
     'push/autopush/AutopushClient.java',
     'push/autopush/AutopushClientException.java',
     'push/RegisterUserAgentResponse.java',
     'push/SubscribeChannelResponse.java',
     'sync/AlreadySyncingException.java',
     'sync/BackoffHandler.java',
     'sync/BadRequiredFieldJSONException.java',
+    'sync/CollectionConcurrentModificationException.java',
     'sync/CollectionKeys.java',
     'sync/CommandProcessor.java',
     'sync/CommandRunner.java',
     'sync/CredentialException.java',
     'sync/crypto/CryptoException.java',
     'sync/crypto/CryptoInfo.java',
     'sync/crypto/HKDF.java',
     'sync/crypto/HMACVerificationException.java',
@@ -939,16 +940,17 @@ sync_java_files = [TOPSRCDIR + '/mobile/
     'sync/net/WBORequestDelegate.java',
     'sync/NoCollectionKeysSetException.java',
     'sync/NodeAuthenticationException.java',
     'sync/NonArrayJSONException.java',
     'sync/NonObjectJSONException.java',
     'sync/NullClusterURLException.java',
     'sync/PersistedMetaGlobal.java',
     'sync/PrefsBackoffHandler.java',
+    'sync/ReflowIsNecessaryException.java',
     'sync/repositories/android/AndroidBrowserBookmarksDataAccessor.java',
     'sync/repositories/android/AndroidBrowserBookmarksRepository.java',
     'sync/repositories/android/AndroidBrowserBookmarksRepositorySession.java',
     'sync/repositories/android/AndroidBrowserHistoryDataAccessor.java',
     'sync/repositories/android/AndroidBrowserHistoryRepository.java',
     'sync/repositories/android/AndroidBrowserHistoryRepositorySession.java',
     'sync/repositories/android/AndroidBrowserRepository.java',
     'sync/repositories/android/AndroidBrowserRepositoryDataAccessor.java',
@@ -1053,16 +1055,17 @@ sync_java_files = [TOPSRCDIR + '/mobile/
     'sync/stage/PasswordsServerSyncStage.java',
     'sync/stage/ServerSyncStage.java',
     'sync/stage/SyncClientsEngineStage.java',
     'sync/stage/UploadMetaGlobalStage.java',
     'sync/Sync11Configuration.java',
     'sync/SyncConfiguration.java',
     'sync/SyncConfigurationException.java',
     'sync/SyncConstants.java',
+    'sync/SyncDeadlineReachedException.java',
     'sync/SyncException.java',
     'sync/synchronizer/ConcurrentRecordConsumer.java',
     'sync/synchronizer/RecordConsumer.java',
     'sync/synchronizer/RecordsChannel.java',
     'sync/synchronizer/RecordsChannelDelegate.java',
     'sync/synchronizer/RecordsConsumerDelegate.java',
     'sync/synchronizer/ServerLocalSynchronizer.java',
     'sync/synchronizer/ServerLocalSynchronizerSession.java',
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/fxa/sync/FxAccountSyncAdapter.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/fxa/sync/FxAccountSyncAdapter.java
@@ -48,19 +48,20 @@ import org.mozilla.gecko.sync.stage.Glob
 import org.mozilla.gecko.sync.telemetry.TelemetryContract;
 import org.mozilla.gecko.tokenserver.TokenServerClient;
 import org.mozilla.gecko.tokenserver.TokenServerClientDelegate;
 import org.mozilla.gecko.tokenserver.TokenServerException;
 import org.mozilla.gecko.tokenserver.TokenServerToken;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumSet;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 public class FxAccountSyncAdapter extends AbstractThreadedSyncAdapter {
   private static final String LOG_TAG = FxAccountSyncAdapter.class.getSimpleName();
@@ -118,18 +119,25 @@ public class FxAccountSyncAdapter extend
       super.postponeSync(millis);
     }
 
     @Override
     public void rejectSync() {
       super.rejectSync();
     }
 
+    /* package-local */ void requestFollowUpSync(String stage) {
+      this.stageNamesForFollowUpSync.add(stage);
+    }
+
     protected final Collection<String> stageNamesToSync;
 
+    // Keeps track of incomplete stages during this sync that need to be re-synced once we're done.
+    private final List<String> stageNamesForFollowUpSync = Collections.synchronizedList(new ArrayList<String>());
+
     public SyncDelegate(BlockingQueue<Result> latch, SyncResult syncResult, AndroidFxAccount fxAccount, Collection<String> stageNamesToSync) {
       super(latch, syncResult);
       this.stageNamesToSync = Collections.unmodifiableCollection(stageNamesToSync);
     }
 
     public Collection<String> getStageNamesToSync() {
       return this.stageNamesToSync;
     }
@@ -178,16 +186,25 @@ public class FxAccountSyncAdapter extend
           "Firefox Account informMigrated called, but it's not yet possible to migrate.  " +
           "Ignoring even though something is terribly wrong.");
     }
 
     @Override
     public void handleStageCompleted(Stage currentState, GlobalSession globalSession) {
     }
 
+    /**
+     * Schedule an incomplete stage for a follow-up sync.
+     */
+    @Override
+    public void handleIncompleteStage(Stage currentState,
+                                      GlobalSession globalSession) {
+      syncDelegate.requestFollowUpSync(currentState.getRepositoryName());
+    }
+
     @Override
     public void handleSuccess(GlobalSession globalSession) {
       Logger.info(LOG_TAG, "Global session succeeded.");
 
       // Get the number of clients, so we can schedule the sync interval accordingly.
       try {
         int otherClientsCount = globalSession.getClientsDelegate().getClientsCount();
         Logger.debug(LOG_TAG, "" + otherClientsCount + " other client(s).");
@@ -569,12 +586,29 @@ public class FxAccountSyncAdapter extend
       latch.take();
     } catch (Exception e) {
       Logger.error(LOG_TAG, "Got error syncing.", e);
       syncDelegate.handleError(e);
     } finally {
       fxAccount.releaseSharedAccountStateLock();
     }
 
-    Logger.info(LOG_TAG, "Syncing done.");
+    // If there are any incomplete stages, request a follow-up sync. Otherwise, we're done.
+    // Incomplete stage is:
+    // - one that hit a 412 error during either upload or download of data, indicating that
+    //   its collection has been modified remotely, or
+    // - one that hit a sync deadline
+    final String[] stagesToSyncAgain;
+    synchronized (syncDelegate.stageNamesForFollowUpSync) {
+      stagesToSyncAgain = syncDelegate.stageNamesForFollowUpSync.toArray(
+              new String[syncDelegate.stageNamesForFollowUpSync.size()]
+      );
+    }
+
+    if (stagesToSyncAgain.length > 0) {
+      Logger.info(LOG_TAG, "Syncing done. Requesting an immediate follow-up sync.");
+      fxAccount.requestImmediateSync(stagesToSyncAgain, null);
+    } else {
+      Logger.info(LOG_TAG, "Syncing done.");
+    }
     lastSyncRealtimeMillis = SystemClock.elapsedRealtime();
   }
 }
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/CollectionConcurrentModificationException.java
@@ -0,0 +1,15 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync;
+
+/**
+ * Thrown when a collection has been modified by another client while we were either
+ * downloading from it or uploading to it.
+ *
+ * @author grisha
+ */
+public class CollectionConcurrentModificationException extends ReflowIsNecessaryException {
+    private static final long serialVersionUID = 2701457832508838524L;
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/GlobalSession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/GlobalSession.java
@@ -474,16 +474,21 @@ public class GlobalSession implements Ht
       //  e is null, or we aborted for a non-HTTP reason; okay to upload new meta/global record.
       if (this.hasUpdatedMetaGlobal()) {
         this.uploadUpdatedMetaGlobal(); // Only logs errors; does not call abort.
       }
     }
     this.callback.handleError(this, e);
   }
 
+  public void handleIncompleteStage() {
+    // Let our delegate know that current stage is incomplete and needs to be synced again.
+    callback.handleIncompleteStage(this.currentState, this);
+  }
+
   public void handleHTTPError(SyncStorageResponse response, String reason) {
     // TODO: handling of 50x (backoff), 401 (node reassignment or auth error).
     // Fall back to aborting.
     Logger.warn(LOG_TAG, "Aborting sync due to HTTP " + response.getStatusCode());
     this.interpretHTTPFailure(response.httpResponse());
     this.abort(new HTTPFailureException(response), reason);
   }
 
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/ReflowIsNecessaryException.java
@@ -0,0 +1,18 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync;
+
+/**
+ * Used by SynchronizerSession to indicate that reflow of a stage is necessary.
+ * To reflow a stage is to request that it is synced using the same (or earlier) timestamps.
+ *
+ * Stages which complete only partially due to hitting a concurrent collection modification error or
+ * hitting a sync deadline should be re-flowed as soon as possible.
+ *
+ * @author grisha
+ */
+public class ReflowIsNecessaryException extends Exception {
+    private static final long serialVersionUID = -2614772437814638768L;
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/SyncDeadlineReachedException.java
@@ -0,0 +1,14 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync;
+
+/**
+ * Thrown when we've hit a self-imposed sync deadline, and decided not to proceed.
+ *
+ * @author grisha
+ */
+public class SyncDeadlineReachedException extends ReflowIsNecessaryException {
+    private static final long serialVersionUID = 2305367921350245484L;
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/delegates/GlobalSessionCallback.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/delegates/GlobalSessionCallback.java
@@ -33,17 +33,17 @@ public interface GlobalSessionCallback {
    * This account should stop syncing immediately, and arrange to delete itself.
    */
   void informMigrated(GlobalSession session);
 
   void handleAborted(GlobalSession globalSession, String reason);
   void handleError(GlobalSession globalSession, Exception ex);
   void handleSuccess(GlobalSession globalSession);
   void handleStageCompleted(Stage currentState, GlobalSession globalSession);
-
+  void handleIncompleteStage(Stage currentState, GlobalSession globalSession);
   /**
    * Called when a {@link GlobalSession} wants to know if it should continue
    * to make storage requests.
    *
    * @return false if the session should make no further requests.
    */
   boolean shouldBackOffStorage();
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
@@ -2,16 +2,17 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.middleware;
 
 import android.os.SystemClock;
 import android.support.annotation.VisibleForTesting;
 
+import org.mozilla.gecko.sync.SyncDeadlineReachedException;
 import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
 import org.mozilla.gecko.sync.repositories.InactiveSessionException;
 import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 
@@ -30,18 +31,16 @@ import java.util.concurrent.Executors;
  * @author grisha
  */
 /* package-local */ class BufferingMiddlewareRepositorySession extends MiddlewareRepositorySession {
     private final BufferStorage bufferStorage;
     private final long syncDeadlineMillis;
 
     private ExecutorService storeDelegateExecutor = Executors.newSingleThreadExecutor();
 
-    private volatile boolean storeMarkedIncomplete = false;
-
     /* package-local */ BufferingMiddlewareRepositorySession(
             RepositorySession repositorySession, MiddlewareRepository repository,
             long syncDeadlineMillis, BufferStorage bufferStorage) {
         super(repositorySession, repository);
         this.syncDeadlineMillis = syncDeadlineMillis;
         this.bufferStorage = bufferStorage;
     }
 
@@ -70,97 +69,84 @@ import java.util.concurrent.Executors;
         bufferStorage.clear();
     }
 
     @Override
     public void store(Record record) throws NoStoreDelegateException {
         bufferStorage.addOrReplace(record);
     }
 
+    /**
+     * When source fails to provide all records, we need to decide what to do with the buffer.
+     * We might fail because of a network partition, or because of a concurrent modification of a
+     * collection, or because we ran out of time fetching records, or some other reason.
+     *
+     * Either way we do not clear the buffer in any error scenario, but rather
+     * allow it to be re-filled, replacing existing records with their newer versions if necessary.
+     *
+     * If a collection has been modified, affected records' last-modified timestamps will be bumped,
+     * and we will receive those records during the next sync. If we already have them in our buffer,
+     * we replace our now-old copy. Otherwise, they are new records and we just append them.
+     *
+     * Incoming records are mapped to existing ones via GUIDs.
+     */
     @Override
     public void storeIncomplete() {
-        storeMarkedIncomplete = true;
+        bufferStorage.flush();
     }
 
     @Override
     public void storeDone() {
         storeDone(System.currentTimeMillis());
     }
 
     @Override
     public void storeFlush() {
         bufferStorage.flush();
     }
 
     @Override
     public void storeDone(final long end) {
-        doStoreDonePrepare();
+        bufferStorage.flush();
 
-        // Determine if we have enough time to perform consistency checks on the buffered data and
-        // then store it. If we don't have enough time now, we keep our buffer and try again later.
-        // We don't store results of a buffer consistency check anywhere, so we can't treat it
-        // separately from storage.
-        if (storeMarkedIncomplete || !mayProceedToMergeBuffer()) {
+        // Determine if we have enough time to merge the buffer data.
+        // If we don't have enough time now, we keep our buffer and try again later.
+        if (!mayProceedToMergeBuffer()) {
             super.abort();
-            storeDelegate.deferredStoreDelegate(storeDelegateExecutor).onStoreCompleted(end);
+            storeDelegate.deferredStoreDelegate(storeDelegateExecutor).onStoreFailed(new SyncDeadlineReachedException());
             return;
         }
 
-        // Separate actual merge, so that it may be tested without involving system clock.
-        doStoreDone(end);
+        doMergeBuffer(end);
     }
 
     @VisibleForTesting
-    public void doStoreDonePrepare() {
-        // Now that records stopped flowing, persist them.
-        bufferStorage.flush();
-    }
-
-    @VisibleForTesting
-    public void doStoreDone(final long end) {
-        final Collection<Record> buffer = bufferStorage.all();
+    /* package-local */ void doMergeBuffer(long end) {
+        final Collection<Record> bufferData = bufferStorage.all();
 
         // Trivial case of an empty buffer.
-        if (buffer.isEmpty()) {
+        if (bufferData.isEmpty()) {
             super.storeDone(end);
             return;
         }
 
-        // Flush our buffer to the wrapped local repository. Data goes live!
+        // Let session handle actual storing of records as it pleases.
         try {
-            for (Record record : buffer) {
+            for (Record record : bufferData) {
                 this.inner.store(record);
             }
         } catch (NoStoreDelegateException e) {
-            // At this point we should have a delegate, so this won't happen.
+            // At this point we should have a store delegate set on the session, so this won't happen.
         }
 
-        // And, we're done!
+        // Let session know that there are no more records to store.
         super.storeDone(end);
     }
 
     /**
-     * When source fails to provide more records, we need to decide what to do with the buffer.
-     * We might fail because of a network partition, or because of a concurrent modification of a
-     * collection. Either way we do not clear the buffer in a general case. If a collection has been
-     * modified, affected records' last-modified timestamps will be bumped, and we will receive those
-     * records during the next sync. If we already have them in our buffer, we replace our now-old
-     * copy. Otherwise, they are new records and we just append them.
-     *
-     * We depend on GUIDs to be a primary key for incoming records.
-     *
-     * @param e indicates reason of failure.
-     */
-    @Override
-    public void sourceFailed(Exception e) {
-        bufferStorage.flush();
-        super.sourceFailed(e);
-    }
-
-    /**
      * Session abnormally aborted. This doesn't mean our so-far buffered data is invalid.
      * Clean up after ourselves, if there's anything to clean up.
      */
     @Override
     public void abort() {
         bufferStorage.flush();
         super.abort();
     }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
@@ -19,17 +19,17 @@ import org.mozilla.gecko.sync.repositori
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 
 /**
  * A <code>RepositorySession</code> is created and used thusly:
  *
  *<ul>
  * <li>Construct, with a reference to its parent {@link Repository}, by calling
- *   {@link Repository#createSession(RepositorySessionCreationDelegate, android.content.Context)}.</li>
+ *   {@link Repository#createSession(org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate, android.content.Context)}.</li>
  * <li>Populate with saved information by calling {@link #unbundle(RepositorySessionBundle)}.</li>
  * <li>Begin a sync by calling {@link #begin(RepositorySessionBeginDelegate)}. <code>begin()</code>
  *   is an appropriate place to initialize expensive resources.</li>
  * <li>Perform operations such as {@link #fetchSince(long, RepositorySessionFetchRecordsDelegate)} and
  *   {@link #store(Record)}.</li>
  * <li>Finish by calling {@link #finish(RepositorySessionFinishDelegate)}, retrieving and storing
  *   the current bundle.</li>
  *</ul>
@@ -154,24 +154,16 @@ public abstract class RepositorySession 
   /**
    * Indicates that a number of records have been stored, more are still to come but after some time,
    * and now would be a good time to flush records and perform any other similar operations.
    */
   public void storeFlush() {
   }
 
   /**
-   * During flow of records, indicates that source failed.
-   *
-   * @param e indicates reason of failure.
-     */
-  public void sourceFailed(Exception e) {
-  }
-
-  /**
    * Indicates that a flow of records have been completed.
    */
   public void performCleanup() {
   }
 
   public abstract void wipe(RepositorySessionWipeDelegate delegate);
 
   /**
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java
@@ -49,9 +49,19 @@ public class DeferredRepositorySessionSt
   public void onStoreCompleted(final long storeEnd) {
     executor.execute(new Runnable() {
       @Override
       public void run() {
         inner.onStoreCompleted(storeEnd);
       }
     });
   }
+
+  @Override
+  public void onStoreFailed(final Exception e) {
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        inner.onStoreFailed(e);
+      }
+    });
+  }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java
@@ -9,15 +9,16 @@ import java.util.concurrent.ExecutorServ
 /**
  * These methods *must* be invoked asynchronously. Use deferredStoreDelegate if you
  * need help doing this.
  *
  * @author rnewman
  *
  */
 public interface RepositorySessionStoreDelegate {
-  public void onRecordStoreFailed(Exception ex, String recordGuid);
+  void onRecordStoreFailed(Exception ex, String recordGuid);
 
   // Called with a GUID when store has succeeded.
-  public void onRecordStoreSucceeded(String guid);
-  public void onStoreCompleted(long storeEnd);
-  public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor);
+  void onRecordStoreSucceeded(String guid);
+  void onStoreCompleted(long storeEnd);
+  void onStoreFailed(Exception e);
+  RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor);
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
@@ -5,31 +5,32 @@
 package org.mozilla.gecko.sync.repositories.downloaders;
 
 import android.net.Uri;
 import android.os.SystemClock;
 import android.support.annotation.Nullable;
 import android.support.annotation.VisibleForTesting;
 
 import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
 import org.mozilla.gecko.sync.CryptoRecord;
 import org.mozilla.gecko.sync.DelayedWorkTracker;
+import org.mozilla.gecko.sync.SyncDeadlineReachedException;
 import org.mozilla.gecko.sync.Utils;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.net.SyncResponse;
 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
-import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Batching Downloader implements batching protocol as supported by Sync 1.5.
  *
  * Downloader's batching behaviour is configured via two parameters, obtained from the repository:
@@ -180,19 +181,19 @@ public class BatchingDownloader {
                 this.lastModified = currentLastModifiedTimestamp;
             }
             lastModifiedChanged = !this.lastModified.equals(currentLastModifiedTimestamp);
         }
 
         // We expected server to fail our request with 412 in case of concurrent modifications, so
         // this is unexpected. However, let's treat this case just as if we received a 412.
         if (lastModifiedChanged) {
-            this.abort(
+            this.handleFetchFailed(
                     fetchRecordsDelegate,
-                    new ConcurrentModificationException("Last-modified timestamp has changed unexpectedly")
+                    new CollectionConcurrentModificationException()
             );
             return;
         }
 
         // If we can (or must) stop batching at this point, let the delegate know that we're all done!
         final String offset = response.weaveOffset();
         if (offset == null || !allowMultipleBatches) {
             final long normalizedTimestamp = response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED);
@@ -217,17 +218,17 @@ public class BatchingDownloader {
             public void run() {
                 Logger.debug(LOG_TAG, "Running onBatchCompleted.");
                 fetchRecordsDelegate.onBatchCompleted();
             }
         });
 
         // Should we proceed, however? Do we have enough time?
         if (!mayProceedWithBatching(fetchDeadline)) {
-            this.abort(fetchRecordsDelegate, new Exception("Not enough time to complete next batch"));
+            this.handleFetchFailed(fetchRecordsDelegate, new SyncDeadlineReachedException());
             return;
         }
 
         // Create and execute new batch request.
         try {
             final SyncStorageCollectionRequest newRequest = makeSyncStorageCollectionRequest(newer,
                     limit, full, sort, ids, offset);
             this.fetchWithParameters(newer, limit, full, sort, ids, newRequest, fetchRecordsDelegate);
@@ -237,20 +238,26 @@ public class BatchingDownloader {
                 public void run() {
                     Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
                     fetchRecordsDelegate.onFetchFailed(e);
                 }
             });
         }
     }
 
-    public void onFetchFailed(final Exception ex,
-                              final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
-                              final SyncStorageCollectionRequest request) {
-        removeRequestFromPending(request);
+    private void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
+                                  final Exception ex) {
+        handleFetchFailed(fetchRecordsDelegate, ex, null);
+    }
+
+    /* package-local */ void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
+                              final Exception ex,
+                              @Nullable final SyncStorageCollectionRequest request) {
+        this.removeRequestFromPending(request);
+        this.abortRequests();
         this.workTracker.delayWorkItem(new Runnable() {
             @Override
             public void run() {
                 Logger.debug(LOG_TAG, "Running onFetchFailed.");
                 fetchRecordsDelegate.onFetchFailed(ex);
             }
         });
     }
@@ -286,28 +293,16 @@ public class BatchingDownloader {
         }
     }
 
     @Nullable
     protected synchronized String getLastModified() {
         return this.lastModified;
     }
 
-    private void abort(final RepositorySessionFetchRecordsDelegate delegate, final Exception exception) {
-        Logger.error(LOG_TAG, exception.getMessage());
-        this.abortRequests();
-        this.workTracker.delayWorkItem(new Runnable() {
-            @Override
-            public void run() {
-                Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
-                delegate.onFetchFailed(exception);
-            }
-        });
-    }
-
     private static boolean mayProceedWithBatching(long deadline) {
         // For simplicity, allow batching to proceed if there's at least a minute left for the sync.
         // This should be enough to fetch and process records in the batch.
         final long timeLeft = deadline - SystemClock.elapsedRealtime();
         return timeLeft > TimeUnit.MINUTES.toMillis(1);
     }
 
     @VisibleForTesting
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegate.java
@@ -1,27 +1,26 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.repositories.downloaders;
 
 import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
 import org.mozilla.gecko.sync.CryptoRecord;
 import org.mozilla.gecko.sync.HTTPFailureException;
 import org.mozilla.gecko.sync.crypto.KeyBundle;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
 import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
 
 
-import java.util.ConcurrentModificationException;
-
 /**
  * Delegate that gets passed into fetch methods to handle server response from fetch.
  */
 public class BatchingDownloaderDelegate extends WBOCollectionRequestDelegate {
     public static final String LOG_TAG = "BatchingDownloaderDelegate";
 
     private final BatchingDownloader downloader;
     private final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate;
@@ -55,47 +54,48 @@ public class BatchingDownloaderDelegate 
     @Override
     public String ifUnmodifiedSince() {
         return this.downloader.getLastModified();
     }
 
     @Override
     public void handleRequestSuccess(SyncStorageResponse response) {
         Logger.debug(LOG_TAG, "Fetch done.");
-        if (response.lastModified() != null) {
-            this.downloader.onFetchCompleted(response, this.fetchRecordsDelegate, this.request,
-                    this.newer, this.batchLimit, this.full, this.sort, this.ids);
+
+        // Sanity check.
+        if (response.lastModified() == null) {
+            this.downloader.handleFetchFailed(
+                    this.fetchRecordsDelegate,
+                    new IllegalStateException("Missing last modified header from response"),
+                    this.request
+            );
             return;
         }
-        this.downloader.onFetchFailed(
-                new IllegalStateException("Missing last modified header from response"),
-                this.fetchRecordsDelegate,
-                this.request);
+
+        this.downloader.onFetchCompleted(response, this.fetchRecordsDelegate, this.request,
+                this.newer, this.batchLimit, this.full, this.sort, this.ids);
     }
 
     @Override
     public void handleRequestFailure(SyncStorageResponse response) {
         Logger.warn(LOG_TAG, "Got a non-success response.");
-        // Handle concurrent modification errors separately. We will need to signal upwards that
-        // this happened, in case stage buffer will want to clean up.
+        // Handle concurrent modification errors separately.
+        final Exception ex;
         if (response.getStatusCode() == 412) {
-            this.downloader.onFetchFailed(
-                    new ConcurrentModificationException(),
-                    this.fetchRecordsDelegate,
-                    this.request
-            );
+            ex = new CollectionConcurrentModificationException();
         } else {
-            this.handleRequestError(new HTTPFailureException(response));
+            ex = new HTTPFailureException(response);
         }
+        this.handleRequestError(ex);
     }
 
     @Override
     public void handleRequestError(final Exception ex) {
         Logger.warn(LOG_TAG, "Got request error.", ex);
-        this.downloader.onFetchFailed(ex, this.fetchRecordsDelegate, this.request);
+        this.downloader.handleFetchFailed(this.fetchRecordsDelegate, ex, this.request);
     }
 
     @Override
     public void handleWBO(CryptoRecord record) {
         this.downloader.onFetchedRecord(record, this.fetchRecordsDelegate);
     }
 
     @Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
@@ -3,16 +3,17 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.repositories.uploaders;
 
 import android.net.Uri;
 import android.support.annotation.VisibleForTesting;
 
 import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
 import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.Server15RecordPostFailedException;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.net.SyncResponse;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
 import org.mozilla.gecko.sync.repositories.domain.Record;
@@ -175,17 +176,17 @@ public class BatchingUploader {
             @Override
             public void run() {
                 commitIfNecessaryAfterLastPayload();
             }
         });
     }
 
     @VisibleForTesting
-    protected void commitIfNecessaryAfterLastPayload() {
+    /* package-local */ void commitIfNecessaryAfterLastPayload() {
         // Must be called after last payload upload finishes.
         synchronized (payload) {
             // If we have any pending records in the Payload, flush them!
             if (!payload.isEmpty()) {
                 flush(true, true);
 
             // If we have an empty payload but need to commit the batch in the batching mode, flush!
             } else if (batchMeta.needToCommit() && Boolean.TRUE.equals(inBatchingMode)) {
@@ -278,16 +279,20 @@ public class BatchingUploader {
     }
 
     public void recordFailed(final Exception e, final String recordGuid) {
         Logger.debug(LOG_TAG, "Record store failed for guid " + recordGuid + " with exception: " + e.toString());
         recordUploadFailed = true;
         sessionStoreDelegate.onRecordStoreFailed(e, recordGuid);
     }
 
+    /* package-local */ void concurrentModificationDetected() {
+        sessionStoreDelegate.onStoreFailed(new CollectionConcurrentModificationException());
+    }
+
     private static void bumpTimestampTo(final AtomicLong current, long newValue) {
         while (true) {
             long existing = current.get();
             if (existing > newValue) {
                 return;
             }
             if (current.compareAndSet(existing, newValue)) {
                 return;
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java
@@ -162,17 +162,21 @@ import java.util.ArrayList;
         failed = null;
 
         // And we're done! Let uploader finish up.
         uploader.payloadSucceeded(response, isCommit, isLastPayload);
     }
 
     @Override
     public void handleRequestFailure(final SyncStorageResponse response) {
-        this.handleRequestError(new HTTPFailureException(response));
+        if (response.getStatusCode() == 412) {
+            uploader.concurrentModificationDetected();
+        } else {
+            this.handleRequestError(new HTTPFailureException(response));
+        }
     }
 
     @Override
     public void handleRequestError(Exception e) {
         for (String guid : postedRecordGuids) {
             uploader.recordFailed(e, guid);
         }
         // GC
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
@@ -9,16 +9,17 @@ import android.os.SystemClock;
 
 import org.mozilla.gecko.background.common.log.Logger;
 import org.mozilla.gecko.sync.EngineSettings;
 import org.mozilla.gecko.sync.GlobalSession;
 import org.mozilla.gecko.sync.HTTPFailureException;
 import org.mozilla.gecko.sync.MetaGlobalException;
 import org.mozilla.gecko.sync.NoCollectionKeysSetException;
 import org.mozilla.gecko.sync.NonObjectJSONException;
+import org.mozilla.gecko.sync.ReflowIsNecessaryException;
 import org.mozilla.gecko.sync.SynchronizerConfiguration;
 import org.mozilla.gecko.sync.Utils;
 import org.mozilla.gecko.sync.crypto.KeyBundle;
 import org.mozilla.gecko.sync.delegates.WipeServerDelegate;
 import org.mozilla.gecko.sync.middleware.Crypto5MiddlewareRepository;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.net.BaseResource;
 import org.mozilla.gecko.sync.net.SyncStorageRequest;
@@ -617,13 +618,19 @@ public abstract class ServerSyncStage ex
       if (response.retryAfterInSeconds() > 0) {
         session.handleHTTPError(response, reason); // Calls session.abort().
         return;
       } else {
         session.interpretHTTPFailure(response.httpResponse()); // Does not call session.abort().
       }
     }
 
+    // Let global session know that this stage is not complete (due to a 412 or hitting a deadline).
+    // This stage will be re-synced once current sync is complete.
+    if (lastException instanceof ReflowIsNecessaryException) {
+      session.handleIncompleteStage();
+    }
+
     Logger.info(LOG_TAG, "Advancing session even though stage failed (took " + getStageDurationString() +
         "). Timestamps not persisted.");
     session.advance();
   }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ConcurrentRecordConsumer.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ConcurrentRecordConsumer.java
@@ -60,17 +60,21 @@ class ConcurrentRecordConsumer extends R
     Logger.trace(LOG_TAG, "Record stored. Notifying.");
     synchronized (countMonitor) {
       counter++;
     }
   }
 
   private void consumerIsDone() {
     Logger.debug(LOG_TAG, "Consumer is done. Processed " + counter + ((counter == 1) ? " record." : " records."));
-    delegate.consumerIsDone(allRecordsQueued);
+    if (allRecordsQueued) {
+      delegate.consumerIsDoneFull();
+    } else {
+      delegate.consumerIsDonePartial();
+    }
   }
 
   @Override
   public void run() {
     Record record;
 
     while (true) {
       // The queue is concurrent-safe.
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -1,19 +1,23 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.synchronizer;
 
+import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
+
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.ReflowIsNecessaryException;
 import org.mozilla.gecko.sync.ThreadPool;
 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
 import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionBeginDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionStoreDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
@@ -67,16 +71,18 @@ public class RecordsChannel implements
   RepositorySessionBeginDelegate {
 
   private static final String LOG_TAG = "RecordsChannel";
   public RepositorySession source;
   public RepositorySession sink;
   private final RecordsChannelDelegate delegate;
   private long fetchEnd = -1;
 
+  private volatile ReflowIsNecessaryException reflowException;
+
   protected final AtomicInteger numFetched = new AtomicInteger();
   protected final AtomicInteger numFetchFailed = new AtomicInteger();
   protected final AtomicInteger numStored = new AtomicInteger();
   protected final AtomicInteger numStoreFailed = new AtomicInteger();
 
   public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) {
     this.source    = source;
     this.sink      = sink;
@@ -88,17 +94,17 @@ public class RecordsChannel implements
    * A separate thread is waiting for us to notify it of work to do.
    * When we tell it to stop, it'll stop. We do that when the fetch
    * is completed.
    * When it stops, we tell the sink that there are no more records,
    * and wait for the sink to tell us that storing is done.
    * Then we notify our delegate of completion.
    */
   private RecordConsumer consumer;
-  private boolean waitingForQueueDone = false;
+  private volatile boolean waitingForQueueDone = false;
   private final ConcurrentLinkedQueue<Record> toProcess = new ConcurrentLinkedQueue<Record>();
 
   @Override
   public ConcurrentLinkedQueue<Record> getQueue() {
     return toProcess;
   }
 
   protected boolean isReady() {
@@ -199,19 +205,22 @@ public class RecordsChannel implements
     } catch (NoStoreDelegateException e) {
       Logger.error(LOG_TAG, "Got NoStoreDelegateException in RecordsChannel.store(). This should not occur. Aborting.", e);
       delegate.onFlowStoreFailed(this, e, record.guid);
     }
   }
 
   @Override
   public void onFetchFailed(Exception ex) {
-    Logger.warn(LOG_TAG, "onFetchFailed. Informing sink, calling for immediate stop.", ex);
-    sink.sourceFailed(ex);
+    Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
     numFetchFailed.incrementAndGet();
+    if (ex instanceof ReflowIsNecessaryException) {
+      setReflowException((ReflowIsNecessaryException) ex);
+    }
+    // Sink will be informed once consumer finishes.
     this.consumer.halt();
     delegate.onFlowFetchFailed(this, ex);
   }
 
   @Override
   public void onFetchedRecord(Record record) {
     numFetched.incrementAndGet();
     this.toProcess.add(record);
@@ -241,38 +250,80 @@ public class RecordsChannel implements
   }
 
   @Override
   public void onRecordStoreSucceeded(String guid) {
     Logger.trace(LOG_TAG, "Stored record with guid " + guid);
     this.consumer.stored();
   }
 
+  @Override
+  public void consumerIsDoneFull() {
+    Logger.trace(LOG_TAG, "Consumer is done, processed all records. Are we waiting for it? " + waitingForQueueDone);
+    if (waitingForQueueDone) {
+      waitingForQueueDone = false;
+
+      // Now we'll be waiting for sink to call its delegate's onStoreCompleted or onStoreFailed.
+      this.sink.storeDone();
+    }
+  }
 
   @Override
-  public void consumerIsDone(boolean allRecordsQueued) {
-    Logger.trace(LOG_TAG, "Consumer is done. Are we waiting for it? " + waitingForQueueDone);
+  public void consumerIsDonePartial() {
+    Logger.trace(LOG_TAG, "Consumer is done, processed some records. Are we waiting for it? " + waitingForQueueDone);
     if (waitingForQueueDone) {
       waitingForQueueDone = false;
-      if (!allRecordsQueued) {
-        this.sink.storeIncomplete();
-      }
-      this.sink.storeDone(); // Now we'll be waiting for onStoreCompleted.
+
+      // Let sink clean up or flush records if necessary.
+      this.sink.storeIncomplete();
+
+      delegate.onFlowCompleted(this, fetchEnd, System.currentTimeMillis());
     }
   }
 
   @Override
   public void onStoreCompleted(long storeEnd) {
     Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted. " +
                           "Fetch end is " + fetchEnd + ", store end is " + storeEnd);
     // Source might have used caches used to facilitate flow of records, so now is a good
     // time to clean up. Particularly pertinent for buffered sources.
+    // Rephrasing this in a more concrete way, buffers are cleared only once records have been merged
+    // locally and results of the merge have been uploaded to the server successfully.
     this.source.performCleanup();
-    // TODO: synchronize on consumer callback?
     delegate.onFlowCompleted(this, fetchEnd, storeEnd);
+
+  }
+
+  @Override
+  public void onStoreFailed(Exception ex) {
+    Logger.warn(LOG_TAG, "onStoreFailed. Calling for immediate stop.", ex);
+    if (ex instanceof ReflowIsNecessaryException) {
+      setReflowException((ReflowIsNecessaryException) ex);
+    }
+
+    // NB: consumer might or might not be running at this point. There are two cases here:
+    // 1) If we're storing records remotely, we might fail due to a 412.
+    // -- we might hit 412 at any point, so consumer might be in either state.
+    // Action: ignore consumer state, we have nothing else to do other to inform our delegate
+    // that we're done with this flow. Based on the reflow exception, it'll determine what to do.
+
+    // 2) If we're storing (merging) records locally, we might fail due to a sync deadline.
+    // -- we might hit a deadline only prior to attempting to merge records,
+    // -- at which point consumer would have finished already, and storeDone was called.
+    // Action: consumer state is known (done), so we can ignore it safely and inform our delegate
+    // that we're done.
+
+    // Prevent "once consumer is done..." actions from taking place. They already have (case 2), or
+    // we don't need them (case 1).
+    waitingForQueueDone = false;
+
+    // If consumer is still going at it, tell it to stop.
+    this.consumer.halt();
+
+    delegate.onFlowCompleted(this, fetchEnd, System.currentTimeMillis());
   }
 
   @Override
   public void onBeginFailed(Exception ex) {
     delegate.onFlowBeginFailed(this, ex);
   }
 
   @Override
@@ -305,9 +356,22 @@ public class RecordsChannel implements
     return new DeferredRepositorySessionBeginDelegate(this, executor);
   }
 
   @Override
   public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
     // Lie outright. We know that all of our fetch methods are safe.
     return this;
   }
+
+  @Nullable
+  /* package-local */ synchronized ReflowIsNecessaryException getReflowException() {
+    return reflowException;
+  }
+
+  private synchronized void setReflowException(@NonNull ReflowIsNecessaryException e) {
+    // It is a mistake to set reflow exception multiple times.
+    if (reflowException != null) {
+      throw new IllegalStateException("Reflow exception already set: " + reflowException);
+    }
+    reflowException = e;
+  }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsConsumerDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsConsumerDelegate.java
@@ -4,20 +4,24 @@
 
 package org.mozilla.gecko.sync.synchronizer;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.mozilla.gecko.sync.repositories.domain.Record;
 
 interface RecordsConsumerDelegate {
-  public abstract ConcurrentLinkedQueue<Record> getQueue();
+  ConcurrentLinkedQueue<Record> getQueue();
 
   /**
    * Called when no more items will be processed.
-   * If forced is true, the consumer is terminating because it was told to halt;
-   * not all items will necessarily have been processed.
-   * If forced is false, the consumer has invoked store and received an onStoreCompleted callback.
-   * @param forced
+   * Indicates that all items have been processed.
    */
-  public abstract void consumerIsDone(boolean forced);
-  public abstract void store(Record record);
+  void consumerIsDoneFull();
+
+  /**
+   * Called when no more items will be processed.
+   * Indicates that only some of the items have been processed.
+   */
+  void consumerIsDonePartial();
+
+  void store(Record record);
 }
\ No newline at end of file
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ServerLocalSynchronizerSession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ServerLocalSynchronizerSession.java
@@ -1,15 +1,16 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.synchronizer;
 
 import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.ReflowIsNecessaryException;
 import org.mozilla.gecko.sync.repositories.FetchFailedException;
 import org.mozilla.gecko.sync.repositories.StoreFailedException;
 
 /**
  * A <code>SynchronizerSession</code> designed to be used between a remote
  * server and a local repository.
  * <p>
  * Handles failure cases as follows (in the order they will occur during a sync):
@@ -24,16 +25,25 @@ public class ServerLocalSynchronizerSess
   protected static final String LOG_TAG = "ServLocSynchronizerSess";
 
   public ServerLocalSynchronizerSession(Synchronizer synchronizer, SynchronizerSessionDelegate delegate) {
     super(synchronizer, delegate);
   }
 
   @Override
   public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
+    // If a "reflow exception" was thrown, consider this synchronization failed.
+    final ReflowIsNecessaryException reflowException = recordsChannel.getReflowException();
+    if (reflowException != null) {
+      final String message = "Reflow is necessary: " + reflowException;
+      Logger.warn(LOG_TAG, message + " Aborting session.");
+      delegate.onSynchronizeFailed(this, reflowException, message);
+      return;
+    }
+
     // Fetch failures always abort.
     int numRemoteFetchFailed = recordsChannel.getFetchFailureCount();
     if (numRemoteFetchFailed > 0) {
       final String message = "Got " + numRemoteFetchFailed + " failures fetching remote records!";
       Logger.warn(LOG_TAG, message + " Aborting session.");
       delegate.onSynchronizeFailed(this, new FetchFailedException(), message);
       return;
     }
@@ -48,16 +58,25 @@ public class ServerLocalSynchronizerSess
       Logger.trace(LOG_TAG, "No failures storing local records.");
     }
 
     super.onFirstFlowCompleted(recordsChannel, fetchEnd, storeEnd);
   }
 
   @Override
   public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
+    // If a "reflow exception" was thrown, consider this synchronization failed.
+    final ReflowIsNecessaryException reflowException = recordsChannel.getReflowException();
+    if (reflowException != null) {
+      final String message = "Reflow is necessary: " + reflowException;
+      Logger.warn(LOG_TAG, message + " Aborting session.");
+      delegate.onSynchronizeFailed(this, reflowException, message);
+      return;
+    }
+
     // Fetch failures always abort.
     int numLocalFetchFailed = recordsChannel.getFetchFailureCount();
     if (numLocalFetchFailed > 0) {
       final String message = "Got " + numLocalFetchFailed + " failures fetching local records!";
       Logger.warn(LOG_TAG, message + " Aborting session.");
       delegate.onSynchronizeFailed(this, new FetchFailedException(), message);
       return;
     }
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/TestBookmarks.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/TestBookmarks.java
@@ -741,16 +741,21 @@ public class TestBookmarks extends Andro
               }
             }
             finishAndNotify(session);
           }
 
           @Override
           public void onRecordStoreSucceeded(String guid) {
           }
+
+          @Override
+          public void onStoreFailed(Exception e) {
+
+          }
         };
         session.setStoreDelegate(storeDelegate);
         for (BookmarkRecord record : records) {
           try {
             session.store(record);
           } catch (NoStoreDelegateException e) {
             // Never happens.
           }
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/TestStoreTracking.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/TestStoreTracking.java
@@ -116,16 +116,21 @@ public class TestStoreTracking extends A
             public void onBatchCompleted() {
 
             }
           });
         } catch (InactiveSessionException e) {
           performNotify(e);
         }
       }
+
+      @Override
+      public void onStoreFailed(Exception e) {
+
+      }
     };
 
     session.setStoreDelegate(storeDelegate);
     try {
       Logger.debug(getName(), "Storing...");
       session.store(record);
       session.storeDone();
     } catch (NoStoreDelegateException e) {
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java
@@ -6,30 +6,35 @@ package org.mozilla.gecko.background.syn
 import java.util.concurrent.ExecutorService;
 
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
 
 public class DefaultStoreDelegate extends DefaultDelegate implements RepositorySessionStoreDelegate {
 
   @Override
   public void onRecordStoreFailed(Exception ex, String guid) {
-    performNotify("Store failed", ex);
+    performNotify("Record store failed", ex);
   }
 
   @Override
   public void onRecordStoreSucceeded(String guid) {
     performNotify("DefaultStoreDelegate used", null);
   }
 
   @Override
   public void onStoreCompleted(long storeEnd) {
     performNotify("DefaultStoreDelegate used", null);
   }
 
   @Override
+  public void onStoreFailed(Exception ex) {
+    performNotify("Store failed", ex);
+  }
+
+  @Override
   public RepositorySessionStoreDelegate deferredStoreDelegate(final ExecutorService executor) {
     final RepositorySessionStoreDelegate self = this;
     return new RepositorySessionStoreDelegate() {
 
       @Override
       public void onRecordStoreSucceeded(final String guid) {
         executor.execute(new Runnable() {
           @Override
@@ -55,16 +60,21 @@ public class DefaultStoreDelegate extend
           @Override
           public void run() {
             self.onStoreCompleted(storeEnd);
           }
         });
       }
 
       @Override
+      public void onStoreFailed(Exception e) {
+
+      }
+
+      @Override
       public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService newExecutor) {
         if (newExecutor == executor) {
           return this;
         }
         throw new IllegalArgumentException("Can't re-defer this delegate.");
       }
     };
   }
--- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java
+++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java
@@ -24,16 +24,22 @@ public class DefaultGlobalSessionCallbac
   public void informUpgradeRequiredResponse(GlobalSession session) {
   }
 
   @Override
   public void informMigrated(GlobalSession globalSession) {
   }
 
   @Override
+  public void handleIncompleteStage(Stage currentState,
+                                    GlobalSession globalSession) {
+
+  }
+
+  @Override
   public void handleAborted(GlobalSession globalSession, String reason) {
   }
 
   @Override
   public void handleError(GlobalSession globalSession, Exception ex) {
   }
 
   @Override
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/SynchronizerHelpers.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/SynchronizerHelpers.java
@@ -1,16 +1,18 @@
 /* Any copyright is dedicated to the Public Domain.
    http://creativecommons.org/publicdomain/zero/1.0/ */
 
 package org.mozilla.android.sync.test;
 
 import android.content.Context;
 import org.mozilla.gecko.background.common.log.Logger;
 import org.mozilla.gecko.background.testhelpers.WBORepository;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
+import org.mozilla.gecko.sync.SyncDeadlineReachedException;
 import org.mozilla.gecko.sync.repositories.FetchFailedException;
 import org.mozilla.gecko.sync.repositories.InactiveSessionException;
 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
 import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
 import org.mozilla.gecko.sync.repositories.StoreFailedException;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
@@ -18,32 +20,60 @@ import org.mozilla.gecko.sync.repositori
 import org.mozilla.gecko.sync.repositories.domain.Record;
 
 import java.util.ArrayList;
 import java.util.concurrent.ExecutorService;
 
 public class SynchronizerHelpers {
   public static final String FAIL_SENTINEL = "Fail";
 
+  enum FailMode {
+    COLLECTION_MODIFIED,
+    DEADLINE_REACHED,
+    FETCH,
+    STORE
+  }
+
+  private static Exception getFailException(FailMode failMode) {
+    switch (failMode) {
+      case COLLECTION_MODIFIED:
+        return new CollectionConcurrentModificationException();
+      case DEADLINE_REACHED:
+        return new SyncDeadlineReachedException();
+      case FETCH:
+        return new FetchFailedException();
+      case STORE:
+        return new StoreFailedException();
+      default:
+        throw new IllegalStateException();
+    }
+  }
+
   /**
    * Store one at a time, failing if the guid contains FAIL_SENTINEL.
    */
   public static class FailFetchWBORepository extends WBORepository {
+    private final FailMode failMode;
+
+    public FailFetchWBORepository(FailMode failMode) {
+      this.failMode = failMode;
+    }
+
     @Override
     public void createSession(RepositorySessionCreationDelegate delegate,
                               Context context) {
       delegate.deferredCreationDelegate().onSessionCreated(new WBORepositorySession(this) {
         @Override
         public void fetchSince(long timestamp,
                                final RepositorySessionFetchRecordsDelegate delegate) {
           super.fetchSince(timestamp, new RepositorySessionFetchRecordsDelegate() {
             @Override
             public void onFetchedRecord(Record record) {
               if (record.guid.contains(FAIL_SENTINEL)) {
-                delegate.onFetchFailed(new FetchFailedException());
+                delegate.onFetchFailed(getFailException(failMode));
               } else {
                 delegate.onFetchedRecord(record);
               }
             }
 
             @Override
             public void onFetchFailed(Exception ex) {
               delegate.onFetchFailed(ex);
@@ -68,27 +98,38 @@ public class SynchronizerHelpers {
       });
     }
   }
 
   /**
    * Store one at a time, failing if the guid contains FAIL_SENTINEL.
    */
   public static class SerialFailStoreWBORepository extends WBORepository {
+    private final FailMode failMode;
+
+    public SerialFailStoreWBORepository(FailMode failMode) {
+      this.failMode = failMode;
+    }
+
     @Override
     public void createSession(RepositorySessionCreationDelegate delegate,
                               Context context) {
       delegate.deferredCreationDelegate().onSessionCreated(new WBORepositorySession(this) {
         @Override
         public void store(final Record record) throws NoStoreDelegateException {
           if (storeDelegate == null) {
             throw new NoStoreDelegateException();
           }
           if (record.guid.contains(FAIL_SENTINEL)) {
-            storeDelegate.onRecordStoreFailed(new StoreFailedException(), record.guid);
+            Exception ex = getFailException(failMode);
+            if (ex instanceof CollectionConcurrentModificationException) {
+              storeDelegate.onStoreFailed(ex);
+            } else {
+              storeDelegate.onRecordStoreFailed(ex, record.guid);
+            }
           } else {
             super.store(record);
           }
         }
       });
     }
   }
 
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestRecordsChannel.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestRecordsChannel.java
@@ -1,108 +1,101 @@
 /* Any copyright is dedicated to the Public Domain.
    http://creativecommons.org/publicdomain/zero/1.0/ */
 
 package org.mozilla.android.sync.test;
 
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.android.sync.test.SynchronizerHelpers.FailFetchWBORepository;
 import org.mozilla.android.sync.test.helpers.ExpectSuccessRepositorySessionCreationDelegate;
 import org.mozilla.android.sync.test.helpers.ExpectSuccessRepositorySessionFinishDelegate;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
 import org.mozilla.gecko.background.testhelpers.WBORepository;
 import org.mozilla.gecko.background.testhelpers.WaitHelper;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
+import org.mozilla.gecko.sync.SyncDeadlineReachedException;
 import org.mozilla.gecko.sync.repositories.InactiveSessionException;
 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
-import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
 import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord;
 import org.mozilla.gecko.sync.synchronizer.RecordsChannel;
 import org.mozilla.gecko.sync.synchronizer.RecordsChannelDelegate;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(TestRunner.class)
 public class TestRecordsChannel {
 
-  protected WBORepository remote;
-  protected WBORepository local;
+  private WBORepository sourceRepository;
+  private RepositorySession sourceSession;
+  private WBORepository sinkRepository;
+  private RepositorySession sinkSession;
 
-  protected RepositorySession source;
-  protected RepositorySession sink;
-  protected RecordsChannelDelegate rcDelegate;
-
-  protected AtomicInteger numFlowFetchFailed;
-  protected AtomicInteger numFlowStoreFailed;
-  protected AtomicInteger numFlowCompleted;
-  protected AtomicBoolean flowBeginFailed;
-  protected AtomicBoolean flowFinishFailed;
+  private RecordsChannelDelegate rcDelegate;
 
-  public void doFlow(final Repository remote, final Repository local) throws Exception {
-    WaitHelper.getTestWaiter().performWait(new Runnable() {
-      @Override
-      public void run() {
-        remote.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) {
-          @Override
-          public void onSessionCreated(RepositorySession session) {
-            source = session;
-            local.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) {
-              @Override
-              public void onSessionCreated(RepositorySession session) {
-                sink = session;
-                WaitHelper.getTestWaiter().performNotify();
-              }
-            }, null);
-          }
-        }, null);
-      }
-    });
+  private AtomicInteger numFlowFetchFailed;
+  private AtomicInteger numFlowStoreFailed;
+  private AtomicInteger numFlowCompleted;
+  private AtomicBoolean flowBeginFailed;
+  private AtomicBoolean flowFinishFailed;
 
-    assertNotNull(source);
-    assertNotNull(sink);
+  private volatile RecordsChannel recordsChannel;
+  private volatile Exception fetchException;
+  private volatile Exception storeException;
 
+  @Before
+  public void setUp() throws Exception {
     numFlowFetchFailed = new AtomicInteger(0);
     numFlowStoreFailed = new AtomicInteger(0);
     numFlowCompleted = new AtomicInteger(0);
     flowBeginFailed = new AtomicBoolean(false);
     flowFinishFailed = new AtomicBoolean(false);
 
+    // Repositories and sessions will be set/created by tests.
+    sourceRepository = null;
+    sourceSession = null;
+    sinkRepository = null;
+    sinkSession = null;
+
     rcDelegate = new RecordsChannelDelegate() {
       @Override
       public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) {
         numFlowFetchFailed.incrementAndGet();
+        fetchException = ex;
       }
 
       @Override
       public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) {
         numFlowStoreFailed.incrementAndGet();
+        storeException = ex;
       }
 
       @Override
       public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) {
         flowFinishFailed.set(true);
         WaitHelper.getTestWaiter().performNotify();
       }
 
       @Override
       public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
         numFlowCompleted.incrementAndGet();
         try {
-          sink.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) {
+          sinkSession.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) {
             @Override
             public void onFinishSucceeded(RepositorySession session, RepositorySessionBundle bundle) {
               try {
-                source.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) {
+                sourceSession.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) {
                   @Override
                   public void onFinishSucceeded(RepositorySession session, RepositorySessionBundle bundle) {
                     performNotify();
                   }
                 });
               } catch (InactiveSessionException e) {
                 WaitHelper.getTestWaiter().performNotify(e);
               }
@@ -114,30 +107,57 @@ public class TestRecordsChannel {
       }
 
       @Override
       public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) {
         flowBeginFailed.set(true);
         WaitHelper.getTestWaiter().performNotify();
       }
     };
+  }
 
-    final RecordsChannel rc = new RecordsChannel(source,  sink, rcDelegate);
+  private void createSessions() {
+    WaitHelper.getTestWaiter().performWait(new Runnable() {
+      @Override
+      public void run() {
+        sourceRepository.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) {
+          @Override
+          public void onSessionCreated(RepositorySession session) {
+            sourceSession = session;
+            sinkRepository.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) {
+              @Override
+              public void onSessionCreated(RepositorySession session) {
+                sinkSession = session;
+                WaitHelper.getTestWaiter().performNotify();
+              }
+            }, null);
+          }
+        }, null);
+      }
+    });
+  }
+
+  public void doFlow() throws Exception {
+    createSessions();
+    assertNotNull(sourceSession);
+    assertNotNull(sinkSession);
+    recordsChannel = new RecordsChannel(sourceSession,  sinkSession, rcDelegate);
     WaitHelper.getTestWaiter().performWait(new Runnable() {
       @Override
       public void run() {
         try {
-          rc.beginAndFlow();
+          recordsChannel.beginAndFlow();
         } catch (InvalidSessionTransitionException e) {
           WaitHelper.getTestWaiter().performNotify(e);
         }
       }
     });
   }
 
+  // NB: records in WBORepository are stored in a HashMap, so don't assume an order.
   public static final BookmarkRecord[] inbounds = new BookmarkRecord[] {
     new BookmarkRecord("inboundSucc1", "bookmarks", 1, false),
     new BookmarkRecord("inboundSucc2", "bookmarks", 1, false),
     new BookmarkRecord("inboundFail1", "bookmarks", 1, false),
     new BookmarkRecord("inboundSucc3", "bookmarks", 1, false),
     new BookmarkRecord("inboundSucc4", "bookmarks", 1, false),
     new BookmarkRecord("inboundFail2", "bookmarks", 1, false),
   };
@@ -158,72 +178,161 @@ public class TestRecordsChannel {
   protected WBORepository full() {
     WBORepository repo = new SynchronizerHelpers.TrackingWBORepository();
     for (BookmarkRecord outbound : outbounds) {
       repo.wbos.put(outbound.guid, outbound);
     }
     return repo;
   }
 
-  protected WBORepository failingFetch() {
-    WBORepository repo = new FailFetchWBORepository();
+  protected WBORepository failingFetch(SynchronizerHelpers.FailMode failMode) {
+    WBORepository repo = new FailFetchWBORepository(failMode);
+
     for (BookmarkRecord outbound : outbounds) {
       repo.wbos.put(outbound.guid, outbound);
     }
     return repo;
   }
 
   @Test
   public void testSuccess() throws Exception {
-    WBORepository source = full();
-    WBORepository sink = empty();
-    doFlow(source, sink);
+    sourceRepository = full();
+    sinkRepository = empty();
+    doFlow();
     assertEquals(1, numFlowCompleted.get());
     assertEquals(0, numFlowFetchFailed.get());
     assertEquals(0, numFlowStoreFailed.get());
-    assertEquals(source.wbos, sink.wbos);
+    assertEquals(sourceRepository.wbos, sinkRepository.wbos);
+    assertEquals(0, recordsChannel.getFetchFailureCount());
+    assertEquals(0, recordsChannel.getStoreFailureCount());
+    assertEquals(6, recordsChannel.getStoreCount());
   }
 
   @Test
   public void testFetchFail() throws Exception {
-    WBORepository source = failingFetch();
-    WBORepository sink = empty();
-    doFlow(source, sink);
+    sourceRepository = failingFetch(SynchronizerHelpers.FailMode.FETCH);
+    sinkRepository = empty();
+    doFlow();
+    assertEquals(1, numFlowCompleted.get());
+    assertTrue(numFlowFetchFailed.get() > 0);
+    assertEquals(0, numFlowStoreFailed.get());
+    assertTrue(sinkRepository.wbos.size() < 6);
+    assertTrue(recordsChannel.getFetchFailureCount() > 0);
+    assertEquals(0, recordsChannel.getStoreFailureCount());
+    assertTrue(recordsChannel.getStoreCount() < 6);
+  }
+
+  @Test
+  public void testStoreFetchFailedCollectionModified() throws Exception {
+    sourceRepository = failingFetch(SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
+    sinkRepository = empty();
+    doFlow();
     assertEquals(1, numFlowCompleted.get());
     assertTrue(numFlowFetchFailed.get() > 0);
     assertEquals(0, numFlowStoreFailed.get());
-    assertTrue(sink.wbos.size() < 6);
+    assertTrue(sinkRepository.wbos.size() < 6);
+
+    assertTrue(recordsChannel.getFetchFailureCount() > 0);
+    assertEquals(0, recordsChannel.getStoreFailureCount());
+    assertTrue(recordsChannel.getStoreCount() < sourceRepository.wbos.size());
+
+    assertEquals(CollectionConcurrentModificationException.class, fetchException.getClass());
+    final Exception ex = recordsChannel.getReflowException();
+    assertNotNull(ex);
+    assertEquals(CollectionConcurrentModificationException.class, ex.getClass());
+  }
+
+  @Test
+  public void testStoreFetchFailedDeadline() throws Exception {
+    sourceRepository = failingFetch(SynchronizerHelpers.FailMode.DEADLINE_REACHED);
+    sinkRepository = empty();
+    doFlow();
+    assertEquals(1, numFlowCompleted.get());
+    assertTrue(numFlowFetchFailed.get() > 0);
+    assertEquals(0, numFlowStoreFailed.get());
+    assertTrue(sinkRepository.wbos.size() < 6);
+
+    assertTrue(recordsChannel.getFetchFailureCount() > 0);
+    assertEquals(0, recordsChannel.getStoreFailureCount());
+    assertTrue(recordsChannel.getStoreCount() < sourceRepository.wbos.size());
+
+    assertEquals(SyncDeadlineReachedException.class, fetchException.getClass());
+    final Exception ex = recordsChannel.getReflowException();
+    assertNotNull(ex);
+    assertEquals(SyncDeadlineReachedException.class, ex.getClass());
   }
 
   @Test
   public void testStoreSerialFail() throws Exception {
-    WBORepository source = full();
-    WBORepository sink = new SynchronizerHelpers.SerialFailStoreWBORepository();
-    doFlow(source, sink);
+    sourceRepository = full();
+    sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository(
+            SynchronizerHelpers.FailMode.STORE);
+    doFlow();
     assertEquals(1, numFlowCompleted.get());
     assertEquals(0, numFlowFetchFailed.get());
     assertEquals(1, numFlowStoreFailed.get());
-    assertEquals(5, sink.wbos.size());
+    // We will fail to store one of the records but expect flow to continue.
+    assertEquals(5, sinkRepository.wbos.size());
+
+    assertEquals(0, recordsChannel.getFetchFailureCount());
+    assertEquals(1, recordsChannel.getStoreFailureCount());
+    // Number of store attempts.
+    assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreCount());
+  }
+
+  @Test
+  public void testStoreSerialFailCollectionModified() throws Exception {
+    sourceRepository = full();
+    sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository(
+            SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
+    doFlow();
+    assertEquals(1, numFlowCompleted.get());
+    assertEquals(0, numFlowFetchFailed.get());
+    assertEquals(1, numFlowStoreFailed.get());
+    // One of the records will fail, at which point we'll stop flowing them.
+    final int sunkenRecords = sinkRepository.wbos.size();
+    assertTrue(sunkenRecords > 0 && sunkenRecords < 6);
+
+    assertEquals(0, recordsChannel.getFetchFailureCount());
+    // RecordChannel's storeFail count is only incremented for failures of individual records.
+    assertEquals(0, recordsChannel.getStoreFailureCount());
+    final int channelStoreAttemptCount = recordsChannel.getStoreCount();
+    assertTrue(channelStoreAttemptCount > 0 && channelStoreAttemptCount < sourceRepository.wbos.size());
+
+    assertEquals(CollectionConcurrentModificationException.class, storeException.getClass());
+    final Exception ex = recordsChannel.getReflowException();
+    assertNotNull(ex);
+    assertEquals(CollectionConcurrentModificationException.class, ex.getClass());
   }
 
   @Test
   public void testStoreBatchesFail() throws Exception {
-    WBORepository source = full();
-    WBORepository sink = new SynchronizerHelpers.BatchFailStoreWBORepository(3);
-    doFlow(source, sink);
+    sourceRepository = full();
+    sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(3);
+    doFlow();
     assertEquals(1, numFlowCompleted.get());
     assertEquals(0, numFlowFetchFailed.get());
     assertEquals(3, numFlowStoreFailed.get()); // One batch fails.
-    assertEquals(3, sink.wbos.size()); // One batch succeeds.
+    assertEquals(3, sinkRepository.wbos.size()); // One batch succeeds.
+
+    assertEquals(0, recordsChannel.getFetchFailureCount());
+    assertEquals(3, recordsChannel.getStoreFailureCount());
+    // Number of store attempts.
+    assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreCount());
   }
 
 
   @Test
   public void testStoreOneBigBatchFail() throws Exception {
-    WBORepository source = full();
-    WBORepository sink = new SynchronizerHelpers.BatchFailStoreWBORepository(50);
-    doFlow(source, sink);
+    sourceRepository = full();
+    sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(50);
+    doFlow();
     assertEquals(1, numFlowCompleted.get());
     assertEquals(0, numFlowFetchFailed.get());
     assertEquals(6, numFlowStoreFailed.get()); // One (big) batch fails.
-    assertEquals(0, sink.wbos.size()); // No batches succeed.
+    assertEquals(0, sinkRepository.wbos.size()); // No batches succeed.
+
+    assertEquals(0, recordsChannel.getFetchFailureCount());
+    assertEquals(6, recordsChannel.getStoreFailureCount());
+    // Number of store attempts.
+    assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreCount());
   }
 }
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer15RepositorySession.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer15RepositorySession.java
@@ -7,26 +7,28 @@ import android.os.SystemClock;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.android.sync.test.SynchronizerHelpers.TrackingWBORepository;
 import org.mozilla.android.sync.test.helpers.BaseTestStorageRequestDelegate;
 import org.mozilla.android.sync.test.helpers.HTTPServerTestHelper;
 import org.mozilla.android.sync.test.helpers.MockServer;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
 import org.mozilla.gecko.sync.InfoCollections;
 import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.Utils;
 import org.mozilla.gecko.sync.crypto.KeyBundle;
 import org.mozilla.gecko.sync.middleware.Crypto5MiddlewareRepository;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.net.BaseResource;
 import org.mozilla.gecko.sync.net.BasicAuthHeaderProvider;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
 import org.mozilla.gecko.sync.repositories.FetchFailedException;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 import org.mozilla.gecko.sync.repositories.StoreFailedException;
 import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord;
 import org.mozilla.gecko.sync.repositories.domain.BookmarkRecordFactory;
 import org.mozilla.gecko.sync.synchronizer.ServerLocalSynchronizer;
 import org.mozilla.gecko.sync.synchronizer.Synchronizer;
 import org.simpleframework.http.ContentType;
 import org.simpleframework.http.Request;
@@ -102,17 +104,18 @@ public class TestServer15RepositorySessi
   }
 
   protected Exception doSynchronize(MockServer server) throws Exception {
     final String COLLECTION = "test";
 
     final TrackingWBORepository local = getLocal(100);
     final Server15Repository remote = new Server15Repository(
             COLLECTION, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
-            getCollectionURL(COLLECTION), authHeaderProvider, infoCollections, infoConfiguration);
+            getCollectionURL(COLLECTION), authHeaderProvider, infoCollections, infoConfiguration,
+            new NonPersistentRepositoryStateProvider());
     KeyBundle collectionKey = new KeyBundle(TEST_USERNAME, SYNC_KEY);
     Crypto5MiddlewareRepository cryptoRepo = new Crypto5MiddlewareRepository(remote, collectionKey);
     cryptoRepo.recordFactory = new BookmarkRecordFactory();
 
     final Synchronizer synchronizer = new ServerLocalSynchronizer();
     synchronizer.repositoryA = cryptoRepo;
     synchronizer.repositoryB = local;
 
@@ -133,16 +136,24 @@ public class TestServer15RepositorySessi
   public void testFetchFailure() throws Exception {
     MockServer server = new MockServer(404, "error");
     Exception e = doSynchronize(server);
     assertNotNull(e);
     assertEquals(FetchFailedException.class, e.getClass());
   }
 
   @Test
+  public void testFetch412Failure() throws Exception {
+    MockServer server = new MockServer(412, "error");
+    Exception e = doSynchronize(server);
+    assertNotNull(e);
+    assertEquals(CollectionConcurrentModificationException.class, e.getClass());
+  }
+
+  @Test
   public void testStorePostSuccessWithFailingRecords() throws Exception {
     MockServer server = new MockServer(200, "{ modified: \" + " + Utils.millisecondsToDecimalSeconds(System.currentTimeMillis()) + ", " +
         "success: []," +
         "failed: { outboundFail2: [] } }");
     Exception e = doSynchronize(server);
     assertNotNull(e);
     assertEquals(StoreFailedException.class, e.getClass());
   }
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServerLocalSynchronizer.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServerLocalSynchronizer.java
@@ -102,47 +102,47 @@ public class TestServerLocalSynchronizer
 
     assertEquals(12, local.wbos.size());
     assertEquals(12, remote.wbos.size());
   }
 
   @Test
   public void testLocalFetchErrors() {
     WBORepository remote = new TrackingWBORepository();
-    WBORepository local  = new FailFetchWBORepository();
+    WBORepository local  = new FailFetchWBORepository(SynchronizerHelpers.FailMode.FETCH);
 
     Synchronizer synchronizer = getSynchronizer(remote, local);
     Exception e = doSynchronize(synchronizer);
     assertNotNull(e);
     assertEquals(FetchFailedException.class, e.getClass());
 
     // Neither session gets finished successfully, so all records are dropped.
     assertEquals(6, local.wbos.size());
     assertEquals(6, remote.wbos.size());
   }
 
   @Test
   public void testRemoteFetchErrors() {
-    WBORepository remote = new FailFetchWBORepository();
+    WBORepository remote = new FailFetchWBORepository(SynchronizerHelpers.FailMode.FETCH);
     WBORepository local  = new TrackingWBORepository();
 
     Synchronizer synchronizer = getSynchronizer(remote, local);
     Exception e = doSynchronize(synchronizer);
     assertNotNull(e);
     assertEquals(FetchFailedException.class, e.getClass());
 
     // Neither session gets finished successfully, so all records are dropped.
     assertEquals(6, local.wbos.size());
     assertEquals(6, remote.wbos.size());
   }
 
   @Test
   public void testLocalSerialStoreErrorsAreIgnored() {
     WBORepository remote = new TrackingWBORepository();
-    WBORepository local  = new SerialFailStoreWBORepository();
+    WBORepository local  = new SerialFailStoreWBORepository(SynchronizerHelpers.FailMode.FETCH);
 
     Synchronizer synchronizer = getSynchronizer(remote, local);
     assertNull(doSynchronize(synchronizer));
 
     assertEquals(9,  local.wbos.size());
     assertEquals(12, remote.wbos.size());
   }
 
@@ -153,17 +153,17 @@ public class TestServerLocalSynchronizer
     Synchronizer synchronizer = getSynchronizer(new TrackingWBORepository(), new BatchFailStoreWBORepository(BATCH_SIZE));
 
     Exception e = doSynchronize(synchronizer);
     assertNull(e);
   }
 
   @Test
   public void testRemoteSerialStoreErrorsAreNotIgnored() throws Exception {
-    Synchronizer synchronizer = getSynchronizer(new SerialFailStoreWBORepository(), new TrackingWBORepository()); // Tracking so we don't send incoming records back.
+    Synchronizer synchronizer = getSynchronizer(new SerialFailStoreWBORepository(SynchronizerHelpers.FailMode.STORE), new TrackingWBORepository()); // Tracking so we don't send incoming records back.
 
     Exception e = doSynchronize(synchronizer);
     assertNotNull(e);
     assertEquals(StoreFailedException.class, e.getClass());
   }
 
   @Test
   public void testRemoteBatchStoreErrorsAreNotIgnoredManyBatches() throws Exception {
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java
@@ -28,12 +28,18 @@ public class ExpectSuccessRepositorySess
   }
 
   @Override
   public void onStoreCompleted(long storeEnd) {
     log("Record store completed at " + storeEnd);
   }
 
   @Override
+  public void onStoreFailed(Exception e) {
+    log("Store failed.", e);
+    performNotify(new AssertionFailedError("onStoreFailed: store should not have failed."));
+  }
+
+  @Override
   public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
     return this;
   }
 }
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/MockGlobalSessionCallback.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/MockGlobalSessionCallback.java
@@ -52,16 +52,22 @@ public class MockGlobalSessionCallback i
   @Override
   public void handleError(GlobalSession globalSession, Exception ex) {
     this.calledError = true;
     this.calledErrorException = ex;
     this.testWaiter().performNotify();
   }
 
   @Override
+  public void handleIncompleteStage(Stage currentState,
+                                    GlobalSession globalSession) {
+
+  }
+
+  @Override
   public void handleStageCompleted(Stage currentState,
            GlobalSession globalSession) {
     stageCounter--;
   }
 
   @Override
   public void requestBackoff(long backoff) {
     this.calledRequestBackoff = true;
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java
@@ -40,12 +40,18 @@ public class DefaultGlobalSessionCallbac
   }
 
   @Override
   public void handleStageCompleted(Stage currentState,
                                    GlobalSession globalSession) {
   }
 
   @Override
+  public void handleIncompleteStage(Stage currentState,
+                                    GlobalSession globalSession) {
+
+  }
+
+  @Override
   public boolean shouldBackOffStorage() {
     return false;
   }
 }
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
@@ -76,23 +76,18 @@ public class BufferingMiddlewareReposito
         // Ensure inner session doesn't see incoming records.
         verify(innerRepositorySession, never()).store(record);
         verify(innerRepositorySession, never()).store(record1);
         verify(innerRepositorySession, never()).store(record2);
     }
 
     @Test
     public void storeDone() throws Exception {
-        // Verify that storage's flush is called.
-        verify(bufferStorageMocked, times(0)).flush();
-        bufferingSessionMocked.doStoreDonePrepare();
-        verify(bufferStorageMocked, times(1)).flush();
-
         // Trivial case, no records to merge.
-        bufferingSession.doStoreDone(123L);
+        bufferingSession.doMergeBuffer(123L);
         verify(innerRepositorySession, times(1)).storeDone(123L);
         verify(innerRepositorySession, never()).store(any(Record.class));
 
         // Reset call counters.
         reset(innerRepositorySession);
 
         // Now store some records.
         MockRecord record = new MockRecord("guid1", null, 1, false);
@@ -104,17 +99,17 @@ public class BufferingMiddlewareReposito
         MockRecord record3 = new MockRecord("guid3", null, 5, false);
         bufferingSession.store(record3);
 
         // NB: same guid as above.
         MockRecord record4 = new MockRecord("guid3", null, -1, false);
         bufferingSession.store(record4);
 
         // Done storing.
-        bufferingSession.doStoreDone(123L);
+        bufferingSession.doMergeBuffer(123L);
 
         // Ensure all records where stored in the wrapped session.
         verify(innerRepositorySession, times(1)).store(record);
         verify(innerRepositorySession, times(1)).store(record2);
         verify(innerRepositorySession, times(1)).store(record4);
 
         // Ensure storeDone was called on the wrapped session.
         verify(innerRepositorySession, times(1)).storeDone(123L);
@@ -150,42 +145,16 @@ public class BufferingMiddlewareReposito
         assertEquals(2, bufferStorage.all().size());
 
         // Test that buffer storage is cleaned up.
         bufferingSession.performCleanup();
         assertEquals(0, bufferStorage.all().size());
     }
 
     @Test
-    public void sourceFailed() throws Exception {
-        // Source failes before any records have been stored.
-        bufferingSession.sourceFailed(new Exception());
-        assertEquals(0, bufferStorage.all().size());
-
-        // Store some records now.
-        MockRecord record = new MockRecord("guid1", null, 1, false);
-        bufferingSession.store(record);
-
-        MockRecord record2 = new MockRecord("guid2", null, 13, false);
-        bufferingSession.store(record2);
-
-        MockRecord record3 = new MockRecord("guid3", null, 5, false);
-        bufferingSession.store(record3);
-
-        // Verify that buffer is intact after source fails.
-        bufferingSession.sourceFailed(new Exception());
-        assertEquals(3, bufferStorage.all().size());
-
-        // Verify that buffer is flushed after source fails.
-        verify(bufferStorageMocked, times(0)).flush();
-        bufferingSessionMocked.sourceFailed(new Exception());
-        verify(bufferStorageMocked, times(1)).flush();
-    }
-
-    @Test
     public void abort() throws Exception {
         MockRecord record = new MockRecord("guid1", null, 1, false);
         bufferingSession.store(record);
 
         MockRecord record2 = new MockRecord("guid2", null, 13, false);
         bufferingSession.store(record2);
 
         MockRecord record3 = new MockRecord("guid3", null, 5, false);
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
@@ -6,16 +6,17 @@ package org.mozilla.gecko.sync.repositor
 
 import android.net.Uri;
 import android.os.SystemClock;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
 import org.mozilla.gecko.sync.CryptoRecord;
 import org.mozilla.gecko.sync.HTTPFailureException;
 import org.mozilla.gecko.sync.InfoCollections;
 import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.net.SyncResponse;
 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
@@ -61,18 +62,18 @@ public class BatchingDownloaderDelegateT
         public void onFetchCompleted(SyncStorageResponse response,
                                      final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
                                      final SyncStorageCollectionRequest request,
                                      long l, long newerTimestamp, boolean full, String sort, String ids) {
             this.isSuccess = true;
         }
 
         @Override
-        public void onFetchFailed(final Exception ex,
-                                  final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
+        public void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
+                                  final Exception ex,
                                   final SyncStorageCollectionRequest request) {
             this.isFailure = true;
             this.ex = ex;
         }
 
         @Override
         public void onFetchedRecord(CryptoRecord record,
                                     RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
@@ -169,16 +170,28 @@ public class BatchingDownloaderDelegateT
         downloaderDelegate.handleRequestFailure(response);
         assertTrue(mockDownloader.isFailure);
         assertEquals(HTTPFailureException.class, mockDownloader.ex.getClass());
         assertFalse(mockDownloader.isSuccess);
         assertFalse(mockDownloader.isFetched);
     }
 
     @Test
+    public void testFailure412() throws Exception {
+        BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(mockDownloader, null,
+                new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL)), 0, 0, true, null, null);
+        SyncStorageResponse response = makeSyncStorageResponse(412, null);
+        downloaderDelegate.handleRequestFailure(response);
+        assertTrue(mockDownloader.isFailure);
+        assertEquals(CollectionConcurrentModificationException.class, mockDownloader.ex.getClass());
+        assertFalse(mockDownloader.isSuccess);
+        assertFalse(mockDownloader.isFetched);
+    }
+
+    @Test
     public void testFailureRequestError() throws Exception {
         BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(mockDownloader, null,
                 new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL)), 0, 0, true, null, null);
         downloaderDelegate.handleRequestError(new ClientProtocolException());
         assertTrue(mockDownloader.isFailure);
         assertEquals(ClientProtocolException.class, mockDownloader.ex.getClass());
         assertFalse(mockDownloader.isSuccess);
         assertFalse(mockDownloader.isFetched);
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
@@ -348,17 +348,17 @@ public class BatchingDownloaderTest {
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertTrue(sessionFetchRecordsDelegate.isFailure);
     }
 
     @Test
     public void testFailureException() throws Exception {
         Exception ex = new IllegalStateException();
         SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI("http://dummy.url"));
-        mockDownloader.onFetchFailed(ex, sessionFetchRecordsDelegate, request);
+        mockDownloader.handleFetchFailed(sessionFetchRecordsDelegate, ex, request);
 
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertTrue(sessionFetchRecordsDelegate.isFailure);
         assertEquals(ex.getClass(), sessionFetchRecordsDelegate.ex.getClass());
         assertNull(sessionFetchRecordsDelegate.record);
     }
 
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
@@ -18,34 +18,61 @@ import org.mozilla.gecko.sync.ExtendedJS
 import org.mozilla.gecko.sync.InfoCollections;
 import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.Utils;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
 
 import java.net.URISyntaxException;
+import java.util.List;
 import java.util.Random;
-import java.util.concurrent.Executor;
+import java.util.concurrent.AbstractExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 @RunWith(TestRunner.class)
 public class BatchingUploaderTest {
-    class MockExecutorService implements Executor {
+    class MockExecutorService extends AbstractExecutorService {
         public int totalPayloads = 0;
         public int commitPayloads = 0;
 
         @Override
         public void execute(@NonNull Runnable command) {
             ++totalPayloads;
             if (((RecordUploadRunnable) command).isCommit) {
                 ++commitPayloads;
             }
         }
+
+        @Override
+        public void shutdown() {
+
+        }
+
+        @NonNull
+        @Override
+        public List<Runnable> shutdownNow() {
+            return null;
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return false;
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return false;
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            return false;
+        }
     }
 
     class MockStoreDelegate implements RepositorySessionStoreDelegate {
         public int storeFailed = 0;
         public int storeSucceeded = 0;
         public int storeCompleted = 0;
 
         @Override
@@ -59,22 +86,27 @@ public class BatchingUploaderTest {
         }
 
         @Override
         public void onStoreCompleted(long storeEnd) {
             ++storeCompleted;
         }
 
         @Override
+        public void onStoreFailed(Exception e) {
+
+        }
+
+        @Override
         public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
             return null;
         }
     }
 
-    private Executor workQueue;
+    private ExecutorService workQueue;
     private RepositorySessionStoreDelegate storeDelegate;
 
     @Before
     public void setUp() throws Exception {
         workQueue = new MockExecutorService();
         storeDelegate = new MockStoreDelegate();
     }
 
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
@@ -19,17 +19,17 @@ import org.mozilla.gecko.sync.net.SyncSt
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
 
 import java.io.ByteArrayInputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import ch.boye.httpclientandroidlib.HttpResponse;
 import ch.boye.httpclientandroidlib.ProtocolVersion;
 import ch.boye.httpclientandroidlib.entity.BasicHttpEntity;
 import ch.boye.httpclientandroidlib.message.BasicHttpResponse;
 import ch.boye.httpclientandroidlib.message.BasicStatusLine;
 
@@ -43,17 +43,17 @@ public class PayloadUploadDelegateTest {
         public final ArrayList<String> successRecords = new ArrayList<>();
         public final HashMap<String, Exception> failedRecords = new HashMap<>();
         public boolean didLastPayloadFail = false;
 
         public ArrayList<SyncStorageResponse> successResponses = new ArrayList<>();
         public int commitPayloadsSucceeded = 0;
         public int lastPayloadsSucceeded = 0;
 
-        public MockUploader(final RepositorySession repositorySession, final Executor workQueue,
+        public MockUploader(final RepositorySession repositorySession, final ExecutorService workQueue,
                             final RepositorySessionStoreDelegate sessionStoreDelegate) {
             super(repositorySession, workQueue, sessionStoreDelegate, Uri.EMPTY, null, new InfoConfiguration(), null);
         }
 
         @Override
         public void payloadSucceeded(final SyncStorageResponse response, final boolean isCommit, final boolean isLastPayload) {
             successResponses.add(response);
             if (isCommit) {