Bug 1336001 - Refactor BatchingUploader's state-holder objects to fix threading problems r=rnewman
authorGrigory Kruglov <gkruglov@mozilla.com>
Wed, 22 Feb 2017 11:26:35 -0800
changeset 373350 85f2279e848f2d9145228614fb7e1dbb99cf71ac
parent 373349 60012e404044558218bd9aec667bbfac313a9c34
child 373351 f50c99a7e3818f79c7976c183c0d810e53acdd7f
push id10863
push userjlorenzo@mozilla.com
push dateMon, 06 Mar 2017 23:02:23 +0000
treeherdermozilla-aurora@0931190cd725 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersrnewman
bugs1336001
milestone54.0a1
Bug 1336001 - Refactor BatchingUploader's state-holder objects to fix threading problems r=rnewman Previous state: - Two threads were racing to get to batchMeta - one to reset its state, and the other to read its internal state to construct a network request, and then to update its internal state. - This resulted in data corruption when payloads had to be split into multiple batches. A core problem was that there is a lot of state shared across thread boundaries. Specifically, BatchMeta is being written and read both by record consumer threads running off of a thread pool, and by the network worker thread(s). This patch refactors BatchMeta and scheduling of network runnables to ensure that cross-thread access is minimized, and "who owns/accesses what" is explicit. - PayloadDispatcher owns scheduling payload runnables and any data they need to share between each other. - UploaderMeta owns information that's necessary to process incoming records. MozReview-Commit-ID: 9hFs3fXGaGM
mobile/android/base/android-services.mozbuild
mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncStorageRequest.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchMeta.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BufferSizeTracker.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/Payload.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/RecordUploadRunnable.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/UploaderMeta.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer11RepositorySession.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchMetaTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/RecordUploadRunnableTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/UploaderMetaTest.java
--- a/mobile/android/base/android-services.mozbuild
+++ b/mobile/android/base/android-services.mozbuild
@@ -1017,18 +1017,20 @@ sync_java_files = [TOPSRCDIR + '/mobile/
     'sync/repositories/StoreFailedException.java',
     'sync/repositories/StoreTracker.java',
     'sync/repositories/StoreTrackingRepositorySession.java',
     'sync/repositories/uploaders/BatchingUploader.java',
     'sync/repositories/uploaders/BatchMeta.java',
     'sync/repositories/uploaders/BufferSizeTracker.java',
     'sync/repositories/uploaders/MayUploadProvider.java',
     'sync/repositories/uploaders/Payload.java',
+    'sync/repositories/uploaders/PayloadDispatcher.java',
     'sync/repositories/uploaders/PayloadUploadDelegate.java',
     'sync/repositories/uploaders/RecordUploadRunnable.java',
+    'sync/repositories/uploaders/UploaderMeta.java',
     'sync/Server11PreviousPostFailedException.java',
     'sync/Server11RecordPostFailedException.java',
     'sync/setup/activities/ActivityUtils.java',
     'sync/setup/activities/WebURLFinder.java',
     'sync/setup/Constants.java',
     'sync/setup/InvalidSyncKeyException.java',
     'sync/SharedPreferencesClientsDataDelegate.java',
     'sync/stage/AbstractNonRepositorySyncStage.java',
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncStorageRequest.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncStorageRequest.java
@@ -163,20 +163,16 @@ public class SyncStorageRequest implemen
       }
     }
   }
 
   protected BaseResourceDelegate resourceDelegate;
   public SyncStorageRequestDelegate delegate;
   protected BaseResource resource;
 
-  public SyncStorageRequest() {
-    super();
-  }
-
   // Default implementation. Override this.
   protected BaseResourceDelegate makeResourceDelegate(SyncStorageRequest request) {
     return new SyncStorageResourceDelegate(request);
   }
 
   @Override
   public void get() {
     this.resource.get();
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchMeta.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchMeta.java
@@ -1,165 +1,122 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.repositories.uploaders;
 
-import android.support.annotation.CheckResult;
-import android.support.annotation.NonNull;
 import android.support.annotation.Nullable;
 
 import org.mozilla.gecko.background.common.log.Logger;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
-import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader.TokenModifiedException;
-import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader.LastModifiedChangedUnexpectedly;
-import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader.LastModifiedDidNotChange;
 
 /**
- * Keeps track of token, Last-Modified value and GUIDs of succeeded records.
+ * Keeps track of various meta information about a batch series.
+ *
+ * NB regarding concurrent access:
+ * - this class expects access by possibly different, sequentially running threads.
+ * - concurrent access is not supported.
  */
-/* @ThreadSafe */
-public class BatchMeta extends BufferSizeTracker {
+public class BatchMeta {
     private static final String LOG_TAG = "BatchMeta";
 
-    // Will be set once first payload upload succeeds. We don't expect this to change until we
-    // commit the batch, and which point it must change.
-    /* @GuardedBy("this") */ private Long lastModified;
+    private volatile Boolean inBatchingMode;
+    @Nullable private volatile Long lastModified;
+    private volatile String token;
 
-    // Will be set once first payload upload succeeds. We don't expect this to ever change until
-    // a commit succeeds, at which point this gets set to null.
-    /* @GuardedBy("this") */ private String token;
+    // NB: many of the operations on ConcurrentLinkedQueue are not atomic (toArray, for example),
+    // and so use of this queue type is only possible because this class does not support concurrent
+    // access.
+    private final ConcurrentLinkedQueue<String> successRecordGuids = new ConcurrentLinkedQueue<>();
 
-    /* @GuardedBy("accessLock") */ private boolean isUnlimited = false;
+    BatchMeta(@Nullable Long initialLastModified, Boolean initialInBatchingMode) {
+        lastModified = initialLastModified;
+        inBatchingMode = initialInBatchingMode;
+    }
 
-    // Accessed by synchronously running threads.
-    /* @GuardedBy("accessLock") */ private final List<String> successRecordGuids = new ArrayList<>();
-
-    /* @GuardedBy("accessLock") */ private boolean needsCommit = false;
+    String[] getSuccessRecordGuids() {
+        // NB: This really doesn't play well with concurrent access.
+        final String[] guids = new String[this.successRecordGuids.size()];
+        this.successRecordGuids.toArray(guids);
+        return guids;
+    }
 
-    protected final Long collectionLastModified;
+    void recordSucceeded(final String recordGuid) {
+        // Sanity check.
+        if (recordGuid == null) {
+            throw new IllegalStateException("Record guid is unexpectedly null");
+        }
 
-    public BatchMeta(@NonNull Object payloadLock, long maxBytes, long maxRecords, @Nullable Long collectionLastModified) {
-        super(payloadLock, maxBytes, maxRecords);
-        this.collectionLastModified = collectionLastModified;
+        successRecordGuids.add(recordGuid);
     }
 
-    protected void setIsUnlimited(boolean isUnlimited) {
-        synchronized (accessLock) {
-            this.isUnlimited = isUnlimited;
+    /* package-local */ void setInBatchingMode(boolean inBatchingMode) {
+        this.inBatchingMode = inBatchingMode;
+    }
+
+    /* package-local */ Boolean getInBatchingMode() {
+        return inBatchingMode;
+    }
+
+    @Nullable
+    protected Long getLastModified() {
+        return lastModified;
+    }
+
+    void setLastModified(final Long newLastModified, final boolean expectedToChange) throws BatchingUploader.LastModifiedChangedUnexpectedly, BatchingUploader.LastModifiedDidNotChange {
+        if (lastModified == null) {
+            lastModified = newLastModified;
+            return;
+        }
+
+        if (!expectedToChange && !lastModified.equals(newLastModified)) {
+            Logger.debug(LOG_TAG, "Last-Modified timestamp changed when we didn't expect it");
+            throw new BatchingUploader.LastModifiedChangedUnexpectedly();
+
+        } else if (expectedToChange && lastModified.equals(newLastModified)) {
+            Logger.debug(LOG_TAG, "Last-Modified timestamp did not change when we expected it to");
+            throw new BatchingUploader.LastModifiedDidNotChange();
+
+        } else {
+            lastModified = newLastModified;
         }
     }
 
-    @Override
-    protected boolean canFit(long recordDeltaByteCount) {
-        synchronized (accessLock) {
-            return isUnlimited || super.canFit(recordDeltaByteCount);
-        }
-    }
-
-    @Override
-    @CheckResult
-    protected boolean addAndEstimateIfFull(long recordDeltaByteCount) {
-        synchronized (accessLock) {
-            needsCommit = true;
-            boolean isFull = super.addAndEstimateIfFull(recordDeltaByteCount);
-            return !isUnlimited && isFull;
-        }
-    }
-
-    protected boolean needToCommit() {
-        synchronized (accessLock) {
-            return needsCommit;
-        }
-    }
-
-    protected synchronized String getToken() {
+    @Nullable
+    protected String getToken() {
         return token;
     }
 
-    protected synchronized void setToken(final String newToken, boolean isCommit) throws TokenModifiedException {
+    void setToken(final String newToken, boolean isCommit) throws BatchingUploader.TokenModifiedException {
         // Set token once in a batching mode.
         // In a non-batching mode, this.token and newToken will be null, and this is a no-op.
         if (token == null) {
             token = newToken;
             return;
         }
 
         // Sanity checks.
         if (isCommit) {
             // We expect token to be null when commit payload succeeds.
             if (newToken != null) {
-                throw new TokenModifiedException();
+                throw new BatchingUploader.TokenModifiedException();
             } else {
                 token = null;
             }
             return;
         }
 
         // We expect new token to always equal current token for non-commit payloads.
         if (!token.equals(newToken)) {
-            throw new TokenModifiedException();
-        }
-    }
-
-    protected synchronized Long getLastModified() {
-        if (lastModified == null) {
-            return collectionLastModified;
-        }
-        return lastModified;
-    }
-
-    protected synchronized void setLastModified(final Long newLastModified, final boolean expectedToChange) throws LastModifiedChangedUnexpectedly, LastModifiedDidNotChange {
-        if (lastModified == null) {
-            lastModified = newLastModified;
-            return;
-        }
-
-        if (!expectedToChange && !lastModified.equals(newLastModified)) {
-            Logger.debug(LOG_TAG, "Last-Modified timestamp changed when we didn't expect it");
-            throw new LastModifiedChangedUnexpectedly();
-
-        } else if (expectedToChange && lastModified.equals(newLastModified)) {
-            Logger.debug(LOG_TAG, "Last-Modified timestamp did not change when we expected it to");
-            throw new LastModifiedDidNotChange();
-
-        } else {
-            lastModified = newLastModified;
+            throw new BatchingUploader.TokenModifiedException();
         }
     }
 
-    protected ArrayList<String> getSuccessRecordGuids() {
-        synchronized (accessLock) {
-            return new ArrayList<>(this.successRecordGuids);
-        }
-    }
-
-    protected void recordSucceeded(final String recordGuid) {
-        // Sanity check.
-        if (recordGuid == null) {
-            throw new IllegalStateException();
-        }
-
-        synchronized (accessLock) {
-            successRecordGuids.add(recordGuid);
-        }
+    BatchMeta nextBatchMeta() {
+        return new BatchMeta(lastModified, inBatchingMode);
     }
-
-    @Override
-    protected boolean canFitRecordByteDelta(long byteDelta, long recordCount, long byteCount) {
-        return isUnlimited || super.canFitRecordByteDelta(byteDelta, recordCount, byteCount);
-    }
-
-    @Override
-    protected void reset() {
-        synchronized (accessLock) {
-            super.reset();
-            token = null;
-            lastModified = null;
-            successRecordGuids.clear();
-            needsCommit = false;
-        }
-    }
-}
\ No newline at end of file
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
@@ -12,16 +12,19 @@ import org.mozilla.gecko.sync.InfoConfig
 import org.mozilla.gecko.sync.Server11RecordPostFailedException;
 import org.mozilla.gecko.sync.net.SyncResponse;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
 import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Uploader which implements batching introduced in Sync 1.5.
  *
  * Batch vs payload terminology:
  * - batch is comprised of a series of payloads, which are all committed at the same time.
@@ -33,94 +36,96 @@ import java.util.concurrent.atomic.Atomi
  * InfoConfiguration object.
  *
  * If we can't fit everything we'd like to upload into one batch (according to max-total-* limits),
  * then we commit that batch, and start a new one. There are no explicit limits on total number of
  * batches we might use, although at some point we'll start to run into storage limit errors from the API.
  *
  * Once we go past using one batch this uploader is no longer "atomic". Partial state is exposed
  * to other clients after our first batch is committed and before our last batch is committed.
- * However, our per-batch limits are high, X-I-U-S mechanics help protect downloading clients
- * (as long as they implement X-I-U-S) with 412 error codes in case of interleaving upload and download,
- * and most mobile clients will not be uploading large-enough amounts of data (especially structured
- * data, such as bookmarks).
+ * However, our per-batch limits are (hopefully) high, X-I-U-S mechanics help protect downloading clients
+ * (as long as they implement X-I-U-S) with 412 error codes in case of interleaving upload and download.
  *
  * Last-Modified header returned with the first batch payload POST success is maintained for a batch,
  * to guard against concurrent-modification errors (different uploader commits before we're done).
  *
+ * Implementation notes:
+ * - RecordsChannel (via RepositorySession) delivers a stream of records for upload via {@link #process(Record)}
+ * - UploaderMeta is used to track batch-level information necessary for processing outgoing records
+ * - PayloadMeta is used to track payload-level information necessary for processing outgoing records
+ * - BatchMeta within PayloadDispatcher acts as a shared whiteboard which is used for tracking
+ *   information across batches (last-modified, batching mode) as well as batch side-effects (stored guids)
+ *
  * Non-batching mode notes:
  * We also support Sync servers which don't enable batching for uploads. In this case, we respect
  * payload limits for individual uploads, and every upload is considered a commit. Batching limits
  * do not apply, and batch token is irrelevant.
  * We do keep track of Last-Modified and send along X-I-U-S with our uploads, to protect against
  * concurrent modifications by other clients.
  */
 public class BatchingUploader {
     private static final String LOG_TAG = "BatchingUploader";
 
-    private final Uri collectionUri;
-
-    private volatile boolean recordUploadFailed = false;
-
-    private final BatchMeta batchMeta;
-    private final Payload payload;
-
-    // Accessed by synchronously running threads, OK to not synchronize and just make it volatile.
-    private volatile Boolean inBatchingMode;
-
-    // Used to ensure we have thread-safe access to the following:
-    // - byte and record counts in both Payload and BatchMeta objects
-    // - buffers in the Payload object
-    private final Object payloadLock = new Object();
-
-    protected Executor workQueue;
-    protected final RepositorySessionStoreDelegate sessionStoreDelegate;
-    protected final Server11RepositorySession repositorySession;
-
-    protected AtomicLong uploadTimestamp = new AtomicLong(0);
-
-    protected static final int PER_RECORD_OVERHEAD_BYTE_COUNT = RecordUploadRunnable.RECORD_SEPARATOR.length;
-    protected static final int PER_PAYLOAD_OVERHEAD_BYTE_COUNT = RecordUploadRunnable.RECORDS_END.length;
+    private static final int PER_RECORD_OVERHEAD_BYTE_COUNT = RecordUploadRunnable.RECORD_SEPARATOR.length;
+    /* package-local */ static final int PER_PAYLOAD_OVERHEAD_BYTE_COUNT = RecordUploadRunnable.RECORDS_END.length;
 
     // Sanity check. RECORD_SEPARATOR and RECORD_START are assumed to be of the same length.
     static {
         if (RecordUploadRunnable.RECORD_SEPARATOR.length != RecordUploadRunnable.RECORDS_START.length) {
             throw new IllegalStateException("Separator and start tokens must be of the same length");
         }
     }
 
+    // Accessed by the record consumer thread pool.
+    // Will be re-created, so mark it as volatile.
+    private volatile Payload payload;
+
+    // Accessed by both the record consumer thread pool and the network worker thread(s).
+    /* package-local */ final Uri collectionUri;
+    /* package-local */ final RepositorySessionStoreDelegate sessionStoreDelegate;
+    /* package-local */ @VisibleForTesting final PayloadDispatcher payloadDispatcher;
+    private final Server11RepositorySession repositorySession;
+    // Will be re-created, so mark it as volatile.
+    private volatile UploaderMeta uploaderMeta;
+
+    // Used to ensure we have thread-safe access to the following:
+    // - byte and record counts in both Payload and BatchMeta objects
+    // - buffers in the Payload object
+    private final Object payloadLock = new Object();
+
+
     public BatchingUploader(final Server11RepositorySession repositorySession, final Executor workQueue, final RepositorySessionStoreDelegate sessionStoreDelegate) {
         this.repositorySession = repositorySession;
-        this.workQueue = workQueue;
         this.sessionStoreDelegate = sessionStoreDelegate;
         this.collectionUri = Uri.parse(repositorySession.getServerRepository().collectionURI().toString());
 
         InfoConfiguration config = repositorySession.getServerRepository().getInfoConfiguration();
-        this.batchMeta = new BatchMeta(
-                payloadLock, config.maxTotalBytes, config.maxTotalRecords,
-                repositorySession.getServerRepository().getCollectionLastModified()
-        );
+        this.uploaderMeta = new UploaderMeta(payloadLock, config.maxTotalBytes, config.maxTotalRecords);
         this.payload = new Payload(payloadLock, config.maxPostBytes, config.maxPostRecords);
+
+        this.payloadDispatcher = new PayloadDispatcher(
+                workQueue, this, repositorySession.getServerRepository().getCollectionLastModified());
     }
 
+    // Called concurrently from the threads running off of a record consumer thread pool.
     public void process(final Record record) {
         final String guid = record.guid;
         final byte[] recordBytes = record.toJSONBytes();
         final long recordDeltaByteCount = recordBytes.length + PER_RECORD_OVERHEAD_BYTE_COUNT;
 
         Logger.debug(LOG_TAG, "Processing a record with guid: " + guid);
 
         // We can't upload individual records which exceed our payload byte limit.
         if ((recordDeltaByteCount + PER_PAYLOAD_OVERHEAD_BYTE_COUNT) > payload.maxBytes) {
             sessionStoreDelegate.onRecordStoreFailed(new RecordTooLargeToUpload(), guid);
             return;
         }
 
         synchronized (payloadLock) {
-            final boolean canFitRecordIntoBatch = batchMeta.canFit(recordDeltaByteCount);
+            final boolean canFitRecordIntoBatch = uploaderMeta.canFit(recordDeltaByteCount);
             final boolean canFitRecordIntoPayload = payload.canFit(recordDeltaByteCount);
 
             // Record fits!
             if (canFitRecordIntoBatch && canFitRecordIntoPayload) {
                 Logger.debug(LOG_TAG, "Record fits into the current batch and payload");
                 addAndFlushIfNecessary(recordDeltaByteCount, recordBytes, guid);
 
             // Payload won't fit the record.
@@ -134,211 +139,102 @@ public class BatchingUploader {
                 addAndFlushIfNecessary(recordDeltaByteCount, recordBytes, guid);
 
             // Batch won't fit the record.
             } else {
                 Logger.debug(LOG_TAG, "Current batch won't fit incoming record, committing batch.");
                 flush(true, false);
 
                 Logger.debug(LOG_TAG, "Recording the incoming record into a new batch");
-                batchMeta.reset();
 
                 // Keep track of the overflow record.
                 addAndFlushIfNecessary(recordDeltaByteCount, recordBytes, guid);
             }
         }
     }
 
     // Convenience function used from the process method; caller must hold a payloadLock.
     private void addAndFlushIfNecessary(long byteCount, byte[] recordBytes, String guid) {
         boolean isPayloadFull = payload.addAndEstimateIfFull(byteCount, recordBytes, guid);
-        boolean isBatchFull = batchMeta.addAndEstimateIfFull(byteCount);
+        boolean isBatchFull = uploaderMeta.addAndEstimateIfFull(byteCount);
 
         // Preemptive commit batch or upload a payload if they're estimated to be full.
         if (isBatchFull) {
             flush(true, false);
-            batchMeta.reset();
         } else if (isPayloadFull) {
             flush(false, false);
         }
     }
 
     public void noMoreRecordsToUpload() {
         Logger.debug(LOG_TAG, "Received 'no more records to upload' signal.");
 
-        // Run this after the last payload succeeds, so that we know for sure if we're in a batching
-        // mode and need to commit with a potentially empty payload.
-        workQueue.execute(new Runnable() {
+        // If we have any pending records in the Payload, flush them!
+        if (!payload.isEmpty()) {
+            flush(true, true);
+            return;
+        }
+
+        // If we don't have any pending records, we still might need to send an empty "commit"
+        // payload if we are in the batching mode.
+        // The dispatcher will run the final flush on its executor if necessary after all payloads
+        // succeed and it knows for sure if we're in a batching mode.
+        payloadDispatcher.finalizeQueue(uploaderMeta.needToCommit(), new Runnable() {
             @Override
             public void run() {
-                commitIfNecessaryAfterLastPayload();
+                flush(true, true);
             }
         });
     }
 
-    @VisibleForTesting
-    protected void commitIfNecessaryAfterLastPayload() {
-        // Must be called after last payload upload finishes.
-        synchronized (payload) {
-            // If we have any pending records in the Payload, flush them!
-            if (!payload.isEmpty()) {
-                flush(true, true);
-
-            // If we have an empty payload but need to commit the batch in the batching mode, flush!
-            } else if (batchMeta.needToCommit() && Boolean.TRUE.equals(inBatchingMode)) {
-                flush(true, true);
-
-            // Otherwise, we're done.
-            } else {
-                finished(uploadTimestamp);
-            }
-        }
-    }
-
-    /**
-     * We've been told by our upload delegate that a payload succeeded.
-     * Depending on the type of payload and batch mode status, inform our delegate of progress.
-     *
-     * @param response success response to our commit post
-     * @param isCommit was this a commit upload?
-     * @param isLastPayload was this a very last payload we'll upload?
-     */
-    public void payloadSucceeded(final SyncStorageResponse response, final boolean isCommit, final boolean isLastPayload) {
-        // Sanity check.
-        if (inBatchingMode == null) {
-            throw new IllegalStateException("Can't process payload success until we know if we're in a batching mode");
-        }
-
-        // We consider records to have been committed if we're not in a batching mode or this was a commit.
-        // If records have been committed, notify our store delegate.
-        if (!inBatchingMode || isCommit) {
-            for (String guid : batchMeta.getSuccessRecordGuids()) {
-                sessionStoreDelegate.onRecordStoreSucceeded(guid);
-            }
-        }
-
-        // If this was our very last commit, we're done storing records.
-        // Get Last-Modified timestamp from the response, and pass it upstream.
-        if (isLastPayload) {
-            finished(response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED));
-        }
-    }
-
-    public void lastPayloadFailed() {
-        finished(uploadTimestamp);
-    }
-
-    private void finished(long lastModifiedTimestamp) {
-        bumpTimestampTo(uploadTimestamp, lastModifiedTimestamp);
-        finished(uploadTimestamp);
-    }
-
-    private void finished(AtomicLong lastModifiedTimestamp) {
+    /* package-local */ void finished(AtomicLong lastModifiedTimestamp) {
         repositorySession.storeDone(lastModifiedTimestamp.get());
     }
 
-    public BatchMeta getCurrentBatch() {
-        return batchMeta;
-    }
-
-    public void setInBatchingMode(boolean inBatchingMode) {
-        this.inBatchingMode = inBatchingMode;
-
+    // Will be called from a thread dispatched by PayloadDispatcher.
+    // NB: Access to `uploaderMeta.isUnlimited` is guarded by the payloadLock.
+    /* package-local */ void setUnlimitedMode(boolean isUnlimited) {
         // If we know for sure that we're not in a batching mode,
         // consider our batch to be of unlimited size.
-        this.batchMeta.setIsUnlimited(!inBatchingMode);
-    }
-
-    public Boolean getInBatchingMode() {
-        return inBatchingMode;
-    }
-
-    public void setLastModified(final Long lastModified, final boolean isCommit) throws BatchingUploaderException {
-        // Sanity check.
-        if (inBatchingMode == null) {
-            throw new IllegalStateException("Can't process Last-Modified before we know we're in a batching mode.");
-        }
-
-        // In non-batching mode, every time we receive a Last-Modified timestamp, we expect it to change
-        // since records are "committed" (become visible to other clients) on every payload.
-        // In batching mode, we only expect Last-Modified to change when we commit a batch.
-        batchMeta.setLastModified(lastModified, isCommit || !inBatchingMode);
+        this.uploaderMeta.setIsUnlimited(isUnlimited);
     }
 
-    public void recordSucceeded(final String recordGuid) {
-        Logger.debug(LOG_TAG, "Record store succeeded: " + recordGuid);
-        batchMeta.recordSucceeded(recordGuid);
-    }
-
-    public void recordFailed(final String recordGuid) {
-        recordFailed(new Server11RecordPostFailedException(), recordGuid);
-    }
-
-    public void recordFailed(final Exception e, final String recordGuid) {
-        Logger.debug(LOG_TAG, "Record store failed for guid " + recordGuid + " with exception: " + e.toString());
-        recordUploadFailed = true;
-        sessionStoreDelegate.onRecordStoreFailed(e, recordGuid);
-    }
-
-    public Server11RepositorySession getRepositorySession() {
+    /* package-local */ Server11RepositorySession getRepositorySession() {
         return repositorySession;
     }
 
-    private static void bumpTimestampTo(final AtomicLong current, long newValue) {
-        while (true) {
-            long existing = current.get();
-            if (existing > newValue) {
-                return;
-            }
-            if (current.compareAndSet(existing, newValue)) {
-                return;
-            }
-        }
-    }
-
     private void flush(final boolean isCommit, final boolean isLastPayload) {
         final ArrayList<byte[]> outgoing;
         final ArrayList<String> outgoingGuids;
         final long byteCount;
 
         // Even though payload object itself is thread-safe, we want to ensure we get these altogether
         // as a "unit". Another approach would be to create a wrapper object for these values, but this works.
         synchronized (payloadLock) {
             outgoing = payload.getRecordsBuffer();
             outgoingGuids = payload.getRecordGuidsBuffer();
             byteCount = payload.getByteCount();
         }
+        payload = payload.nextPayload();
 
-        workQueue.execute(new RecordUploadRunnable(
-                new BatchingAtomicUploaderMayUploadProvider(),
-                collectionUri,
-                batchMeta,
-                new PayloadUploadDelegate(this, outgoingGuids, isCommit, isLastPayload),
-                outgoing,
-                byteCount,
-                isCommit
-        ));
+        payloadDispatcher.queue(outgoing, outgoingGuids, byteCount, isCommit, isLastPayload);
 
-        payload.reset();
-    }
-
-    private class BatchingAtomicUploaderMayUploadProvider implements MayUploadProvider {
-        public boolean mayUpload() {
-            return !recordUploadFailed;
+        if (isCommit && !isLastPayload) {
+            uploaderMeta = uploaderMeta.nextUploaderMeta();
         }
     }
 
-    public static class BatchingUploaderException extends Exception {
+    /* package-local */ static class BatchingUploaderException extends Exception {
         private static final long serialVersionUID = 1L;
     }
-    public static class RecordTooLargeToUpload extends BatchingUploaderException {
+    /* package-local */ static class LastModifiedDidNotChange extends BatchingUploaderException {
         private static final long serialVersionUID = 1L;
     }
-    public static class LastModifiedDidNotChange extends BatchingUploaderException {
+    /* package-local */ static class LastModifiedChangedUnexpectedly extends BatchingUploaderException {
         private static final long serialVersionUID = 1L;
     }
-    public static class LastModifiedChangedUnexpectedly extends BatchingUploaderException {
+    /* package-local */ static class TokenModifiedException extends BatchingUploaderException {
+        private static final long serialVersionUID = 1L;
+    };
+    private static class RecordTooLargeToUpload extends BatchingUploaderException {
         private static final long serialVersionUID = 1L;
     }
-    public static class TokenModifiedException extends BatchingUploaderException {
-        private static final long serialVersionUID = 1L;
-    };
 }
\ No newline at end of file
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BufferSizeTracker.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BufferSizeTracker.java
@@ -14,17 +14,17 @@ import android.support.annotation.CheckR
  * - checking if a record can fit
  */
 /* @ThreadSafe */
 public abstract class BufferSizeTracker {
     protected final Object accessLock;
 
     /* @GuardedBy("accessLock") */ private long byteCount = BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT;
     /* @GuardedBy("accessLock") */ private long recordCount = 0;
-    /* @GuardedBy("accessLock") */ protected Long smallestRecordByteCount;
+    /* @GuardedBy("accessLock") */ private Long smallestRecordByteCount;
 
     protected final long maxBytes;
     protected final long maxRecords;
 
     public BufferSizeTracker(Object accessLock, long maxBytes, long maxRecords) {
         this.accessLock = accessLock;
         this.maxBytes = maxBytes;
         this.maxRecords = maxRecords;
@@ -83,21 +83,13 @@ public abstract class BufferSizeTracker 
 
     protected long getRecordCount() {
         synchronized (accessLock) {
             return recordCount;
         }
     }
 
     @CallSuper
-    protected void reset() {
-        synchronized (accessLock) {
-            byteCount = BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT;
-            recordCount = 0;
-        }
-    }
-
-    @CallSuper
     protected boolean canFitRecordByteDelta(long byteDelta, long recordCount, long byteCount) {
         return recordCount < maxRecords
                 && (byteCount + byteDelta) <= maxBytes;
     }
 }
\ No newline at end of file
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/Payload.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/Payload.java
@@ -32,25 +32,16 @@ public class Payload extends BufferSizeT
     protected boolean addAndEstimateIfFull(long recordDelta, byte[] recordBytes, String guid) {
         synchronized (accessLock) {
             recordsBuffer.add(recordBytes);
             recordGuidsBuffer.add(guid);
             return super.addAndEstimateIfFull(recordDelta);
         }
     }
 
-    @Override
-    protected void reset() {
-        synchronized (accessLock) {
-            super.reset();
-            recordsBuffer.clear();
-            recordGuidsBuffer.clear();
-        }
-    }
-
     protected ArrayList<byte[]> getRecordsBuffer() {
         synchronized (accessLock) {
             return new ArrayList<>(recordsBuffer);
         }
     }
 
     protected ArrayList<String> getRecordGuidsBuffer() {
         synchronized (accessLock) {
@@ -58,9 +49,13 @@ public class Payload extends BufferSizeT
         }
     }
 
     protected boolean isEmpty() {
         synchronized (accessLock) {
             return recordsBuffer.isEmpty();
         }
     }
+
+    Payload nextPayload() {
+        return new Payload(accessLock, maxBytes, maxRecords);
+    }
 }
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
@@ -0,0 +1,184 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.uploaders;
+
+import android.support.annotation.Nullable;
+import android.support.annotation.VisibleForTesting;
+
+import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.Server11RecordPostFailedException;
+import org.mozilla.gecko.sync.net.SyncResponse;
+import org.mozilla.gecko.sync.net.SyncStorageResponse;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Dispatches payload runnables and handles their results.
+ *
+ * All of the methods, except for `queue` and `finalizeQueue`, will be called from the thread(s)
+ * running sequentially on the SingleThreadExecutor `executor`.
+ */
+class PayloadDispatcher {
+    private static final String LOG_TAG = "PayloadDispatcher";
+
+    // All payload runnables share the same whiteboard.
+    // It's accessed directly by the runnables; tests also make use of this direct access.
+    volatile BatchMeta batchWhiteboard;
+    private final AtomicLong uploadTimestamp = new AtomicLong(0);
+
+    // Accessed from different threads sequentially running on the 'executor'.
+    private volatile boolean recordUploadFailed = false;
+
+    private final Executor executor;
+    private final BatchingUploader uploader;
+
+    PayloadDispatcher(Executor executor, BatchingUploader uploader, @Nullable Long initialLastModified) {
+        // Initially we don't know if we're in a batching mode.
+        this.batchWhiteboard = new BatchMeta(initialLastModified, null);
+        this.uploader = uploader;
+        this.executor = executor;
+    }
+
+    void queue(
+            final ArrayList<byte[]> outgoing,
+            final ArrayList<String> outgoingGuids,
+            final long byteCount,
+            final boolean isCommit, final boolean isLastPayload) {
+
+        // Note that `executor` is expected to be a SingleThreadExecutor.
+        executor.execute(new BatchContextRunnable(isCommit) {
+            @Override
+            public void run() {
+                new RecordUploadRunnable(
+                        new BatchingAtomicUploaderMayUploadProvider(),
+                        uploader.collectionUri,
+                        batchWhiteboard.getToken(),
+                        new PayloadUploadDelegate(
+                                uploader.getRepositorySession().getServerRepository().getAuthHeaderProvider(),
+                                PayloadDispatcher.this,
+                                outgoingGuids,
+                                isCommit,
+                                isLastPayload
+                        ),
+                        outgoing,
+                        byteCount,
+                        isCommit
+                ).run();
+            }
+        });
+    }
+
+    void setInBatchingMode(boolean inBatchingMode) {
+        batchWhiteboard.setInBatchingMode(inBatchingMode);
+        uploader.setUnlimitedMode(!inBatchingMode);
+    }
+
+    /**
+     * We've been told by our upload delegate that a payload succeeded.
+     * Depending on the type of payload and batch mode status, inform our delegate of progress.
+     *
+     * @param response success response to our commit post
+     * @param guids list of successfully posted record guids
+     * @param isCommit was this a commit upload?
+     * @param isLastPayload was this a very last payload we'll upload?
+     */
+    void payloadSucceeded(final SyncStorageResponse response, final String[] guids, final boolean isCommit, final boolean isLastPayload) {
+        // Sanity check.
+        if (batchWhiteboard.getInBatchingMode() == null) {
+            throw new IllegalStateException("Can't process payload success until we know if we're in a batching mode");
+        }
+
+        // We consider records to have been committed if we're not in a batching mode or this was a commit.
+        // If records have been committed, notify our store delegate.
+        if (!batchWhiteboard.getInBatchingMode() || isCommit) {
+            for (String guid : guids) {
+                uploader.sessionStoreDelegate.onRecordStoreSucceeded(guid);
+            }
+        }
+
+        // If this was our very last commit, we're done storing records.
+        // Get Last-Modified timestamp from the response, and pass it upstream.
+        if (isLastPayload) {
+            finished(response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED));
+        }
+    }
+
+    void lastPayloadFailed() {
+        uploader.finished(uploadTimestamp);
+    }
+
+    private void finished(long lastModifiedTimestamp) {
+        bumpTimestampTo(uploadTimestamp, lastModifiedTimestamp);
+        uploader.finished(uploadTimestamp);
+    }
+
+    void finalizeQueue(final boolean needToCommit, final Runnable finalRunnable) {
+        executor.execute(new NonPayloadContextRunnable() {
+            @Override
+            public void run() {
+                // Must be called after last payload upload finishes.
+                if (needToCommit && Boolean.TRUE.equals(batchWhiteboard.getInBatchingMode())) {
+                    finalRunnable.run();
+
+                    // Otherwise, we're done.
+                } else {
+                    uploader.finished(uploadTimestamp);
+                }
+            }
+        });
+    }
+
+    void recordFailed(final String recordGuid) {
+        recordFailed(new Server11RecordPostFailedException(), recordGuid);
+    }
+
+    void recordFailed(final Exception e, final String recordGuid) {
+        Logger.debug(LOG_TAG, "Record store failed for guid " + recordGuid + " with exception: " + e.toString());
+        recordUploadFailed = true;
+        uploader.sessionStoreDelegate.onRecordStoreFailed(e, recordGuid);
+    }
+
+    void prepareForNextBatch() {
+        batchWhiteboard = batchWhiteboard.nextBatchMeta();
+    }
+
+    private static void bumpTimestampTo(final AtomicLong current, long newValue) {
+        while (true) {
+            long existing = current.get();
+            if (existing > newValue) {
+                return;
+            }
+            if (current.compareAndSet(existing, newValue)) {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Allows tests to easily peek into the flow of upload tasks.
+     */
+    @VisibleForTesting
+    abstract static class BatchContextRunnable implements Runnable {
+        boolean isCommit;
+
+        BatchContextRunnable(boolean isCommit) {
+            this.isCommit = isCommit;
+        }
+    }
+
+    /**
+     * Allows tests to tell apart non-payload runnables going through the executor.
+     */
+    @VisibleForTesting
+    abstract static class NonPayloadContextRunnable implements Runnable {}
+
+    private class BatchingAtomicUploaderMayUploadProvider implements MayUploadProvider {
+        public boolean mayUpload() {
+            return !recordUploadFailed;
+        }
+    }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java
@@ -13,45 +13,48 @@ import org.mozilla.gecko.sync.NonObjectJ
 import org.mozilla.gecko.sync.Utils;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.net.SyncResponse;
 import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
 
 import java.util.ArrayList;
 
-public class PayloadUploadDelegate implements SyncStorageRequestDelegate {
+class PayloadUploadDelegate implements SyncStorageRequestDelegate {
     private static final String LOG_TAG = "PayloadUploadDelegate";
 
     private static final String KEY_BATCH = "batch";
 
-    private final BatchingUploader uploader;
+    private final AuthHeaderProvider headerProvider;
+    private final PayloadDispatcher dispatcher;
     private ArrayList<String> postedRecordGuids;
     private final boolean isCommit;
     private final boolean isLastPayload;
 
-    public PayloadUploadDelegate(BatchingUploader uploader, ArrayList<String> postedRecordGuids, boolean isCommit, boolean isLastPayload) {
-        this.uploader = uploader;
+    PayloadUploadDelegate(AuthHeaderProvider headerProvider, PayloadDispatcher dispatcher, ArrayList<String> postedRecordGuids, boolean isCommit, boolean isLastPayload) {
+        this.headerProvider = headerProvider;
+        this.dispatcher = dispatcher;
         this.postedRecordGuids = postedRecordGuids;
         this.isCommit = isCommit;
         this.isLastPayload = isLastPayload;
     }
 
     @Override
     public AuthHeaderProvider getAuthHeaderProvider() {
-        return uploader.getRepositorySession().getServerRepository().getAuthHeaderProvider();
+        return headerProvider;
     }
 
     @Override
     public String ifUnmodifiedSince() {
-        final Long lastModified = uploader.getCurrentBatch().getLastModified();
+        final Long lastModified = dispatcher.batchWhiteboard.getLastModified();
         if (lastModified == null) {
             return null;
+        } else {
+            return Utils.millisecondsToDecimalSecondsString(lastModified);
         }
-        return Utils.millisecondsToDecimalSecondsString(lastModified);
     }
 
     @Override
     public void handleRequestSuccess(final SyncStorageResponse response) {
         // First, do some sanity checking.
         if (response.getStatusCode() != 200 && response.getStatusCode() != 202) {
             handleRequestError(
                 new IllegalStateException("handleRequestSuccess received a non-200/202 response: " + response.getStatusCode())
@@ -75,52 +78,57 @@ public class PayloadUploadDelegate imple
             Logger.error(LOG_TAG, "Got exception parsing POST success body.", e);
             this.handleRequestError(e);
             return;
         }
 
         // If we got a 200, it could be either a non-batching result, or a batch commit.
         // - if we're in a batching mode, we expect this to be a commit.
         // If we got a 202, we expect there to be a token present in the response
-        if (response.getStatusCode() == 200 && uploader.getCurrentBatch().getToken() != null) {
-            if (uploader.getInBatchingMode() && !isCommit) {
+        if (response.getStatusCode() == 200 && dispatcher.batchWhiteboard.getToken() != null) {
+            if (dispatcher.batchWhiteboard.getInBatchingMode() && !isCommit) {
                 handleRequestError(
                         new IllegalStateException("Got 200 OK in batching mode, but this was not a commit payload")
                 );
                 return;
             }
         } else if (response.getStatusCode() == 202) {
             if (!body.containsKey(KEY_BATCH)) {
                 handleRequestError(
                         new IllegalStateException("Batch response did not have a batch ID")
                 );
                 return;
             }
         }
 
         // With sanity checks out of the way, can now safely say if we're in a batching mode or not.
         // We only do this once per session.
-        if (uploader.getInBatchingMode() == null) {
-            uploader.setInBatchingMode(body.containsKey(KEY_BATCH));
+        if (dispatcher.batchWhiteboard.getInBatchingMode() == null) {
+            dispatcher.setInBatchingMode(body.containsKey(KEY_BATCH));
         }
 
         // Tell current batch about the token we've received.
         // Throws if token changed after being set once, or if we got a non-null token after a commit.
         try {
-            uploader.getCurrentBatch().setToken(body.getString(KEY_BATCH), isCommit);
+            dispatcher.batchWhiteboard.setToken(body.getString(KEY_BATCH), isCommit);
         } catch (BatchingUploader.BatchingUploaderException e) {
             handleRequestError(e);
             return;
         }
 
         // Will throw if Last-Modified changed when it shouldn't have.
         try {
-            uploader.setLastModified(
+            // In non-batching mode, every time we receive a Last-Modified timestamp, we expect it
+            // to change since records are "committed" (become visible to other clients) on every
+            // payload.
+            // In batching mode, we only expect Last-Modified to change when we commit a batch.
+            dispatcher.batchWhiteboard.setLastModified(
                     response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED),
-                    isCommit);
+                    isCommit || !dispatcher.batchWhiteboard.getInBatchingMode()
+            );
         } catch (BatchingUploader.BatchingUploaderException e) {
             handleRequestError(e);
             return;
         }
 
         // All looks good up to this point, let's process success and failed arrays.
         JSONArray success;
         try {
@@ -129,17 +137,17 @@ public class PayloadUploadDelegate imple
             handleRequestError(e);
             return;
         }
 
         if (success != null && !success.isEmpty()) {
             Logger.trace(LOG_TAG, "Successful records: " + success.toString());
             for (Object o : success) {
                 try {
-                    uploader.recordSucceeded((String) o);
+                    dispatcher.batchWhiteboard.recordSucceeded((String) o);
                 } catch (ClassCastException e) {
                     Logger.error(LOG_TAG, "Got exception parsing POST success guid.", e);
                     // Not much to be done.
                 }
             }
         }
         // GC
         success = null;
@@ -150,36 +158,45 @@ public class PayloadUploadDelegate imple
         } catch (NonObjectJSONException e) {
             handleRequestError(e);
             return;
         }
 
         if (failed != null && !failed.object.isEmpty()) {
             Logger.debug(LOG_TAG, "Failed records: " + failed.object.toString());
             for (String guid : failed.keySet()) {
-                uploader.recordFailed(guid);
+                dispatcher.recordFailed(guid);
             }
         }
         // GC
         failed = null;
 
         // And we're done! Let uploader finish up.
-        uploader.payloadSucceeded(response, isCommit, isLastPayload);
+        dispatcher.payloadSucceeded(
+                response,
+                dispatcher.batchWhiteboard.getSuccessRecordGuids(),
+                isCommit,
+                isLastPayload
+        );
+
+        if (isCommit && !isLastPayload) {
+            dispatcher.prepareForNextBatch();
+        }
     }
 
     @Override
     public void handleRequestFailure(final SyncStorageResponse response) {
         this.handleRequestError(new HTTPFailureException(response));
     }
 
     @Override
     public void handleRequestError(Exception e) {
         for (String guid : postedRecordGuids) {
-            uploader.recordFailed(e, guid);
+            dispatcher.recordFailed(e, guid);
         }
         // GC
         postedRecordGuids = null;
 
         if (isLastPayload) {
-            uploader.lastPayloadFailed();
+            dispatcher.lastPayloadFailed();
         }
     }
 }
\ No newline at end of file
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/RecordUploadRunnable.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/RecordUploadRunnable.java
@@ -1,19 +1,21 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.repositories.uploaders;
 
 import android.net.Uri;
+import android.support.annotation.Nullable;
 import android.support.annotation.VisibleForTesting;
 
 import org.mozilla.gecko.background.common.log.Logger;
 import org.mozilla.gecko.sync.Server11PreviousPostFailedException;
+import org.mozilla.gecko.sync.Utils;
 import org.mozilla.gecko.sync.net.SyncStorageRequest;
 import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -40,30 +42,30 @@ public class RecordUploadRunnable implem
 
     private final ArrayList<byte[]> outgoing;
     private final long byteCount;
 
     // Used to construct POST URI during run().
     @VisibleForTesting
     public final boolean isCommit;
     private final Uri collectionUri;
-    private final BatchMeta batchMeta;
+    private final String batchToken;
 
     public RecordUploadRunnable(MayUploadProvider mayUploadProvider,
                                 Uri collectionUri,
-                                BatchMeta batchMeta,
+                                String batchToken,
                                 SyncStorageRequestDelegate uploadDelegate,
                                 ArrayList<byte[]> outgoing,
                                 long byteCount,
                                 boolean isCommit) {
         this.mayUploadProvider = mayUploadProvider;
         this.uploadDelegate = uploadDelegate;
         this.outgoing = outgoing;
         this.byteCount = byteCount;
-        this.batchMeta = batchMeta;
+        this.batchToken = batchToken;
         this.collectionUri = collectionUri;
         this.isCommit = isCommit;
     }
 
     public static class ByteArraysContentProducer implements ContentProducer {
         ArrayList<byte[]> outgoing;
         public ByteArraysContentProducer(ArrayList<byte[]> arrays) {
             outgoing = arrays;
@@ -139,28 +141,27 @@ public class RecordUploadRunnable implem
         }
 
         Logger.trace(LOG_TAG, "Running upload task. Outgoing records: " + outgoing.size());
 
         // We don't want the task queue to proceed until this request completes.
         // Fortunately, BaseResource is currently synchronous.
         // If that ever changes, you'll need to block here.
 
-        final URI postURI = buildPostURI(isCommit, batchMeta, collectionUri);
+        final URI postURI = buildPostURI(isCommit, batchToken, collectionUri);
         final SyncStorageRequest request = new SyncStorageRequest(postURI);
         request.delegate = uploadDelegate;
 
         ByteArraysEntity body = new ByteArraysEntity(outgoing, byteCount);
         request.post(body);
     }
 
     @VisibleForTesting
-    public static URI buildPostURI(boolean isCommit, BatchMeta batchMeta, Uri collectionUri) {
+    public static URI buildPostURI(boolean isCommit, @Nullable String batchToken, Uri collectionUri) {
         final Uri.Builder uriBuilder = collectionUri.buildUpon();
-        final String batchToken = batchMeta.getToken();
 
         if (batchToken != null) {
             uriBuilder.appendQueryParameter(QUERY_PARAM_BATCH, batchToken);
         } else {
             uriBuilder.appendQueryParameter(QUERY_PARAM_BATCH, QUERY_PARAM_TRUE);
         }
 
         if (isCommit) {
copy from mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchMeta.java
copy to mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/UploaderMeta.java
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchMeta.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/UploaderMeta.java
@@ -1,54 +1,26 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.repositories.uploaders;
 
 import android.support.annotation.CheckResult;
 import android.support.annotation.NonNull;
-import android.support.annotation.Nullable;
 
-import org.mozilla.gecko.background.common.log.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader.TokenModifiedException;
-import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader.LastModifiedChangedUnexpectedly;
-import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader.LastModifiedDidNotChange;
 
-/**
- * Keeps track of token, Last-Modified value and GUIDs of succeeded records.
- */
-/* @ThreadSafe */
-public class BatchMeta extends BufferSizeTracker {
-    private static final String LOG_TAG = "BatchMeta";
-
-    // Will be set once first payload upload succeeds. We don't expect this to change until we
-    // commit the batch, and which point it must change.
-    /* @GuardedBy("this") */ private Long lastModified;
-
-    // Will be set once first payload upload succeeds. We don't expect this to ever change until
-    // a commit succeeds, at which point this gets set to null.
-    /* @GuardedBy("this") */ private String token;
-
-    /* @GuardedBy("accessLock") */ private boolean isUnlimited = false;
-
-    // Accessed by synchronously running threads.
-    /* @GuardedBy("accessLock") */ private final List<String> successRecordGuids = new ArrayList<>();
+public class UploaderMeta extends BufferSizeTracker {
+    // Will be written and read by different threads.
+    /* @GuardedBy("accessLock") */ private volatile boolean isUnlimited = false;
 
     /* @GuardedBy("accessLock") */ private boolean needsCommit = false;
 
-    protected final Long collectionLastModified;
-
-    public BatchMeta(@NonNull Object payloadLock, long maxBytes, long maxRecords, @Nullable Long collectionLastModified) {
+    public UploaderMeta(@NonNull Object payloadLock, long maxBytes, long maxRecords) {
         super(payloadLock, maxBytes, maxRecords);
-        this.collectionLastModified = collectionLastModified;
     }
 
     protected void setIsUnlimited(boolean isUnlimited) {
         synchronized (accessLock) {
             this.isUnlimited = isUnlimited;
         }
     }
 
@@ -70,96 +42,17 @@ public class BatchMeta extends BufferSiz
     }
 
     protected boolean needToCommit() {
         synchronized (accessLock) {
             return needsCommit;
         }
     }
 
-    protected synchronized String getToken() {
-        return token;
-    }
-
-    protected synchronized void setToken(final String newToken, boolean isCommit) throws TokenModifiedException {
-        // Set token once in a batching mode.
-        // In a non-batching mode, this.token and newToken will be null, and this is a no-op.
-        if (token == null) {
-            token = newToken;
-            return;
-        }
-
-        // Sanity checks.
-        if (isCommit) {
-            // We expect token to be null when commit payload succeeds.
-            if (newToken != null) {
-                throw new TokenModifiedException();
-            } else {
-                token = null;
-            }
-            return;
-        }
-
-        // We expect new token to always equal current token for non-commit payloads.
-        if (!token.equals(newToken)) {
-            throw new TokenModifiedException();
-        }
-    }
-
-    protected synchronized Long getLastModified() {
-        if (lastModified == null) {
-            return collectionLastModified;
-        }
-        return lastModified;
-    }
-
-    protected synchronized void setLastModified(final Long newLastModified, final boolean expectedToChange) throws LastModifiedChangedUnexpectedly, LastModifiedDidNotChange {
-        if (lastModified == null) {
-            lastModified = newLastModified;
-            return;
-        }
-
-        if (!expectedToChange && !lastModified.equals(newLastModified)) {
-            Logger.debug(LOG_TAG, "Last-Modified timestamp changed when we didn't expect it");
-            throw new LastModifiedChangedUnexpectedly();
-
-        } else if (expectedToChange && lastModified.equals(newLastModified)) {
-            Logger.debug(LOG_TAG, "Last-Modified timestamp did not change when we expected it to");
-            throw new LastModifiedDidNotChange();
-
-        } else {
-            lastModified = newLastModified;
-        }
-    }
-
-    protected ArrayList<String> getSuccessRecordGuids() {
-        synchronized (accessLock) {
-            return new ArrayList<>(this.successRecordGuids);
-        }
-    }
-
-    protected void recordSucceeded(final String recordGuid) {
-        // Sanity check.
-        if (recordGuid == null) {
-            throw new IllegalStateException();
-        }
-
-        synchronized (accessLock) {
-            successRecordGuids.add(recordGuid);
-        }
-    }
-
     @Override
     protected boolean canFitRecordByteDelta(long byteDelta, long recordCount, long byteCount) {
         return isUnlimited || super.canFitRecordByteDelta(byteDelta, recordCount, byteCount);
     }
 
-    @Override
-    protected void reset() {
-        synchronized (accessLock) {
-            super.reset();
-            token = null;
-            lastModified = null;
-            successRecordGuids.clear();
-            needsCommit = false;
-        }
+    UploaderMeta nextUploaderMeta() {
+        return new UploaderMeta(accessLock, maxBytes, maxRecords);
     }
 }
\ No newline at end of file
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer11RepositorySession.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer11RepositorySession.java
@@ -67,17 +67,22 @@ public class TestServer11RepositorySessi
   static final String LOCAL_COUNTS_URL    = LOCAL_INFO_BASE_URL + "collection_counts";
 
   // Corresponds to rnewman+atest1@mozilla.com, local.
   static final String TEST_USERNAME          = "n6ec3u5bee3tixzp2asys7bs6fve4jfw";
   static final String TEST_PASSWORD          = "passowrd";
   static final String SYNC_KEY          = "eh7ppnb82iwr5kt3z3uyi5vr44";
 
   public final AuthHeaderProvider authHeaderProvider = new BasicAuthHeaderProvider(TEST_USERNAME, TEST_PASSWORD);
-  protected final InfoCollections infoCollections = new InfoCollections();
+  protected final InfoCollections infoCollections = new InfoCollections() {
+    @Override
+    public Long getTimestamp(String collection) {
+      return 0L;
+    }
+  };
   protected final InfoConfiguration infoConfiguration = new InfoConfiguration();
 
   // Few-second timeout so that our longer operations don't time out and cause spurious error-handling results.
   private static final int SHORT_TIMEOUT = 10000;
 
   public AuthHeaderProvider getAuthHeaderProvider() {
     return new BasicAuthHeaderProvider(TEST_USERNAME, TEST_PASSWORD);
   }
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchMetaTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchMetaTest.java
@@ -1,109 +1,94 @@
-/* Any copyright is dedicated to the Public Domain.
-   http://creativecommons.org/publicdomain/zero/1.0/ */
-
 package org.mozilla.gecko.sync.repositories.uploaders;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
 
 import static org.junit.Assert.*;
 
 @RunWith(TestRunner.class)
 public class BatchMetaTest {
     private BatchMeta batchMeta;
-    private long byteLimit = 1024;
-    private long recordLimit = 5;
-    private Object lock = new Object();
-    private Long collectionLastModified = 123L;
 
     @Before
     public void setUp() throws Exception {
-        batchMeta = new BatchMeta(lock, byteLimit, recordLimit, collectionLastModified);
-    }
-
-    @Test
-    public void testConstructor() {
-        assertEquals(batchMeta.collectionLastModified, collectionLastModified);
-
-        BatchMeta otherBatchMeta = new BatchMeta(lock, byteLimit, recordLimit, null);
-        assertNull(otherBatchMeta.collectionLastModified);
+        batchMeta = new BatchMeta(null, null);
     }
 
     @Test
     public void testGetLastModified() {
-        // Defaults to collection L-M
-        assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
+        assertNull(batchMeta.getLastModified());
 
         try {
             batchMeta.setLastModified(333L, true);
-        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
-        } catch (BatchingUploader.LastModifiedDidNotChange e) {}
+        } catch (BatchingUploader.LastModifiedDidNotChange | BatchingUploader.LastModifiedChangedUnexpectedly e) {
+        }
 
-        assertEquals(batchMeta.getLastModified(), Long.valueOf(333L));
+        assertEquals(Long.valueOf(333L), batchMeta.getLastModified());
     }
 
     @Test
     public void testSetLastModified() {
-        assertEquals(batchMeta.getLastModified(), collectionLastModified);
-
-        try {
-            batchMeta.setLastModified(123L, true);
-            assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
-        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
-            fail("Should not check for modifications on first L-M set");
-        } catch (BatchingUploader.LastModifiedDidNotChange e) {
-            fail("Should not check for modifications on first L-M set");
-        }
+        BatchingUploaderTest.TestRunnableWithTarget<BatchMeta> tests = new BatchingUploaderTest.TestRunnableWithTarget<BatchMeta>() {
+            @Override
+            void tests() {
+                try {
+                    batchMeta.setLastModified(123L, true);
+                    assertEquals(Long.valueOf(123L), batchMeta.getLastModified());
+                } catch (BatchingUploader.LastModifiedDidNotChange | BatchingUploader.LastModifiedChangedUnexpectedly e) {
+                    fail("Should not check for modifications on first L-M set");
+                }
 
-        // Now the same, but passing in 'false' for "expecting to change".
-        batchMeta.reset();
-        assertEquals(batchMeta.getLastModified(), collectionLastModified);
+                try {
+                    batchMeta.setLastModified(123L, false);
+                    assertEquals(Long.valueOf(123L), batchMeta.getLastModified());
+                } catch (BatchingUploader.LastModifiedDidNotChange | BatchingUploader.LastModifiedChangedUnexpectedly e) {
+                    fail("Should not check for modifications on first L-M set");
+                }
 
-        try {
-            batchMeta.setLastModified(123L, false);
-            assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
-        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
-            fail("Should not check for modifications on first L-M set");
-        } catch (BatchingUploader.LastModifiedDidNotChange e) {
-            fail("Should not check for modifications on first L-M set");
-        }
+                // Test that we can't modify L-M when we're not expecting to
+                try {
+                    batchMeta.setLastModified(333L, false);
+                } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
+                    assertTrue("Must throw when L-M changes unexpectedly", true);
+                } catch (BatchingUploader.LastModifiedDidNotChange e) {
+                    fail("Not expecting did-not-change throw");
+                }
+                assertEquals(Long.valueOf(123L), batchMeta.getLastModified());
 
-        // Test that we can't modify L-M when we're not expecting to
-        try {
-            batchMeta.setLastModified(333L, false);
-        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
-            assertTrue("Must throw when L-M changes unexpectedly", true);
-        } catch (BatchingUploader.LastModifiedDidNotChange e) {
-            fail("Not expecting did-not-change throw");
-        }
-        assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
+                // Test that we can modify L-M when we're expecting to
+                try {
+                    batchMeta.setLastModified(333L, true);
+                } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
+                    fail("Not expecting changed-unexpectedly throw");
+                } catch (BatchingUploader.LastModifiedDidNotChange e) {
+                    fail("Not expecting did-not-change throw");
+                }
+                assertEquals(Long.valueOf(333L), batchMeta.getLastModified());
 
-        // Test that we can modify L-M when we're expecting to
-        try {
-            batchMeta.setLastModified(333L, true);
-        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
-            fail("Not expecting changed-unexpectedly throw");
-        } catch (BatchingUploader.LastModifiedDidNotChange e) {
-            fail("Not expecting did-not-change throw");
-        }
-        assertEquals(batchMeta.getLastModified(), Long.valueOf(333L));
+                // Test that we catch L-M modifications that expect to change but actually don't
+                try {
+                    batchMeta.setLastModified(333L, true);
+                } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
+                    fail("Not expecting changed-unexpectedly throw");
+                } catch (BatchingUploader.LastModifiedDidNotChange e) {
+                    assertTrue("Expected-to-change-but-did-not-change didn't throw", true);
+                }
+                assertEquals(Long.valueOf(333), batchMeta.getLastModified());
+            }
+        };
 
-        // Test that we catch L-M modifications that expect to change but actually don't
-        try {
-            batchMeta.setLastModified(333L, true);
-        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
-            fail("Not expecting changed-unexpectedly throw");
-        } catch (BatchingUploader.LastModifiedDidNotChange e) {
-            assertTrue("Expected-to-change-but-did-not-change didn't throw", true);
-        }
-        assertEquals(batchMeta.getLastModified(), Long.valueOf(333));
+        tests
+                .setTarget(batchMeta)
+                .run()
+                .setTarget(new BatchMeta(123L, null))
+                .run();
     }
 
     @Test
     public void testSetToken() {
         assertNull(batchMeta.getToken());
 
         try {
             batchMeta.setToken("MTIzNA", false);
@@ -131,152 +116,23 @@ public class BatchMetaTest {
         } catch (BatchingUploader.TokenModifiedException e) {
             fail("Should be able to set token to null during onCommit set");
         }
         assertNull(batchMeta.getToken());
     }
 
     @Test
     public void testRecordSucceeded() {
-        assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
+        assertEquals(0, batchMeta.getSuccessRecordGuids().length);
 
         batchMeta.recordSucceeded("guid1");
 
-        assertTrue(batchMeta.getSuccessRecordGuids().size() == 1);
-        assertTrue(batchMeta.getSuccessRecordGuids().contains("guid1"));
+        assertEquals(1, batchMeta.getSuccessRecordGuids().length);
+        assertEquals("guid1", batchMeta.getSuccessRecordGuids()[0]);
 
         try {
             batchMeta.recordSucceeded(null);
             fail();
         } catch (IllegalStateException e) {
             assertTrue("Should not be able to 'succeed' a null guid", true);
         }
     }
-
-    @Test
-    public void testByteLimits() {
-        assertTrue(batchMeta.canFit(0));
-
-        // Should just fit
-        assertTrue(batchMeta.canFit(byteLimit - BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
-
-        // Can't fit a record due to payload overhead.
-        assertFalse(batchMeta.canFit(byteLimit));
-
-        assertFalse(batchMeta.canFit(byteLimit + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
-        assertFalse(batchMeta.canFit(byteLimit * 1000));
-
-        long recordDelta = byteLimit / 2;
-        assertFalse(batchMeta.addAndEstimateIfFull(recordDelta));
-
-        // Record delta shouldn't fit due to payload overhead.
-        assertFalse(batchMeta.canFit(recordDelta));
-    }
-
-    @Test
-    public void testCountLimits() {
-        // Our record limit is 5, let's add 4.
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-
-        // 5th record still fits in
-        assertTrue(batchMeta.canFit(1));
-
-        // Add the 5th record
-        assertTrue(batchMeta.addAndEstimateIfFull(1));
-
-        // 6th record won't fit
-        assertFalse(batchMeta.canFit(1));
-    }
-
-    @Test
-    public void testNeedCommit() {
-        assertFalse(batchMeta.needToCommit());
-
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-
-        assertTrue(batchMeta.needToCommit());
-
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-
-        assertTrue(batchMeta.needToCommit());
-
-        batchMeta.reset();
-
-        assertFalse(batchMeta.needToCommit());
-    }
-
-    @Test
-    public void testAdd() {
-        // Ensure we account for payload overhead twice when the batch is empty.
-        // Payload overhead is either RECORDS_START or RECORDS_END, and for an empty payload
-        // we need both.
-        assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
-        assertTrue(batchMeta.getRecordCount() == 0);
-
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-
-        assertTrue(batchMeta.getByteCount() == (1 + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
-        assertTrue(batchMeta.getRecordCount() == 1);
-
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-
-        assertTrue(batchMeta.getByteCount() == (4 + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
-        assertTrue(batchMeta.getRecordCount() == 4);
-
-        assertTrue(batchMeta.addAndEstimateIfFull(1));
-
-        try {
-            assertTrue(batchMeta.addAndEstimateIfFull(1));
-            fail("BatchMeta should not let us insert records that won't fit");
-        } catch (IllegalStateException e) {
-            assertTrue(true);
-        }
-    }
-
-    @Test
-    public void testReset() {
-        assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
-        assertTrue(batchMeta.getRecordCount() == 0);
-        assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
-
-        // Shouldn't throw even if already empty
-        batchMeta.reset();
-        assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
-        assertTrue(batchMeta.getRecordCount() == 0);
-        assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
-
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        batchMeta.recordSucceeded("guid1");
-        try {
-            batchMeta.setToken("MTIzNA", false);
-        } catch (BatchingUploader.TokenModifiedException e) {}
-        try {
-            batchMeta.setLastModified(333L, true);
-        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
-        } catch (BatchingUploader.LastModifiedDidNotChange e) {}
-        assertEquals(Long.valueOf(333L), batchMeta.getLastModified());
-        assertEquals("MTIzNA", batchMeta.getToken());
-        assertTrue(batchMeta.getSuccessRecordGuids().size() == 1);
-
-        batchMeta.reset();
-
-        // Counts must be reset
-        assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
-        assertTrue(batchMeta.getRecordCount() == 0);
-        assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
-
-        // Collection L-M shouldn't change
-        assertEquals(batchMeta.collectionLastModified, collectionLastModified);
-
-        // Token must be reset
-        assertNull(batchMeta.getToken());
-
-        // L-M must be reverted to collection L-M
-        assertEquals(batchMeta.getLastModified(), collectionLastModified);
-    }
 }
\ No newline at end of file
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
@@ -23,32 +23,38 @@ import org.mozilla.gecko.sync.repositori
 import java.net.URISyntaxException;
 import java.util.Random;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 
 @RunWith(TestRunner.class)
 public class BatchingUploaderTest {
     class MockExecutorService implements Executor {
-        public int totalPayloads = 0;
-        public int commitPayloads = 0;
+        int totalPayloads = 0;
+        int commitPayloads = 0;
 
         @Override
         public void execute(@NonNull Runnable command) {
+            if (command instanceof PayloadDispatcher.NonPayloadContextRunnable) {
+                command.run();
+                return;
+            }
+
             ++totalPayloads;
-            if (((RecordUploadRunnable) command).isCommit) {
+            if (((PayloadDispatcher.BatchContextRunnable) command).isCommit) {
                 ++commitPayloads;
             }
+            command.run();
         }
     }
 
     class MockStoreDelegate implements RepositorySessionStoreDelegate {
-        public int storeFailed = 0;
-        public int storeSucceeded = 0;
-        public int storeCompleted = 0;
+        int storeFailed = 0;
+        int storeSucceeded = 0;
+        int storeCompleted = 0;
 
         @Override
         public void onRecordStoreFailed(Exception ex, String recordGuid) {
             ++storeFailed;
         }
 
         @Override
         public void onRecordStoreSucceeded(String guid) {
@@ -71,72 +77,86 @@ public class BatchingUploaderTest {
 
     @Before
     public void setUp() throws Exception {
         workQueue = new MockExecutorService();
         storeDelegate = new MockStoreDelegate();
     }
 
     @Test
-    public void testProcessEvenPayloadBatch() {
-        BatchingUploader uploader = makeConstrainedUploader(2, 4);
+    public void testProcessEvenPayloadBatch() throws Exception {
+        TestRunnableWithTarget<BatchingUploader> tests = new TestRunnableWithTarget<BatchingUploader>() {
+            @Override
+            public void tests() {
+                MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
+                // 1st
+                target.process(record);
+                assertEquals(0, ((MockExecutorService) workQueue).totalPayloads);
+                assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+                // 2nd -> payload full
+                target.process(record);
+                assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
+                assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+                // 3rd
+                target.process(record);
+                assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
+                assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
+                // 4th -> batch & payload full
+                target.process(record);
+                assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
+                assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+                // 5th
+                target.process(record);
+                assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
+                assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+                // 6th -> payload full
+                target.process(record);
+                assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
+                assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+                // 7th
+                target.process(record);
+                assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
+                assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
+                // 8th -> batch & payload full
+                target.process(record);
+                assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
+                assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
+                // 9th
+                target.process(record);
+                assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
+                assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
+                // 10th -> payload full
+                target.process(record);
+                assertEquals(5, ((MockExecutorService) workQueue).totalPayloads);
+                assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
+                // 11th
+                target.process(record);
+                assertEquals(5, ((MockExecutorService) workQueue).totalPayloads);
+                assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
+                // 12th -> batch & payload full
+                target.process(record);
+                assertEquals(6, ((MockExecutorService) workQueue).totalPayloads);
+                assertEquals(3, ((MockExecutorService) workQueue).commitPayloads);
+                // 13th
+                target.process(record);
+                assertEquals(6, ((MockExecutorService) workQueue).totalPayloads);
+                assertEquals(3, ((MockExecutorService) workQueue).commitPayloads);
+            }
+        };
 
-        MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
-        // 1st
-        uploader.process(record);
-        assertEquals(0, ((MockExecutorService) workQueue).totalPayloads);
-        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
-        // 2nd -> payload full
-        uploader.process(record);
-        assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
-        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
-        // 3rd
-        uploader.process(record);
-        assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
-        assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
-        // 4th -> batch & payload full
-        uploader.process(record);
-        assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
-        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
-        // 5th
-        uploader.process(record);
-        assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
-        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
-        // 6th -> payload full
-        uploader.process(record);
-        assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
-        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
-        // 7th
-        uploader.process(record);
-        assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
-        assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
-        // 8th -> batch & payload full
-        uploader.process(record);
-        assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
-        assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
-        // 9th
-        uploader.process(record);
-        assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
-        assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
-        // 10th -> payload full
-        uploader.process(record);
-        assertEquals(5, ((MockExecutorService) workQueue).totalPayloads);
-        assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
-        // 11th
-        uploader.process(record);
-        assertEquals(5, ((MockExecutorService) workQueue).totalPayloads);
-        assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
-        // 12th -> batch & payload full
-        uploader.process(record);
-        assertEquals(6, ((MockExecutorService) workQueue).totalPayloads);
-        assertEquals(3, ((MockExecutorService) workQueue).commitPayloads);
-        // 13th
-        uploader.process(record);
-        assertEquals(6, ((MockExecutorService) workQueue).totalPayloads);
-        assertEquals(3, ((MockExecutorService) workQueue).commitPayloads);
+        tests
+                .setTarget(makeConstrainedUploader(2, 4))
+                .run();
+
+        // clear up between test runs
+        setUp();
+
+        tests
+                .setTarget(makeConstrainedUploader(2, 4, true))
+                .run();
     }
 
     @Test
     public void testProcessUnevenPayloadBatch() {
         BatchingUploader uploader = makeConstrainedUploader(2, 5);
 
         MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
         // 1st
@@ -209,17 +229,17 @@ public class BatchingUploaderTest {
 
         // 5th
         uploader.process(record);
         assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
         assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
 
         // And now we tell uploader that batching isn't supported.
         // It shouldn't bother with batches from now on, just payloads.
-        uploader.setInBatchingMode(false);
+        uploader.setUnlimitedMode(true);
 
         // 6th
         uploader.process(record);
         assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
         assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
 
         // 7th
         uploader.process(record);
@@ -289,153 +309,192 @@ public class BatchingUploaderTest {
         }
     }
 
     @Test
     public void testRandomPayloadSizesNonBatching() {
         BatchingUploader uploader = makeConstrainedUploader(2, 4);
 
         final Random random = new Random();
-        uploader.setInBatchingMode(false);
+        uploader.setUnlimitedMode(true);
         for (int i = 0; i < 15000; i++) {
             uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, random.nextInt(15000)));
         }
     }
 
     @Test
     public void testRandomPayloadSizesNonBatchingDelayed() {
         BatchingUploader uploader = makeConstrainedUploader(2, 4);
 
         final Random random = new Random();
         // Delay telling uploader that batching isn't supported.
         // Randomize how many records we wait for.
         final int delay = random.nextInt(20);
         for (int i = 0; i < 15000; i++) {
             if (delay == i) {
-                uploader.setInBatchingMode(false);
+                uploader.setUnlimitedMode(true);
             }
             uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, random.nextInt(15000)));
         }
     }
 
     @Test
     public void testNoMoreRecordsAfterPayloadPost() {
         BatchingUploader uploader = makeConstrainedUploader(2, 4);
 
         // Process two records (payload limit is also two, batch is four),
         // and ensure that 'no more records' commits.
         MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
         uploader.process(record);
         uploader.process(record);
-        uploader.setInBatchingMode(true);
-        uploader.commitIfNecessaryAfterLastPayload();
+        uploader.payloadDispatcher.setInBatchingMode(true);
+        uploader.noMoreRecordsToUpload();
         // One will be a payload post, the other one is batch commit (empty payload)
         assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
         assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
     }
 
     @Test
     public void testNoMoreRecordsAfterPayloadPostWithOneRecordLeft() {
         BatchingUploader uploader = makeConstrainedUploader(2, 4);
 
         // Process two records (payload limit is also two, batch is four),
         // and ensure that 'no more records' commits.
         MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
         uploader.process(record);
         uploader.process(record);
         uploader.process(record);
-        uploader.commitIfNecessaryAfterLastPayload();
+        uploader.noMoreRecordsToUpload();
         // One will be a payload post, the other one is batch commit (one record payload)
         assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
         assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
     }
 
     @Test
     public void testNoMoreRecordsNoOp() {
         BatchingUploader uploader = makeConstrainedUploader(2, 4);
 
-        uploader.commitIfNecessaryAfterLastPayload();
+        uploader.noMoreRecordsToUpload();
         assertEquals(0, ((MockExecutorService) workQueue).totalPayloads);
         assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
     }
 
     @Test
     public void testNoMoreRecordsNoOpAfterCommit() {
         BatchingUploader uploader = makeConstrainedUploader(2, 4);
 
         MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
         uploader.process(record);
         uploader.process(record);
         uploader.process(record);
         uploader.process(record);
         assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
         assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
 
-        uploader.commitIfNecessaryAfterLastPayload();
+        uploader.noMoreRecordsToUpload();
         assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
         assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
     }
 
     @Test
     public void testNoMoreRecordsEvenNonBatching() {
         BatchingUploader uploader = makeConstrainedUploader(2, 4);
 
         // Process two records (payload limit is also two, batch is four),
         // set non-batching mode, and ensure that 'no more records' doesn't commit.
         MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
         uploader.process(record);
         uploader.process(record);
-        uploader.setInBatchingMode(false);
-        uploader.commitIfNecessaryAfterLastPayload();
+        uploader.setUnlimitedMode(true);
+        uploader.noMoreRecordsToUpload();
         // One will be a payload post, the other one is batch commit (one record payload)
         assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
         assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
     }
 
     @Test
     public void testNoMoreRecordsIncompletePayload() {
         BatchingUploader uploader = makeConstrainedUploader(2, 4);
 
         // We have one record (payload limit is 2), and "no-more-records" signal should commit it.
         MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
         uploader.process(record);
 
-        uploader.commitIfNecessaryAfterLastPayload();
+        uploader.noMoreRecordsToUpload();
         assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
         assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
     }
 
     private BatchingUploader makeConstrainedUploader(long maxPostRecords, long maxTotalRecords) {
+        return makeConstrainedUploader(maxPostRecords, maxTotalRecords, false);
+    }
+
+    private BatchingUploader makeConstrainedUploader(long maxPostRecords, long maxTotalRecords, boolean firstSync) {
         Server11RepositorySession server11RepositorySession = new Server11RepositorySession(
-                makeCountConstrainedRepository(maxPostRecords, maxTotalRecords)
+                makeCountConstrainedRepository(maxPostRecords, maxTotalRecords, firstSync)
         );
         server11RepositorySession.setStoreDelegate(storeDelegate);
         return new BatchingUploader(server11RepositorySession, workQueue, storeDelegate);
     }
 
-    private Server11Repository makeCountConstrainedRepository(long maxPostRecords, long maxTotalRecords) {
-        return makeConstrainedRepository(1024, 1024, maxPostRecords, 4096, maxTotalRecords);
+    private Server11Repository makeCountConstrainedRepository(long maxPostRecords, long maxTotalRecords, boolean firstSync) {
+        return makeConstrainedRepository(1024, 1024, maxPostRecords, 4096, maxTotalRecords, firstSync);
     }
 
-    private Server11Repository makeConstrainedRepository(long maxRequestBytes, long maxPostBytes, long maxPostRecords, long maxTotalBytes, long maxTotalRecords) {
+    private Server11Repository makeConstrainedRepository(long maxRequestBytes, long maxPostBytes, long maxPostRecords, long maxTotalBytes, long maxTotalRecords, boolean firstSync) {
         ExtendedJSONObject infoConfigurationJSON = new ExtendedJSONObject();
         infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_BYTES, maxTotalBytes);
         infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_RECORDS, maxTotalRecords);
         infoConfigurationJSON.put(InfoConfiguration.MAX_POST_RECORDS, maxPostRecords);
         infoConfigurationJSON.put(InfoConfiguration.MAX_POST_BYTES, maxPostBytes);
         infoConfigurationJSON.put(InfoConfiguration.MAX_REQUEST_BYTES, maxRequestBytes);
 
         InfoConfiguration infoConfiguration = new InfoConfiguration(infoConfigurationJSON);
 
+        InfoCollections infoCollections;
+        if (firstSync) {
+            infoCollections = new InfoCollections() {
+                @Override
+                public Long getTimestamp(String collection) {
+                    return null;
+                }
+            };
+        } else {
+            infoCollections = new InfoCollections() {
+                @Override
+                public Long getTimestamp(String collection) {
+                    return 0L;
+                }
+            };
+        }
+
         try {
             return new Server11Repository(
                     "dummyCollection",
                     "http://dummy.url/",
                     null,
-                    new InfoCollections(),
+                    infoCollections,
                     infoConfiguration
             );
         } catch (URISyntaxException e) {
             // Won't throw, and this won't happen.
             return null;
         }
     }
+
+    static abstract class TestRunnableWithTarget<T> {
+        T target;
+
+        TestRunnableWithTarget() {}
+
+        TestRunnableWithTarget<T> setTarget(T target) {
+            this.target = target;
+            return this;
+        }
+
+        TestRunnableWithTarget<T> run() {
+            tests();
+            return this;
+        }
+
+        abstract void tests();
+    }
 }
\ No newline at end of file
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadTest.java
@@ -101,37 +101,9 @@ public class PayloadTest {
 
         try {
             assertTrue(payload.addAndEstimateIfFull(1, recordBytes1, "guid6"));
             fail("Payload should not let us insert records that won't fit");
         } catch (IllegalStateException e) {
             assertTrue(true);
         }
     }
-
-    @Test
-    public void testReset() {
-        assertTrue(payload.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
-        assertTrue(payload.getRecordCount() == 0);
-        assertTrue(payload.getRecordsBuffer().isEmpty());
-        assertTrue(payload.getRecordGuidsBuffer().isEmpty());
-        assertTrue(payload.isEmpty());
-
-        // Shouldn't throw even if already empty
-        payload.reset();
-        assertTrue(payload.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
-        assertTrue(payload.getRecordCount() == 0);
-        assertTrue(payload.getRecordsBuffer().isEmpty());
-        assertTrue(payload.getRecordGuidsBuffer().isEmpty());
-        assertTrue(payload.isEmpty());
-
-        byte[] recordBytes1 = new byte[100];
-        assertFalse(payload.addAndEstimateIfFull(1, recordBytes1, "guid1"));
-        assertFalse(payload.isEmpty());
-        payload.reset();
-
-        assertTrue(payload.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
-        assertTrue(payload.getRecordCount() == 0);
-        assertTrue(payload.getRecordsBuffer().isEmpty());
-        assertTrue(payload.getRecordGuidsBuffer().isEmpty());
-        assertTrue(payload.isEmpty());
-    }
 }
\ No newline at end of file
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
@@ -6,69 +6,72 @@ package org.mozilla.gecko.sync.repositor
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
 import org.mozilla.gecko.sync.HTTPFailureException;
 import org.mozilla.gecko.sync.InfoCollections;
 import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.NonObjectJSONException;
+import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.net.SyncResponse;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.Server11Repository;
 import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
-import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
+
+import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayInputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.concurrent.Executor;
 
 import ch.boye.httpclientandroidlib.HttpResponse;
 import ch.boye.httpclientandroidlib.ProtocolVersion;
 import ch.boye.httpclientandroidlib.entity.BasicHttpEntity;
 import ch.boye.httpclientandroidlib.message.BasicHttpResponse;
 import ch.boye.httpclientandroidlib.message.BasicStatusLine;
 
+import static org.mockito.Mockito.mock;
+
 import static org.junit.Assert.*;
 
 @RunWith(TestRunner.class)
 public class PayloadUploadDelegateTest {
-    private BatchingUploader batchingUploader;
+    private PayloadDispatcher payloadDispatcher;
+    private AuthHeaderProvider authHeaderProvider;
 
-    class MockUploader extends BatchingUploader {
-        public final ArrayList<String> successRecords = new ArrayList<>();
+    class MockPayloadDispatcher extends PayloadDispatcher {
         public final HashMap<String, Exception> failedRecords = new HashMap<>();
         public boolean didLastPayloadFail = false;
 
         public ArrayList<SyncStorageResponse> successResponses = new ArrayList<>();
         public int commitPayloadsSucceeded = 0;
         public int lastPayloadsSucceeded = 0;
 
-        public MockUploader(final Server11RepositorySession repositorySession, final Executor workQueue, final RepositorySessionStoreDelegate sessionStoreDelegate) {
-            super(repositorySession, workQueue, sessionStoreDelegate);
+        public int committedGuids = 0;
+
+        public MockPayloadDispatcher(final Executor workQueue, final BatchingUploader uploader) {
+            super(workQueue, uploader, null);
         }
 
         @Override
-        public void payloadSucceeded(final SyncStorageResponse response, final boolean isCommit, final boolean isLastPayload) {
+        public void payloadSucceeded(final SyncStorageResponse response, String[] guids, final boolean isCommit, final boolean isLastPayload) {
             successResponses.add(response);
             if (isCommit) {
                 ++commitPayloadsSucceeded;
+                committedGuids += guids.length;
             }
             if (isLastPayload) {
                 ++lastPayloadsSucceeded;
             }
         }
 
         @Override
-        public void recordSucceeded(final String recordGuid) {
-            successRecords.add(recordGuid);
-        }
-
-        @Override
         public void recordFailed(final String recordGuid) {
             recordFailed(new Exception(), recordGuid);
         }
 
         @Override
         public void recordFailed(final Exception e, final String recordGuid) {
             failedRecords.put(recordGuid, e);
         }
@@ -80,313 +83,334 @@ public class PayloadUploadDelegateTest {
     }
 
     @Before
     public void setUp() throws Exception {
         Server11Repository server11Repository = new Server11Repository(
                 "dummyCollection",
                 "http://dummy.url/",
                 null,
-                new InfoCollections(),
+                new InfoCollections() {
+                    @Override
+                    public Long getTimestamp(String collection) {
+                        return 0L;
+                    }
+                },
                 new InfoConfiguration()
         );
-        batchingUploader = new MockUploader(
-                new Server11RepositorySession(server11Repository),
+        payloadDispatcher = new MockPayloadDispatcher(
                 null,
-                null
+                mock(BatchingUploader.class)
         );
+
+        authHeaderProvider = mock(AuthHeaderProvider.class);
     }
 
     @Test
     public void testHandleRequestSuccessNonSuccess() {
         ArrayList<String> postedGuids = new ArrayList<>(2);
         postedGuids.add("testGuid1");
         postedGuids.add("testGuid2");
         PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, postedGuids, false, false);
+                authHeaderProvider, payloadDispatcher, postedGuids, false, false);
 
         // Test that non-2* responses aren't processed
         payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(404, null, null));
-        assertEquals(2, ((MockUploader) batchingUploader).failedRecords.size());
-        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
+        assertEquals(2, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
+        assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
         assertEquals(IllegalStateException.class,
-                ((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
+                ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1").getClass());
         assertEquals(IllegalStateException.class,
-                ((MockUploader) batchingUploader).failedRecords.get("testGuid2").getClass());
+                ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid2").getClass());
     }
 
     @Test
     public void testHandleRequestSuccessNoHeaders() {
         ArrayList<String> postedGuids = new ArrayList<>(2);
         postedGuids.add("testGuid1");
         postedGuids.add("testGuid2");
         PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, postedGuids, false, false);
+                authHeaderProvider, payloadDispatcher, postedGuids, false, false);
 
         // Test that responses without X-Last-Modified header aren't processed
         payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(200, null, null));
-        assertEquals(2, ((MockUploader) batchingUploader).failedRecords.size());
-        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
+        assertEquals(2, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
+        assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
         assertEquals(IllegalStateException.class,
-                ((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
+                ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1").getClass());
         assertEquals(IllegalStateException.class,
-                ((MockUploader) batchingUploader).failedRecords.get("testGuid2").getClass());
+                ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid2").getClass());
     }
 
     @Test
     public void testHandleRequestSuccessBadBody() {
         ArrayList<String> postedGuids = new ArrayList<>(2);
         postedGuids.add("testGuid1");
         postedGuids.add("testGuid2");
         PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, postedGuids, false, true);
+                authHeaderProvider, payloadDispatcher, postedGuids, false, true);
 
         // Test that we catch json processing errors
         payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(200, "non json body", "123"));
-        assertEquals(2, ((MockUploader) batchingUploader).failedRecords.size());
-        assertTrue(((MockUploader) batchingUploader).didLastPayloadFail);
+        assertEquals(2, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
+        assertTrue(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
         assertEquals(NonObjectJSONException.class,
-                ((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
+                ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1").getClass());
         assertEquals(NonObjectJSONException.class,
-                ((MockUploader) batchingUploader).failedRecords.get("testGuid2").getClass());
+                ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid2").getClass());
     }
 
     @Test
     public void testHandleRequestSuccess202NoToken() {
         ArrayList<String> postedGuids = new ArrayList<>(1);
         postedGuids.add("testGuid1");
         PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, postedGuids, false, true);
+                authHeaderProvider, payloadDispatcher, postedGuids, false, true);
 
         // Test that we catch absent tokens in 202 responses
         payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(202, "{\"success\": []}", "123"));
-        assertEquals(1, ((MockUploader) batchingUploader).failedRecords.size());
+        assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
         assertEquals(IllegalStateException.class,
-                ((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
+                ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1").getClass());
     }
 
     @Test
     public void testHandleRequestSuccessBad200() {
         ArrayList<String> postedGuids = new ArrayList<>(1);
         postedGuids.add("testGuid1");
 
         PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, postedGuids, false, false);
+                authHeaderProvider, payloadDispatcher, postedGuids, false, false);
 
         // Test that if in batching mode and saw the token, 200 must be a response to a commit
         try {
-            batchingUploader.getCurrentBatch().setToken("MTIzNA", true);
+            payloadDispatcher.batchWhiteboard.setToken("MTIzNA", true);
         } catch (BatchingUploader.BatchingUploaderException e) {}
-        batchingUploader.setInBatchingMode(true);
+        payloadDispatcher.setInBatchingMode(true);
 
         // not a commit, so should fail
         payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(200, "{\"success\": []}", "123"));
-        assertEquals(1, ((MockUploader) batchingUploader).failedRecords.size());
+        assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
         assertEquals(IllegalStateException.class,
-                ((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
+                ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1").getClass());
     }
 
     @Test
     public void testHandleRequestSuccessNonBatchingFailedLM() {
         ArrayList<String> postedGuids = new ArrayList<>(1);
         postedGuids.add("guid1");
         postedGuids.add("guid2");
         postedGuids.add("guid3");
         PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, postedGuids, false, false);
+                authHeaderProvider, payloadDispatcher, postedGuids, false, false);
 
         payloadUploadDelegate.handleRequestSuccess(
                 makeSyncStorageResponse(200, "{\"success\": [\"guid1\", \"guid2\", \"guid3\"]}", "123"));
-        assertEquals(0, ((MockUploader) batchingUploader).failedRecords.size());
-        assertEquals(3, ((MockUploader) batchingUploader).successRecords.size());
-        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
-        assertEquals(1, ((MockUploader) batchingUploader).successResponses.size());
-        assertEquals(0, ((MockUploader) batchingUploader).commitPayloadsSucceeded);
-        assertEquals(0, ((MockUploader) batchingUploader).lastPayloadsSucceeded);
+        assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
+        assertEquals(3, payloadDispatcher.batchWhiteboard.getSuccessRecordGuids().length);
+        assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
+        assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
+        assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).commitPayloadsSucceeded);
+        assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
 
         // These should fail, because we're returning a non-changed L-M in a non-batching mode
         postedGuids.add("guid4");
         postedGuids.add("guid6");
         payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, postedGuids, false, false);
+                authHeaderProvider, payloadDispatcher, postedGuids, false, false);
         payloadUploadDelegate.handleRequestSuccess(
                 makeSyncStorageResponse(200, "{\"success\": [\"guid4\", 5, \"guid6\"]}", "123"));
-        assertEquals(5, ((MockUploader) batchingUploader).failedRecords.size());
-        assertEquals(3, ((MockUploader) batchingUploader).successRecords.size());
-        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
-        assertEquals(1, ((MockUploader) batchingUploader).successResponses.size());
-        assertEquals(0, ((MockUploader) batchingUploader).commitPayloadsSucceeded);
-        assertEquals(0, ((MockUploader) batchingUploader).lastPayloadsSucceeded);
+        assertEquals(5, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
+        assertEquals(3, payloadDispatcher.batchWhiteboard.getSuccessRecordGuids().length);
+        assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
+        assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
+        assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).commitPayloadsSucceeded);
+        assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
         assertEquals(BatchingUploader.LastModifiedDidNotChange.class,
-                ((MockUploader) batchingUploader).failedRecords.get("guid4").getClass());
+                ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("guid4").getClass());
     }
 
     @Test
     public void testHandleRequestSuccessNonBatching() {
         ArrayList<String> postedGuids = new ArrayList<>();
         postedGuids.add("guid1");
         postedGuids.add("guid2");
         postedGuids.add("guid3");
         PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, postedGuids, false, false);
+                authHeaderProvider, payloadDispatcher, postedGuids, false, false);
         payloadUploadDelegate.handleRequestSuccess(
                 makeSyncStorageResponse(200, "{\"success\": [\"guid1\", \"guid2\", \"guid3\"], \"failed\": {}}", "123"));
 
         postedGuids = new ArrayList<>();
         postedGuids.add("guid4");
         postedGuids.add("guid5");
         payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, postedGuids, false, false);
+                authHeaderProvider, payloadDispatcher, postedGuids, false, false);
         payloadUploadDelegate.handleRequestSuccess(
                 makeSyncStorageResponse(200, "{\"success\": [\"guid4\", \"guid5\"], \"failed\": {}}", "333"));
 
         postedGuids = new ArrayList<>();
         postedGuids.add("guid6");
         payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, postedGuids, false, true);
+                authHeaderProvider, payloadDispatcher, postedGuids, false, true);
         payloadUploadDelegate.handleRequestSuccess(
                 makeSyncStorageResponse(200, "{\"success\": [\"guid6\"], \"failed\": {}}", "444"));
 
-        assertEquals(0, ((MockUploader) batchingUploader).failedRecords.size());
-        assertEquals(6, ((MockUploader) batchingUploader).successRecords.size());
-        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
-        assertEquals(3, ((MockUploader) batchingUploader).successResponses.size());
-        assertEquals(0, ((MockUploader) batchingUploader).commitPayloadsSucceeded);
-        assertEquals(1, ((MockUploader) batchingUploader).lastPayloadsSucceeded);
-        assertFalse(batchingUploader.getInBatchingMode());
+        assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
+        assertEquals(6, payloadDispatcher.batchWhiteboard.getSuccessRecordGuids().length);
+        assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
+        assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
+        assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).commitPayloadsSucceeded);
+        assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
+        assertFalse(payloadDispatcher.batchWhiteboard.getInBatchingMode());
 
         postedGuids = new ArrayList<>();
         postedGuids.add("guid7");
         postedGuids.add("guid8");
         payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, postedGuids, false, true);
+                authHeaderProvider, payloadDispatcher, postedGuids, false, true);
         payloadUploadDelegate.handleRequestSuccess(
                 makeSyncStorageResponse(200, "{\"success\": [\"guid8\"], \"failed\": {\"guid7\": \"reason\"}}", "555"));
-        assertEquals(1, ((MockUploader) batchingUploader).failedRecords.size());
-        assertTrue(((MockUploader) batchingUploader).failedRecords.containsKey("guid7"));
-        assertEquals(7, ((MockUploader) batchingUploader).successRecords.size());
-        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
-        assertEquals(4, ((MockUploader) batchingUploader).successResponses.size());
-        assertEquals(0, ((MockUploader) batchingUploader).commitPayloadsSucceeded);
-        assertEquals(2, ((MockUploader) batchingUploader).lastPayloadsSucceeded);
-        assertFalse(batchingUploader.getInBatchingMode());
+        assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
+        assertTrue(((MockPayloadDispatcher) payloadDispatcher).failedRecords.containsKey("guid7"));
+        assertEquals(7, payloadDispatcher.batchWhiteboard.getSuccessRecordGuids().length);
+        assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
+        assertEquals(4, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
+        assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).commitPayloadsSucceeded);
+        assertEquals(2, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
+        assertFalse(payloadDispatcher.batchWhiteboard.getInBatchingMode());
     }
 
     @Test
     public void testHandleRequestSuccessBatching() {
+        assertNull(payloadDispatcher.batchWhiteboard.getInBatchingMode());
+
         ArrayList<String> postedGuids = new ArrayList<>();
         postedGuids.add("guid1");
         postedGuids.add("guid2");
         postedGuids.add("guid3");
         PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, postedGuids, false, false);
+                authHeaderProvider, payloadDispatcher, postedGuids, false, false);
         payloadUploadDelegate.handleRequestSuccess(
                 makeSyncStorageResponse(202, "{\"batch\": \"MTIzNA\", \"success\": [\"guid1\", \"guid2\", \"guid3\"], \"failed\": {}}", "123"));
 
-        assertTrue(batchingUploader.getInBatchingMode());
-        assertEquals("MTIzNA", batchingUploader.getCurrentBatch().getToken());
+        assertTrue(payloadDispatcher.batchWhiteboard.getInBatchingMode());
+        assertEquals("MTIzNA", payloadDispatcher.batchWhiteboard.getToken());
 
         postedGuids = new ArrayList<>();
         postedGuids.add("guid4");
         postedGuids.add("guid5");
         postedGuids.add("guid6");
         payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, postedGuids, false, false);
+                authHeaderProvider, payloadDispatcher, postedGuids, false, false);
         payloadUploadDelegate.handleRequestSuccess(
                 makeSyncStorageResponse(202, "{\"batch\": \"MTIzNA\", \"success\": [\"guid4\", \"guid5\", \"guid6\"], \"failed\": {}}", "123"));
 
-        assertTrue(batchingUploader.getInBatchingMode());
-        assertEquals("MTIzNA", batchingUploader.getCurrentBatch().getToken());
+        assertTrue(payloadDispatcher.batchWhiteboard.getInBatchingMode());
+        assertEquals("MTIzNA", payloadDispatcher.batchWhiteboard.getToken());
 
         postedGuids = new ArrayList<>();
         postedGuids.add("guid7");
         payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, postedGuids, true, false);
+                authHeaderProvider, payloadDispatcher, postedGuids, true, false);
         payloadUploadDelegate.handleRequestSuccess(
-                makeSyncStorageResponse(200, "{\"success\": [\"guid6\"], \"failed\": {}}", "222"));
+                makeSyncStorageResponse(200, "{\"success\": [\"guid7\"], \"failed\": {}}", "222"));
+
+        assertEquals(7, ((MockPayloadDispatcher) payloadDispatcher).committedGuids);
 
         // Even though everything indicates we're not in a batching, we were, so test that
         // we don't reset the flag.
-        assertTrue(batchingUploader.getInBatchingMode());
-        assertNull(batchingUploader.getCurrentBatch().getToken());
+        assertTrue(payloadDispatcher.batchWhiteboard.getInBatchingMode());
+        assertNull(payloadDispatcher.batchWhiteboard.getToken());
 
         postedGuids = new ArrayList<>();
         postedGuids.add("guid8");
         payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, postedGuids, true, true);
+                authHeaderProvider, payloadDispatcher, postedGuids, true, true);
         payloadUploadDelegate.handleRequestSuccess(
                 makeSyncStorageResponse(200, "{\"success\": [\"guid7\"], \"failed\": {}}", "333"));
 
-        assertEquals(0, ((MockUploader) batchingUploader).failedRecords.size());
-        assertEquals(8, ((MockUploader) batchingUploader).successRecords.size());
-        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
-        assertEquals(4, ((MockUploader) batchingUploader).successResponses.size());
-        assertEquals(2, ((MockUploader) batchingUploader).commitPayloadsSucceeded);
-        assertEquals(1, ((MockUploader) batchingUploader).lastPayloadsSucceeded);
-        assertTrue(batchingUploader.getInBatchingMode());
+        assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
+        assertEquals(8, ((MockPayloadDispatcher) payloadDispatcher).committedGuids);
+        assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
+        assertEquals(4, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
+        assertEquals(2, ((MockPayloadDispatcher) payloadDispatcher).commitPayloadsSucceeded);
+        assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
+        assertTrue(payloadDispatcher.batchWhiteboard.getInBatchingMode());
     }
 
     @Test
     public void testHandleRequestError() {
         ArrayList<String> postedGuids = new ArrayList<>(3);
         postedGuids.add("testGuid1");
         postedGuids.add("testGuid2");
         postedGuids.add("testGuid3");
-        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(batchingUploader, postedGuids, false, false);
+        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
+                authHeaderProvider, payloadDispatcher, postedGuids, false, false);
 
         IllegalStateException e = new IllegalStateException();
         payloadUploadDelegate.handleRequestError(e);
 
-        assertEquals(3, ((MockUploader) batchingUploader).failedRecords.size());
-        assertEquals(e, ((MockUploader) batchingUploader).failedRecords.get("testGuid1"));
-        assertEquals(e, ((MockUploader) batchingUploader).failedRecords.get("testGuid2"));
-        assertEquals(e, ((MockUploader) batchingUploader).failedRecords.get("testGuid3"));
-        assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
+        assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
+        assertEquals(e, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1"));
+        assertEquals(e, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid2"));
+        assertEquals(e, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid3"));
+        assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
 
-        payloadUploadDelegate = new PayloadUploadDelegate(batchingUploader, postedGuids, false, true);
+        payloadUploadDelegate = new PayloadUploadDelegate(
+                authHeaderProvider, payloadDispatcher, postedGuids, false, true);
         payloadUploadDelegate.handleRequestError(e);
-        assertEquals(3, ((MockUploader) batchingUploader).failedRecords.size());
-        assertTrue(((MockUploader) batchingUploader).didLastPayloadFail);
+        assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
+        assertTrue(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
     }
 
     @Test
     public void testHandleRequestFailure() {
         ArrayList<String> postedGuids = new ArrayList<>(3);
         postedGuids.add("testGuid1");
         postedGuids.add("testGuid2");
         postedGuids.add("testGuid3");
-        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(batchingUploader, postedGuids, false, false);
+        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
+                authHeaderProvider, payloadDispatcher, postedGuids, false, false);
 
         final HttpResponse response = new BasicHttpResponse(
                 new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 503, "Illegal method/protocol"));
         payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response));
-        assertEquals(3, ((MockUploader) batchingUploader).failedRecords.size());
+        assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
         assertEquals(HTTPFailureException.class,
-                ((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
+                ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1").getClass());
         assertEquals(HTTPFailureException.class,
-                ((MockUploader) batchingUploader).failedRecords.get("testGuid2").getClass());
+                ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid2").getClass());
         assertEquals(HTTPFailureException.class,
-                ((MockUploader) batchingUploader).failedRecords.get("testGuid3").getClass());
+                ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid3").getClass());
 
-        payloadUploadDelegate = new PayloadUploadDelegate(batchingUploader, postedGuids, false, true);
+        payloadUploadDelegate = new PayloadUploadDelegate(
+                authHeaderProvider, payloadDispatcher, postedGuids, false, true);
         payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response));
-        assertEquals(3, ((MockUploader) batchingUploader).failedRecords.size());
-        assertTrue(((MockUploader) batchingUploader).didLastPayloadFail);
+        assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
+        assertTrue(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
     }
 
     @Test
-    public void testIfUnmodifiedSince() {
+    public void testIfUnmodifiedSinceNoLM() {
         PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
-                batchingUploader, new ArrayList<String>(), false, false);
+                authHeaderProvider, payloadDispatcher, new ArrayList<String>(), false, false);
 
         assertNull(payloadUploadDelegate.ifUnmodifiedSince());
+    }
 
+    @Test
+    public void testIfUnmodifiedSinceWithLM() {
+        PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
+                authHeaderProvider, payloadDispatcher, new ArrayList<String>(), false, false);
         try {
-            batchingUploader.getCurrentBatch().setLastModified(1471645412480L, true);
-        } catch (BatchingUploader.BatchingUploaderException e) {}
+            payloadDispatcher.batchWhiteboard.setLastModified(1471645412480L, true);
+        } catch (BatchingUploader.BatchingUploaderException e) {
+            fail();
+        }
 
         assertEquals("1471645412.480", payloadUploadDelegate.ifUnmodifiedSince());
     }
 
     private SyncStorageResponse makeSyncStorageResponse(int code, String body, String lastModified) {
         BasicHttpResponse response = new BasicHttpResponse(
                 new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), code, null));
 
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/RecordUploadRunnableTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/RecordUploadRunnableTest.java
@@ -12,27 +12,25 @@ import org.mozilla.gecko.background.test
 import java.net.URI;
 
 import static org.junit.Assert.*;
 
 @RunWith(TestRunner.class)
 public class RecordUploadRunnableTest {
     @Test
     public void testBuildPostURI() throws Exception {
-        BatchMeta batchMeta = new BatchMeta(new Object(), 1, 1, null);
         URI postURI = RecordUploadRunnable.buildPostURI(
-                false, batchMeta, Uri.parse("http://example.com/"));
+                false, null, Uri.parse("http://example.com/"));
         assertEquals("http://example.com/?batch=true", postURI.toString());
 
         postURI = RecordUploadRunnable.buildPostURI(
-                true, batchMeta, Uri.parse("http://example.com/"));
+                true, null, Uri.parse("http://example.com/"));
         assertEquals("http://example.com/?batch=true&commit=true", postURI.toString());
 
-        batchMeta.setToken("MTIzNA", false);
         postURI = RecordUploadRunnable.buildPostURI(
-                false, batchMeta, Uri.parse("http://example.com/"));
+                false, "MTIzNA", Uri.parse("http://example.com/"));
         assertEquals("http://example.com/?batch=MTIzNA", postURI.toString());
 
         postURI = RecordUploadRunnable.buildPostURI(
-                true, batchMeta, Uri.parse("http://example.com/"));
+                true, "MTIzNA", Uri.parse("http://example.com/"));
         assertEquals("http://example.com/?batch=MTIzNA&commit=true", postURI.toString());
     }
 }
\ No newline at end of file
copy from mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchMetaTest.java
copy to mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/UploaderMetaTest.java
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchMetaTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/UploaderMetaTest.java
@@ -6,277 +6,102 @@ package org.mozilla.gecko.sync.repositor
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
 
 import static org.junit.Assert.*;
 
 @RunWith(TestRunner.class)
-public class BatchMetaTest {
-    private BatchMeta batchMeta;
+public class UploaderMetaTest {
+    private UploaderMeta uploaderMeta;
     private long byteLimit = 1024;
     private long recordLimit = 5;
     private Object lock = new Object();
-    private Long collectionLastModified = 123L;
 
     @Before
     public void setUp() throws Exception {
-        batchMeta = new BatchMeta(lock, byteLimit, recordLimit, collectionLastModified);
-    }
-
-    @Test
-    public void testConstructor() {
-        assertEquals(batchMeta.collectionLastModified, collectionLastModified);
-
-        BatchMeta otherBatchMeta = new BatchMeta(lock, byteLimit, recordLimit, null);
-        assertNull(otherBatchMeta.collectionLastModified);
-    }
-
-    @Test
-    public void testGetLastModified() {
-        // Defaults to collection L-M
-        assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
-
-        try {
-            batchMeta.setLastModified(333L, true);
-        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
-        } catch (BatchingUploader.LastModifiedDidNotChange e) {}
-
-        assertEquals(batchMeta.getLastModified(), Long.valueOf(333L));
-    }
-
-    @Test
-    public void testSetLastModified() {
-        assertEquals(batchMeta.getLastModified(), collectionLastModified);
-
-        try {
-            batchMeta.setLastModified(123L, true);
-            assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
-        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
-            fail("Should not check for modifications on first L-M set");
-        } catch (BatchingUploader.LastModifiedDidNotChange e) {
-            fail("Should not check for modifications on first L-M set");
-        }
-
-        // Now the same, but passing in 'false' for "expecting to change".
-        batchMeta.reset();
-        assertEquals(batchMeta.getLastModified(), collectionLastModified);
-
-        try {
-            batchMeta.setLastModified(123L, false);
-            assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
-        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
-            fail("Should not check for modifications on first L-M set");
-        } catch (BatchingUploader.LastModifiedDidNotChange e) {
-            fail("Should not check for modifications on first L-M set");
-        }
-
-        // Test that we can't modify L-M when we're not expecting to
-        try {
-            batchMeta.setLastModified(333L, false);
-        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
-            assertTrue("Must throw when L-M changes unexpectedly", true);
-        } catch (BatchingUploader.LastModifiedDidNotChange e) {
-            fail("Not expecting did-not-change throw");
-        }
-        assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
-
-        // Test that we can modify L-M when we're expecting to
-        try {
-            batchMeta.setLastModified(333L, true);
-        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
-            fail("Not expecting changed-unexpectedly throw");
-        } catch (BatchingUploader.LastModifiedDidNotChange e) {
-            fail("Not expecting did-not-change throw");
-        }
-        assertEquals(batchMeta.getLastModified(), Long.valueOf(333L));
-
-        // Test that we catch L-M modifications that expect to change but actually don't
-        try {
-            batchMeta.setLastModified(333L, true);
-        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
-            fail("Not expecting changed-unexpectedly throw");
-        } catch (BatchingUploader.LastModifiedDidNotChange e) {
-            assertTrue("Expected-to-change-but-did-not-change didn't throw", true);
-        }
-        assertEquals(batchMeta.getLastModified(), Long.valueOf(333));
-    }
-
-    @Test
-    public void testSetToken() {
-        assertNull(batchMeta.getToken());
-
-        try {
-            batchMeta.setToken("MTIzNA", false);
-        } catch (BatchingUploader.TokenModifiedException e) {
-            fail("Should be able to set token for the first time");
-        }
-        assertEquals("MTIzNA", batchMeta.getToken());
-
-        try {
-            batchMeta.setToken("XYCvNA", false);
-        } catch (BatchingUploader.TokenModifiedException e) {
-            assertTrue("Should not be able to modify a token", true);
-        }
-        assertEquals("MTIzNA", batchMeta.getToken());
-
-        try {
-            batchMeta.setToken("XYCvNA", true);
-        } catch (BatchingUploader.TokenModifiedException e) {
-            assertTrue("Should catch non-null tokens during onCommit sets", true);
-        }
-        assertEquals("MTIzNA", batchMeta.getToken());
-
-        try {
-            batchMeta.setToken(null, true);
-        } catch (BatchingUploader.TokenModifiedException e) {
-            fail("Should be able to set token to null during onCommit set");
-        }
-        assertNull(batchMeta.getToken());
-    }
-
-    @Test
-    public void testRecordSucceeded() {
-        assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
-
-        batchMeta.recordSucceeded("guid1");
-
-        assertTrue(batchMeta.getSuccessRecordGuids().size() == 1);
-        assertTrue(batchMeta.getSuccessRecordGuids().contains("guid1"));
-
-        try {
-            batchMeta.recordSucceeded(null);
-            fail();
-        } catch (IllegalStateException e) {
-            assertTrue("Should not be able to 'succeed' a null guid", true);
-        }
+        uploaderMeta = new UploaderMeta(lock, byteLimit, recordLimit);
     }
 
     @Test
     public void testByteLimits() {
-        assertTrue(batchMeta.canFit(0));
+        assertTrue(uploaderMeta.canFit(0));
 
         // Should just fit
-        assertTrue(batchMeta.canFit(byteLimit - BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
+        assertTrue(uploaderMeta.canFit(byteLimit - BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
 
         // Can't fit a record due to payload overhead.
-        assertFalse(batchMeta.canFit(byteLimit));
+        assertFalse(uploaderMeta.canFit(byteLimit));
 
-        assertFalse(batchMeta.canFit(byteLimit + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
-        assertFalse(batchMeta.canFit(byteLimit * 1000));
+        assertFalse(uploaderMeta.canFit(byteLimit + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
+        assertFalse(uploaderMeta.canFit(byteLimit * 1000));
 
         long recordDelta = byteLimit / 2;
-        assertFalse(batchMeta.addAndEstimateIfFull(recordDelta));
+        assertFalse(uploaderMeta.addAndEstimateIfFull(recordDelta));
 
         // Record delta shouldn't fit due to payload overhead.
-        assertFalse(batchMeta.canFit(recordDelta));
+        assertFalse(uploaderMeta.canFit(recordDelta));
     }
 
     @Test
     public void testCountLimits() {
         // Our record limit is 5, let's add 4.
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
+        assertFalse(uploaderMeta.addAndEstimateIfFull(1));
+        assertFalse(uploaderMeta.addAndEstimateIfFull(1));
+        assertFalse(uploaderMeta.addAndEstimateIfFull(1));
+        assertFalse(uploaderMeta.addAndEstimateIfFull(1));
 
         // 5th record still fits in
-        assertTrue(batchMeta.canFit(1));
+        assertTrue(uploaderMeta.canFit(1));
 
         // Add the 5th record
-        assertTrue(batchMeta.addAndEstimateIfFull(1));
+        assertTrue(uploaderMeta.addAndEstimateIfFull(1));
 
         // 6th record won't fit
-        assertFalse(batchMeta.canFit(1));
+        assertFalse(uploaderMeta.canFit(1));
     }
 
     @Test
     public void testNeedCommit() {
-        assertFalse(batchMeta.needToCommit());
+        assertFalse(uploaderMeta.needToCommit());
 
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
+        assertFalse(uploaderMeta.addAndEstimateIfFull(1));
 
-        assertTrue(batchMeta.needToCommit());
+        assertTrue(uploaderMeta.needToCommit());
 
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
+        assertFalse(uploaderMeta.addAndEstimateIfFull(1));
+        assertFalse(uploaderMeta.addAndEstimateIfFull(1));
+        assertFalse(uploaderMeta.addAndEstimateIfFull(1));
 
-        assertTrue(batchMeta.needToCommit());
-
-        batchMeta.reset();
-
-        assertFalse(batchMeta.needToCommit());
+        assertTrue(uploaderMeta.needToCommit());
     }
 
     @Test
     public void testAdd() {
         // Ensure we account for payload overhead twice when the batch is empty.
         // Payload overhead is either RECORDS_START or RECORDS_END, and for an empty payload
         // we need both.
-        assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
-        assertTrue(batchMeta.getRecordCount() == 0);
+        assertTrue(uploaderMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
+        assertTrue(uploaderMeta.getRecordCount() == 0);
 
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
+        assertFalse(uploaderMeta.addAndEstimateIfFull(1));
 
-        assertTrue(batchMeta.getByteCount() == (1 + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
-        assertTrue(batchMeta.getRecordCount() == 1);
+        assertTrue(uploaderMeta.getByteCount() == (1 + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
+        assertTrue(uploaderMeta.getRecordCount() == 1);
 
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
+        assertFalse(uploaderMeta.addAndEstimateIfFull(1));
+        assertFalse(uploaderMeta.addAndEstimateIfFull(1));
+        assertFalse(uploaderMeta.addAndEstimateIfFull(1));
 
-        assertTrue(batchMeta.getByteCount() == (4 + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
-        assertTrue(batchMeta.getRecordCount() == 4);
+        assertTrue(uploaderMeta.getByteCount() == (4 + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
+        assertTrue(uploaderMeta.getRecordCount() == 4);
 
-        assertTrue(batchMeta.addAndEstimateIfFull(1));
+        assertTrue(uploaderMeta.addAndEstimateIfFull(1));
 
         try {
-            assertTrue(batchMeta.addAndEstimateIfFull(1));
+            assertTrue(uploaderMeta.addAndEstimateIfFull(1));
             fail("BatchMeta should not let us insert records that won't fit");
         } catch (IllegalStateException e) {
             assertTrue(true);
         }
     }
-
-    @Test
-    public void testReset() {
-        assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
-        assertTrue(batchMeta.getRecordCount() == 0);
-        assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
-
-        // Shouldn't throw even if already empty
-        batchMeta.reset();
-        assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
-        assertTrue(batchMeta.getRecordCount() == 0);
-        assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
-
-        assertFalse(batchMeta.addAndEstimateIfFull(1));
-        batchMeta.recordSucceeded("guid1");
-        try {
-            batchMeta.setToken("MTIzNA", false);
-        } catch (BatchingUploader.TokenModifiedException e) {}
-        try {
-            batchMeta.setLastModified(333L, true);
-        } catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
-        } catch (BatchingUploader.LastModifiedDidNotChange e) {}
-        assertEquals(Long.valueOf(333L), batchMeta.getLastModified());
-        assertEquals("MTIzNA", batchMeta.getToken());
-        assertTrue(batchMeta.getSuccessRecordGuids().size() == 1);
-
-        batchMeta.reset();
-
-        // Counts must be reset
-        assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
-        assertTrue(batchMeta.getRecordCount() == 0);
-        assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
-
-        // Collection L-M shouldn't change
-        assertEquals(batchMeta.collectionLastModified, collectionLastModified);
-
-        // Token must be reset
-        assertNull(batchMeta.getToken());
-
-        // L-M must be reverted to collection L-M
-        assertEquals(batchMeta.getLastModified(), collectionLastModified);
-    }
 }
\ No newline at end of file