Bug 730142 - Download batching. r=Grisha,rnewman
authordlim@mozilla.com <dlim@mozilla.com>
Fri, 30 Sep 2016 09:33:08 -0700
changeset 316003 b828802f32e5d540773329ad6a4945d98d53b72d
parent 316002 2fe60d27c7805da856f286548e1d20e4413b56ca
child 316004 22eee779473b155f28aab3fd9a5248d1d0646a56
push id30758
push userphilringnalda@gmail.com
push dateSat, 01 Oct 2016 06:23:57 +0000
treeherdermozilla-central@b0706e5d7ae3 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersGrisha, rnewman
bugs730142
milestone52.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 730142 - Download batching. r=Grisha,rnewman MozReview-Commit-ID: BhMmynysoKa
mobile/android/base/android-services.mozbuild
mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/MozResponse.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncResponse.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConstrainedServer11Repository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11Repository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11RepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegate.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/SafeConstrainedServer11Repository.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/net/test/TestServer11Repository.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer11RepositorySession.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/MockServer.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/test/TestSafeConstrainedServer11Repository.java
--- a/mobile/android/base/android-services.mozbuild
+++ b/mobile/android/base/android-services.mozbuild
@@ -985,16 +985,18 @@ sync_java_files = [TOPSRCDIR + '/mobile/
     'sync/repositories/domain/HistoryRecordFactory.java',
     'sync/repositories/domain/PasswordRecord.java',
     'sync/repositories/domain/PasswordRecordFactory.java',
     'sync/repositories/domain/Record.java',
     'sync/repositories/domain/RecordParseException.java',
     'sync/repositories/domain/TabsRecord.java',
     'sync/repositories/domain/TabsRecordFactory.java',
     'sync/repositories/domain/VersionConstants.java',
+    'sync/repositories/downloaders/BatchingDownloader.java',
+    'sync/repositories/downloaders/BatchingDownloaderDelegate.java',
     'sync/repositories/FetchFailedException.java',
     'sync/repositories/HashSetStoreTracker.java',
     'sync/repositories/HistoryRepository.java',
     'sync/repositories/IdentityRecordFactory.java',
     'sync/repositories/InactiveSessionException.java',
     'sync/repositories/InvalidBookmarkTypeException.java',
     'sync/repositories/InvalidRequestException.java',
     'sync/repositories/InvalidSessionTransitionException.java',
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/MozResponse.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/MozResponse.java
@@ -144,17 +144,17 @@ public class MozResponse {
   protected boolean hasHeader(String h) {
     return this.response.containsHeader(h);
   }
 
   public MozResponse(HttpResponse res) {
     response = res;
   }
 
-  private String getNonMissingHeader(String h) {
+  protected String getNonMissingHeader(String h) {
     if (!this.hasHeader(h)) {
       return null;
     }
 
     final Header header = this.response.getFirstHeader(h);
     final String value = header.getValue();
     if (missingHeader(value)) {
       Logger.warn(LOG_TAG, h + " header present but empty.");
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncResponse.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/net/SyncResponse.java
@@ -1,26 +1,29 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.net;
 
+import android.support.annotation.Nullable;
+
 import org.mozilla.gecko.sync.Utils;
 
 import ch.boye.httpclientandroidlib.HttpResponse;
 
 public class SyncResponse extends MozResponse {
   public static final String X_WEAVE_BACKOFF = "x-weave-backoff";
   public static final String X_BACKOFF = "x-backoff";
   public static final String X_LAST_MODIFIED = "x-last-modified";
   public static final String X_WEAVE_TIMESTAMP = "x-weave-timestamp";
   public static final String X_WEAVE_RECORDS = "x-weave-records";
   public static final String X_WEAVE_QUOTA_REMAINING = "x-weave-quota-remaining";
   public static final String X_WEAVE_ALERT = "x-weave-alert";
+  public static final String X_WEAVE_NEXT_OFFSET = "x-weave-next-offset";
 
   public SyncResponse(HttpResponse res) {
     super(res);
   }
 
   /**
    * @return A number of seconds, or -1 if the 'X-Weave-Backoff' header was not
    *         present.
@@ -115,14 +118,40 @@ public class SyncResponse extends MozRes
     return this.getIntegerHeader(X_WEAVE_RECORDS);
   }
 
   public int weaveQuotaRemaining() throws NumberFormatException {
     return this.getIntegerHeader(X_WEAVE_QUOTA_REMAINING);
   }
 
   public String weaveAlert() {
-    if (this.hasHeader(X_WEAVE_ALERT)) {
-      return this.response.getFirstHeader(X_WEAVE_ALERT).getValue();
-    }
-    return null;
+    return this.getNonMissingHeader(X_WEAVE_ALERT);
+  }
+
+  /**
+   * This header may be sent back with multi-record responses where the request included a limit parameter.
+   * Its presence indicates that the number of available records exceeded the given limit.
+   * The value from this header can be passed back in the offset parameter to retrieve additional records.
+   * The value of this header will always be a string of characters from the urlsafe-base64 alphabet.
+   * The specific contents of the string are an implementation detail of the server,
+   * so clients should treat it as an opaque token.
+   *
+   * @return the offset header
+   */
+  public String weaveOffset() {
+    return this.getNonMissingHeader(X_WEAVE_NEXT_OFFSET);
+  }
+
+  /**
+   * This header gives the last-modified time of the target resource as seen during processing of the request,
+   * and will be included in all success responses (200, 201, 204).
+   * When given in response to a write request, this will be equal to the server’s current time and
+   * to the new last-modified time of any BSOs created or changed by the request.
+   * It is similar to the standard HTTP Last-Modified header,
+   * but the value is a decimal timestamp rather than a HTTP-format date.
+   *
+   * @return the last modified header
+   */
+  @Nullable
+  public String lastModified() {
+    return this.getNonMissingHeader(X_LAST_MODIFIED);
   }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConstrainedServer11Repository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConstrainedServer11Repository.java
@@ -6,34 +6,46 @@ package org.mozilla.gecko.sync.repositor
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.InfoCollections;
 import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 
 /**
- * A kind of Server11Repository that supports explicit setting of limit and sort on operations.
+ * A kind of Server11Repository that supports explicit setting of total fetch limit, per-batch fetch limit, and a sort order.
  *
  * @author rnewman
  *
  */
 public class ConstrainedServer11Repository extends Server11Repository {
 
-  private String sort = null;
-  private long limit  = -1;
+  private final String sort;
+  private final long batchLimit;
+  private final long totalLimit;
 
-  public ConstrainedServer11Repository(String collection, String storageURL, AuthHeaderProvider authHeaderProvider, InfoCollections infoCollections, InfoConfiguration infoConfiguration, long limit, String sort) throws URISyntaxException {
+  public ConstrainedServer11Repository(String collection, String storageURL,
+                                       AuthHeaderProvider authHeaderProvider,
+                                       InfoCollections infoCollections,
+                                       InfoConfiguration infoConfiguration,
+                                       long batchLimit, long totalLimit, String sort)
+          throws URISyntaxException {
     super(collection, storageURL, authHeaderProvider, infoCollections, infoConfiguration);
-    this.limit = limit;
+    this.batchLimit = batchLimit;
+    this.totalLimit = totalLimit;
     this.sort  = sort;
   }
 
   @Override
-  protected String getDefaultSort() {
+  public String getDefaultSort() {
     return sort;
   }
 
   @Override
-  protected long getDefaultFetchLimit() {
-    return limit;
+  public long getDefaultBatchLimit() {
+    return batchLimit;
+  }
+
+  @Override
+  public long getDefaultTotalLimit() {
+    return totalLimit;
   }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11Repository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11Repository.java
@@ -63,17 +63,17 @@ public class Server11Repository extends 
                             Context context) {
     delegate.onSessionCreated(new Server11RepositorySession(this));
   }
 
   public URI collectionURI() {
     return this.collectionURI;
   }
 
-  public URI collectionURI(boolean full, long newer, long limit, String sort, String ids) throws URISyntaxException {
+  public URI collectionURI(boolean full, long newer, long limit, String sort, String ids, String offset) throws URISyntaxException {
     ArrayList<String> params = new ArrayList<String>();
     if (full) {
       params.add("full=1");
     }
     if (newer >= 0) {
       // Translate local millisecond timestamps into server decimal seconds.
       String newerString = Utils.millisecondsToDecimalSecondsString(newer);
       params.add("newer=" + newerString);
@@ -82,17 +82,20 @@ public class Server11Repository extends 
       params.add("limit=" + limit);
     }
     if (sort != null) {
       params.add("sort=" + sort);       // We trust these values.
     }
     if (ids != null) {
       params.add("ids=" + ids);         // We trust these values.
     }
-
+    if (offset != null) {
+      // Offset comes straight out of HTTP headers and it is the responsibility of the caller to URI-escape it.
+      params.add("offset=" + offset);
+    }
     if (params.size() == 0) {
       return this.collectionURI;
     }
 
     StringBuilder out = new StringBuilder();
     char indicator = '?';
     for (String param : params) {
       out.append(indicator);
@@ -104,25 +107,29 @@ public class Server11Repository extends 
   }
 
   public URI wboURI(String id) throws URISyntaxException {
     return new URI(this.collectionURI + "/" + id);
   }
 
   // Override these.
   @SuppressWarnings("static-method")
-  protected long getDefaultFetchLimit() {
+  public long getDefaultBatchLimit() {
     return -1;
   }
 
   @SuppressWarnings("static-method")
-  protected String getDefaultSort() {
+  public String getDefaultSort() {
     return null;
   }
 
+  public long getDefaultTotalLimit() {
+    return -1;
+  }
+
   public AuthHeaderProvider getAuthHeaderProvider() {
     return authHeaderProvider;
   }
 
   public boolean updateNeeded(long lastSyncTimestamp) {
     return infoCollections.updateNeeded(collection, lastSyncTimestamp);
   }
 
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11RepositorySession.java
@@ -1,251 +1,70 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.repositories;
 
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
 import org.mozilla.gecko.background.common.log.Logger;
-import org.mozilla.gecko.sync.CryptoRecord;
-import org.mozilla.gecko.sync.DelayedWorkTracker;
-import org.mozilla.gecko.sync.HTTPFailureException;
-import org.mozilla.gecko.sync.crypto.KeyBundle;
-import org.mozilla.gecko.sync.net.AuthHeaderProvider;
-import org.mozilla.gecko.sync.net.SyncResponse;
-import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
-import org.mozilla.gecko.sync.net.SyncStorageResponse;
-import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
 import org.mozilla.gecko.sync.repositories.domain.Record;
+import org.mozilla.gecko.sync.repositories.downloaders.BatchingDownloader;
 import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader;
 
 public class Server11RepositorySession extends RepositorySession {
   public static final String LOG_TAG = "Server11Session";
 
-  /**
-   * Used to track outstanding requests, so that we can abort them as needed.
-   */
-  private final Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>());
-
-  @Override
-  public void abort() {
-    super.abort();
-    for (SyncStorageCollectionRequest request : pending) {
-      request.abort();
-    }
-    pending.clear();
-  }
-
-  /**
-   * Convert HTTP request delegate callbacks into fetch callbacks within the
-   * context of this RepositorySession.
-   *
-   * @author rnewman
-   *
-   */
-  public class RequestFetchDelegateAdapter extends WBOCollectionRequestDelegate {
-    RepositorySessionFetchRecordsDelegate delegate;
-    private final DelayedWorkTracker workTracker = new DelayedWorkTracker();
-
-    // So that we can clean up.
-    private SyncStorageCollectionRequest request;
-
-    public void setRequest(SyncStorageCollectionRequest request) {
-      this.request = request;
-    }
-    private void removeRequestFromPending() {
-      if (this.request == null) {
-        return;
-      }
-      pending.remove(this.request);
-      this.request = null;
-    }
-
-    public RequestFetchDelegateAdapter(RepositorySessionFetchRecordsDelegate delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    public AuthHeaderProvider getAuthHeaderProvider() {
-      return serverRepository.getAuthHeaderProvider();
-    }
-
-    @Override
-    public String ifUnmodifiedSince() {
-      return null;
-    }
-
-    @Override
-    public void handleRequestSuccess(SyncStorageResponse response) {
-      Logger.debug(LOG_TAG, "Fetch done.");
-      removeRequestFromPending();
-
-      // This will change overall and will use X_LAST_MODIFIED in Bug 730142.
-      final long normalizedTimestamp = response.normalizedTimestampForHeader(SyncResponse.X_WEAVE_TIMESTAMP);
-      Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp);
-
-      // When we're done processing other events, finish.
-      workTracker.delayWorkItem(new Runnable() {
-        @Override
-        public void run() {
-          Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
-          // TODO: verify number of returned records.
-          delegate.onFetchCompleted(normalizedTimestamp);
-        }
-      });
-    }
-
-    @Override
-    public void handleRequestFailure(SyncStorageResponse response) {
-      // TODO: ensure that delegate methods don't get called more than once.
-      this.handleRequestError(new HTTPFailureException(response));
-    }
-
-    @Override
-    public void handleRequestError(final Exception ex) {
-      removeRequestFromPending();
-      Logger.warn(LOG_TAG, "Got request error.", ex);
-      // When we're done processing other events, finish.
-      workTracker.delayWorkItem(new Runnable() {
-        @Override
-        public void run() {
-          Logger.debug(LOG_TAG, "Running onFetchFailed.");
-          delegate.onFetchFailed(ex, null);
-        }
-      });
-    }
-
-    @Override
-    public void handleWBO(CryptoRecord record) {
-      workTracker.incrementOutstanding();
-      try {
-        delegate.onFetchedRecord(record);
-      } catch (Exception ex) {
-        Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex);
-        // TODO: handle this better.
-        throw new RuntimeException(ex);
-      } finally {
-        workTracker.decrementOutstanding();
-      }
-    }
-
-    // TODO: this implies that we've screwed up our inheritance chain somehow.
-    @Override
-    public KeyBundle keyBundle() {
-      return null;
-    }
-  }
-
   Server11Repository serverRepository;
   private BatchingUploader uploader;
+  private final BatchingDownloader downloader;
 
   public Server11RepositorySession(Repository repository) {
     super(repository);
     serverRepository = (Server11Repository) repository;
+    this.downloader = new BatchingDownloader(serverRepository, this);
   }
 
   public Server11Repository getServerRepository() {
     return serverRepository;
   }
 
   @Override
   public void setStoreDelegate(RepositorySessionStoreDelegate delegate) {
     this.delegate = delegate;
 
     // Now that we have the delegate, we can initialize our uploader.
     this.uploader = new BatchingUploader(this, storeWorkQueue, delegate);
   }
 
-  private String flattenIDs(String[] guids) {
-    // Consider using Utils.toDelimitedString if and when the signature changes
-    // to Collection<String> guids.
-    if (guids.length == 0) {
-      return "";
-    }
-    if (guids.length == 1) {
-      return guids[0];
-    }
-    StringBuilder b = new StringBuilder();
-    for (String guid : guids) {
-      b.append(guid);
-      b.append(",");
-    }
-    return b.substring(0, b.length() - 1);
-  }
-
   @Override
   public void guidsSince(long timestamp,
                          RepositorySessionGuidsSinceDelegate delegate) {
     // TODO Auto-generated method stub
 
   }
 
-  protected void fetchWithParameters(long newer,
-                                     long limit,
-                                     boolean full,
-                                     String sort,
-                                     String ids,
-                                     RequestFetchDelegateAdapter delegate)
-                                         throws URISyntaxException {
-
-    URI collectionURI = serverRepository.collectionURI(full, newer, limit, sort, ids);
-    SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(collectionURI);
-    request.delegate = delegate;
-
-    // So it can clean up.
-    delegate.setRequest(request);
-    pending.add(request);
-    request.get();
-  }
-
-  public void fetchSince(long timestamp, long limit, String sort, RepositorySessionFetchRecordsDelegate delegate) {
-    try {
-      this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate));
-    } catch (URISyntaxException e) {
-      delegate.onFetchFailed(e, null);
-    }
-  }
-
   @Override
   public void fetchSince(long timestamp,
                          RepositorySessionFetchRecordsDelegate delegate) {
-    try {
-      long limit = serverRepository.getDefaultFetchLimit();
-      String sort = serverRepository.getDefaultSort();
-      this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate));
-    } catch (URISyntaxException e) {
-      delegate.onFetchFailed(e, null);
-    }
+    this.downloader.fetchSince(timestamp, delegate);
   }
 
   @Override
   public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) {
     this.fetchSince(-1, delegate);
   }
 
   @Override
   public void fetch(String[] guids,
                     RepositorySessionFetchRecordsDelegate delegate) {
-    // TODO: watch out for URL length limits!
-    try {
-      String ids = flattenIDs(guids);
-      this.fetchWithParameters(-1, -1, true, "index", ids, new RequestFetchDelegateAdapter(delegate));
-    } catch (URISyntaxException e) {
-      delegate.onFetchFailed(e, null);
-    }
+    this.downloader.fetch(guids, delegate);
   }
 
   @Override
   public void wipe(RepositorySessionWipeDelegate delegate) {
     if (!isActive()) {
       delegate.onWipeFailed(new InactiveSessionException(null));
       return;
     }
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
@@ -0,0 +1,310 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.downloaders;
+
+import android.support.annotation.Nullable;
+import android.support.annotation.VisibleForTesting;
+
+import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.CryptoRecord;
+import org.mozilla.gecko.sync.DelayedWorkTracker;
+import org.mozilla.gecko.sync.net.SyncResponse;
+import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
+import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.repositories.Server11Repository;
+import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Batching Downloader, which implements batching protocol as supported by Sync 1.5.
+ *
+ * Downloader's batching behaviour is configured via two parameters, obtained from the repository:
+ * - Per-batch limit, which specified how many records may be fetched in an individual GET request.
+ * - Total limit, which controls number of batch GET requests we will make.
+ *
+ *
+ * Batching is implemented via specifying a 'limit' GET parameter, and looking for an 'offset' token
+ * in the response. If offset token is present, this indicates that there are more records than what
+ * we've received so far, and we perform an additional fetch. Batching stops when either we hit a total
+ * limit, or offset token is no longer present (indicating that we're done).
+ *
+ * For unlimited repositories (such as passwords), both of these value will be -1. Downloader will not
+ * specify a limit parameter in this case, and the response will contain every record available and no
+ * offset token, thus fully completing in one go.
+ *
+ * In between batches, we maintain a Last-Modified timestamp, based off the value return in the header
+ * of the first response. Every response will have a Last-Modified header, indicating when the collection
+ * was modified last. We pass along this header in our subsequent requests in a X-If-Unmodified-Since
+ * header. Server will ensure that our collection did not change while we are batching, if it did it will
+ * fail our fetch with a 412 (Consequent Modification) error. Additionally, we perform the same checks
+ * locally.
+ */
+public class BatchingDownloader {
+    public static final String LOG_TAG = "BatchingDownloader";
+
+    protected final Server11Repository repository;
+    private final Server11RepositorySession repositorySession;
+    private final DelayedWorkTracker workTracker = new DelayedWorkTracker();
+    // Used to track outstanding requests, so that we can abort them as needed.
+    @VisibleForTesting
+    protected final Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>());
+    /* @GuardedBy("this") */ private String lastModified;
+    /* @GuardedBy("this") */ private long numRecords = 0;
+
+    public BatchingDownloader(final Server11Repository repository, final Server11RepositorySession repositorySession) {
+        this.repository = repository;
+        this.repositorySession = repositorySession;
+    }
+
+    @VisibleForTesting
+    protected static String flattenIDs(String[] guids) {
+        // Consider using Utils.toDelimitedString if and when the signature changes
+        // to Collection<String> guids.
+        if (guids.length == 0) {
+            return "";
+        }
+        if (guids.length == 1) {
+            return guids[0];
+        }
+        // Assuming 12-char GUIDs. There should be a -1 in there, but we accumulate one comma too many.
+        StringBuilder b = new StringBuilder(guids.length * 12 + guids.length);
+        for (String guid : guids) {
+            b.append(guid);
+            b.append(",");
+        }
+        return b.substring(0, b.length() - 1);
+    }
+
+    @VisibleForTesting
+    protected void fetchWithParameters(long newer,
+                                    long batchLimit,
+                                    boolean full,
+                                    String sort,
+                                    String ids,
+                                    SyncStorageCollectionRequest request,
+                                    RepositorySessionFetchRecordsDelegate fetchRecordsDelegate)
+            throws URISyntaxException, UnsupportedEncodingException {
+        if (batchLimit > repository.getDefaultTotalLimit()) {
+            throw new IllegalArgumentException("Batch limit should not be greater than total limit");
+        }
+
+        request.delegate = new BatchingDownloaderDelegate(this, fetchRecordsDelegate, request,
+                newer, batchLimit, full, sort, ids);
+        this.pending.add(request);
+        request.get();
+    }
+
+    @VisibleForTesting
+    @Nullable
+    protected String encodeParam(String param) throws UnsupportedEncodingException {
+        if (param != null) {
+            return URLEncoder.encode(param, "UTF-8");
+        }
+        return null;
+    }
+
+    @VisibleForTesting
+    protected SyncStorageCollectionRequest makeSyncStorageCollectionRequest(long newer,
+                                                  long batchLimit,
+                                                  boolean full,
+                                                  String sort,
+                                                  String ids,
+                                                  String offset)
+            throws URISyntaxException, UnsupportedEncodingException {
+        URI collectionURI = repository.collectionURI(full, newer, batchLimit, sort, ids, encodeParam(offset));
+        Logger.debug(LOG_TAG, collectionURI.toString());
+
+        return new SyncStorageCollectionRequest(collectionURI);
+    }
+
+    public void fetchSince(long timestamp, RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
+        this.fetchSince(timestamp, null, fetchRecordsDelegate);
+    }
+
+    private void fetchSince(long timestamp, String offset,
+                           RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
+        long batchLimit = repository.getDefaultBatchLimit();
+        String sort = repository.getDefaultSort();
+
+        try {
+            SyncStorageCollectionRequest request = makeSyncStorageCollectionRequest(timestamp,
+                    batchLimit, true, sort, null, offset);
+            this.fetchWithParameters(timestamp, batchLimit, true, sort, null, request, fetchRecordsDelegate);
+        } catch (URISyntaxException | UnsupportedEncodingException e) {
+            fetchRecordsDelegate.onFetchFailed(e, null);
+        }
+    }
+
+    public void fetch(String[] guids, RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
+        String ids = flattenIDs(guids);
+        String index = "index";
+
+        try {
+            SyncStorageCollectionRequest request = makeSyncStorageCollectionRequest(
+                    -1, -1, true, index, ids, null);
+            this.fetchWithParameters(-1, -1, true, index, ids, request, fetchRecordsDelegate);
+        } catch (URISyntaxException | UnsupportedEncodingException e) {
+            fetchRecordsDelegate.onFetchFailed(e, null);
+        }
+    }
+
+    public Server11Repository getServerRepository() {
+        return this.repository;
+    }
+
+    public void onFetchCompleted(SyncStorageResponse response,
+                                 final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
+                                 final SyncStorageCollectionRequest request, long newer,
+                                 long limit, boolean full, String sort, String ids) {
+        removeRequestFromPending(request);
+
+        // When we process our first request, we get back a X-Last-Modified header indicating when collection was modified last.
+        // We pass it to the server with every subsequent request (if we need to make more) as the X-If-Unmodified-Since header,
+        // and server is supposed to ensure that this pre-condition is met, and fail our request with a 412 error code otherwise.
+        // So, if all of this happens, these checks should never fail.
+        // However, we also track this header in client side, and can defensively validate against it here as well.
+        final String currentLastModifiedTimestamp = response.lastModified();
+        Logger.debug(LOG_TAG, "Last modified timestamp " + currentLastModifiedTimestamp);
+
+        // Sanity check. We also did a null check in delegate before passing it into here.
+        if (currentLastModifiedTimestamp == null) {
+            this.abort(fetchRecordsDelegate, "Last modified timestamp is missing");
+            return;
+        }
+
+        final boolean lastModifiedChanged;
+        synchronized (this) {
+            if (this.lastModified == null) {
+                // First time seeing last modified timestamp.
+                this.lastModified = currentLastModifiedTimestamp;
+            }
+            lastModifiedChanged = !this.lastModified.equals(currentLastModifiedTimestamp);
+        }
+
+        if (lastModifiedChanged) {
+            this.abort(fetchRecordsDelegate, "Last modified timestamp has changed unexpectedly");
+            return;
+        }
+
+        final boolean hasNotReachedLimit;
+        synchronized (this) {
+            this.numRecords += response.weaveRecords();
+            hasNotReachedLimit = this.numRecords < repository.getDefaultTotalLimit();
+        }
+
+        final String offset = response.weaveOffset();
+        final SyncStorageCollectionRequest newRequest;
+        try {
+            newRequest = makeSyncStorageCollectionRequest(newer,
+                    limit, full, sort, ids, offset);
+        } catch (final URISyntaxException | UnsupportedEncodingException e) {
+            this.workTracker.delayWorkItem(new Runnable() {
+                @Override
+                public void run() {
+                    Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
+                    fetchRecordsDelegate.onFetchFailed(e, null);
+                }
+            });
+            return;
+        }
+
+        if (offset != null && hasNotReachedLimit) {
+            try {
+                this.fetchWithParameters(newer, limit, full, sort, ids, newRequest, fetchRecordsDelegate);
+            } catch (final URISyntaxException | UnsupportedEncodingException e) {
+                this.workTracker.delayWorkItem(new Runnable() {
+                    @Override
+                    public void run() {
+                        Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
+                        fetchRecordsDelegate.onFetchFailed(e, null);
+                    }
+                });
+            }
+            return;
+        }
+
+        final long normalizedTimestamp = response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED);
+        Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp);
+
+        this.workTracker.delayWorkItem(new Runnable() {
+            @Override
+            public void run() {
+                Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
+                fetchRecordsDelegate.onFetchCompleted(normalizedTimestamp);
+            }
+        });
+    }
+
+    public void onFetchFailed(final Exception ex,
+                              final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
+                              final SyncStorageCollectionRequest request) {
+        removeRequestFromPending(request);
+        this.workTracker.delayWorkItem(new Runnable() {
+            @Override
+            public void run() {
+                Logger.debug(LOG_TAG, "Running onFetchFailed.");
+                fetchRecordsDelegate.onFetchFailed(ex, null);
+            }
+        });
+    }
+
+    public void onFetchedRecord(CryptoRecord record,
+                                RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
+        this.workTracker.incrementOutstanding();
+        try {
+            fetchRecordsDelegate.onFetchedRecord(record);
+        } catch (Exception ex) {
+            Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex);
+            throw new RuntimeException(ex);
+        } finally {
+            this.workTracker.decrementOutstanding();
+        }
+    }
+
+    private void removeRequestFromPending(SyncStorageCollectionRequest request) {
+        if (request == null) {
+            return;
+        }
+        this.pending.remove(request);
+    }
+
+    @VisibleForTesting
+    protected void abortRequests() {
+        this.repositorySession.abort();
+        synchronized (this.pending) {
+            for (SyncStorageCollectionRequest request : this.pending) {
+                request.abort();
+            }
+            this.pending.clear();
+        }
+    }
+
+    @Nullable
+    protected synchronized String getLastModified() {
+        return this.lastModified;
+    }
+
+    private void abort(final RepositorySessionFetchRecordsDelegate delegate, final String msg) {
+        Logger.error(LOG_TAG, msg);
+        this.abortRequests();
+        this.workTracker.delayWorkItem(new Runnable() {
+            @Override
+            public void run() {
+                Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
+                delegate.onFetchFailed(
+                        new IllegalStateException(msg),
+                        null);
+            }
+        });
+    }
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegate.java
@@ -0,0 +1,91 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.downloaders;
+
+import org.mozilla.gecko.background.common.log.Logger;
+import org.mozilla.gecko.sync.CryptoRecord;
+import org.mozilla.gecko.sync.HTTPFailureException;
+import org.mozilla.gecko.sync.crypto.KeyBundle;
+import org.mozilla.gecko.sync.net.AuthHeaderProvider;
+import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
+import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
+
+/**
+ * Delegate that gets passed into fetch methods to handle server response from fetch.
+ */
+public class BatchingDownloaderDelegate extends WBOCollectionRequestDelegate {
+    public static final String LOG_TAG = "BatchingDownloaderDelegate";
+
+    private BatchingDownloader downloader;
+    private RepositorySessionFetchRecordsDelegate fetchRecordsDelegate;
+    public SyncStorageCollectionRequest request;
+    // Used to pass back to BatchDownloader to start another fetch with these parameters if needed.
+    private long newer;
+    private long batchLimit;
+    private boolean full;
+    private String sort;
+    private String ids;
+
+    public BatchingDownloaderDelegate(final BatchingDownloader downloader,
+                                      final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
+                                      final SyncStorageCollectionRequest request, long newer,
+                                      long batchLimit, boolean full, String sort, String ids) {
+        this.downloader = downloader;
+        this.fetchRecordsDelegate = fetchRecordsDelegate;
+        this.request = request;
+        this.newer = newer;
+        this.batchLimit = batchLimit;
+        this.full = full;
+        this.sort = sort;
+        this.ids = ids;
+    }
+
+    @Override
+    public AuthHeaderProvider getAuthHeaderProvider() {
+        return this.downloader.getServerRepository().getAuthHeaderProvider();
+    }
+
+    @Override
+    public String ifUnmodifiedSince() {
+        return this.downloader.getLastModified();
+    }
+
+    @Override
+    public void handleRequestSuccess(SyncStorageResponse response) {
+        Logger.debug(LOG_TAG, "Fetch done.");
+        if (response.lastModified() != null) {
+            this.downloader.onFetchCompleted(response, this.fetchRecordsDelegate, this.request,
+                    this.newer, this.batchLimit, this.full, this.sort, this.ids);
+            return;
+        }
+        this.downloader.onFetchFailed(
+                new IllegalStateException("Missing last modified header from response"),
+                this.fetchRecordsDelegate,
+                this.request);
+    }
+
+    @Override
+    public void handleRequestFailure(SyncStorageResponse response) {
+        this.handleRequestError(new HTTPFailureException(response));
+    }
+
+    @Override
+    public void handleRequestError(final Exception ex) {
+        Logger.warn(LOG_TAG, "Got request error.", ex);
+        this.downloader.onFetchFailed(ex, this.fetchRecordsDelegate, this.request);
+    }
+
+    @Override
+    public void handleWBO(CryptoRecord record) {
+        this.downloader.onFetchedRecord(record, this.fetchRecordsDelegate);
+    }
+
+    @Override
+    public KeyBundle keyBundle() {
+        return null;
+    }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
@@ -16,17 +16,20 @@ import org.mozilla.gecko.sync.repositori
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class AndroidBrowserBookmarksServerSyncStage extends ServerSyncStage {
   protected static final String LOG_TAG = "BookmarksStage";
 
   // Eventually this kind of sync stage will be data-driven,
   // and all this hard-coding can go away.
   private static final String BOOKMARKS_SORT          = "index";
-  private static final long   BOOKMARKS_REQUEST_LIMIT = 5000;         // Sanity limit.
+  // Sanity limit. Batch and total limit are the same for now, and will be adjusted
+  // once buffer and high water mark are in place. See Bug 730142.
+  private static final long BOOKMARKS_BATCH_LIMIT = 5000;
+  private static final long BOOKMARKS_TOTAL_LIMIT = 5000;
 
   @Override
   protected String getCollection() {
     return "bookmarks";
   }
 
   @Override
   protected String getEngineName() {
@@ -46,17 +49,18 @@ public class AndroidBrowserBookmarksServ
     final JSONRecordFetcher countsFetcher = new JSONRecordFetcher(session.config.infoCollectionCountsURL(), authHeaderProvider);
     String collection = getCollection();
     return new SafeConstrainedServer11Repository(
         collection,
         session.config.storageURL(),
         session.getAuthHeaderProvider(),
         session.config.infoCollections,
         session.config.infoConfiguration,
-        BOOKMARKS_REQUEST_LIMIT,
+        BOOKMARKS_BATCH_LIMIT,
+        BOOKMARKS_TOTAL_LIMIT,
         BOOKMARKS_SORT,
         countsFetcher);
   }
 
   @Override
   protected Repository getLocalRepository() {
     return new AndroidBrowserBookmarksRepository();
   }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
@@ -15,17 +15,20 @@ import org.mozilla.gecko.sync.repositori
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class AndroidBrowserHistoryServerSyncStage extends ServerSyncStage {
   protected static final String LOG_TAG = "HistoryStage";
 
   // Eventually this kind of sync stage will be data-driven,
   // and all this hard-coding can go away.
   private static final String HISTORY_SORT          = "index";
-  private static final long   HISTORY_REQUEST_LIMIT = 250;
+  // Sanity limit. Batch and total limit are the same for now, and will be adjusted
+  // once buffer and high water mark are in place. See Bug 730142.
+  private static final long HISTORY_BATCH_LIMIT = 250;
+  private static final long HISTORY_TOTAL_LIMIT = 250;
 
   @Override
   protected String getCollection() {
     return "history";
   }
 
   @Override
   protected String getEngineName() {
@@ -46,17 +49,18 @@ public class AndroidBrowserHistoryServer
   protected Repository getRemoteRepository() throws URISyntaxException {
     String collection = getCollection();
     return new ConstrainedServer11Repository(
                                              collection,
                                              session.config.storageURL(),
                                              session.getAuthHeaderProvider(),
                                              session.config.infoCollections,
                                              session.config.infoConfiguration,
-                                             HISTORY_REQUEST_LIMIT,
+                                             HISTORY_BATCH_LIMIT,
+                                             HISTORY_TOTAL_LIMIT,
                                              HISTORY_SORT);
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
     return new HistoryRecordFactory();
   }
 
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
@@ -15,17 +15,20 @@ import org.mozilla.gecko.sync.repositori
 import org.mozilla.gecko.sync.repositories.domain.Record;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class FormHistoryServerSyncStage extends ServerSyncStage {
 
   // Eventually this kind of sync stage will be data-driven,
   // and all this hard-coding can go away.
   private static final String FORM_HISTORY_SORT          = "index";
-  private static final long   FORM_HISTORY_REQUEST_LIMIT = 5000;         // Sanity limit.
+  // Sanity limit. Batch and total limit are the same for now, and will be adjusted
+  // once buffer and high water mark are in place. See Bug 730142.
+  private static final long FORM_HISTORY_BATCH_LIMIT = 5000;
+  private static final long FORM_HISTORY_TOTAL_LIMIT = 5000;
 
   @Override
   protected String getCollection() {
     return "forms";
   }
 
   @Override
   protected String getEngineName() {
@@ -41,17 +44,18 @@ public class FormHistoryServerSyncStage 
   protected Repository getRemoteRepository() throws URISyntaxException {
     String collection = getCollection();
     return new ConstrainedServer11Repository(
         collection,
         session.config.storageURL(),
         session.getAuthHeaderProvider(),
         session.config.infoCollections,
         session.config.infoConfiguration,
-        FORM_HISTORY_REQUEST_LIMIT,
+        FORM_HISTORY_BATCH_LIMIT,
+        FORM_HISTORY_TOTAL_LIMIT,
         FORM_HISTORY_SORT);
   }
 
   @Override
   protected Repository getLocalRepository() {
     return new FormHistoryRepositorySession.FormHistoryRepository();
   }
 
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/SafeConstrainedServer11Repository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/SafeConstrainedServer11Repository.java
@@ -34,31 +34,33 @@ public class SafeConstrainedServer11Repo
   // This can be lazily evaluated if we need it.
   private final JSONRecordFetcher countFetcher;
 
   public SafeConstrainedServer11Repository(String collection,
                                            String storageURL,
                                            AuthHeaderProvider authHeaderProvider,
                                            InfoCollections infoCollections,
                                            InfoConfiguration infoConfiguration,
-                                           long limit,
+                                           long batchLimit,
+                                           long totalLimit,
                                            String sort,
                                            JSONRecordFetcher countFetcher)
     throws URISyntaxException {
-    super(collection, storageURL, authHeaderProvider, infoCollections, infoConfiguration, limit, sort);
+    super(collection, storageURL, authHeaderProvider, infoCollections, infoConfiguration,
+            batchLimit, totalLimit, sort);
     if (countFetcher == null) {
       throw new IllegalArgumentException("countFetcher must not be null");
     }
     this.countFetcher = countFetcher;
   }
 
   @Override
   public void createSession(RepositorySessionCreationDelegate delegate,
                             Context context) {
-    delegate.onSessionCreated(new CountCheckingServer11RepositorySession(this, this.getDefaultFetchLimit()));
+    delegate.onSessionCreated(new CountCheckingServer11RepositorySession(this, this.getDefaultBatchLimit()));
   }
 
   public class CountCheckingServer11RepositorySession extends Server11RepositorySession {
     private static final String LOG_TAG = "CountCheckingServer11RepositorySession";
 
     /**
      * The session will report no data available if this is a first sync
      * and the server has more data available than this limit.
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/net/test/TestServer11Repository.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/net/test/TestServer11Repository.java
@@ -26,21 +26,21 @@ public class TestServer11Repository {
   public static void assertQueryEquals(String expected, URI u) {
     Assert.assertEquals(expected, u.getRawQuery());
   }
 
   @SuppressWarnings("static-method")
   @Test
   public void testCollectionURIFull() throws URISyntaxException {
     Server11Repository r = new Server11Repository(COLLECTION, COLLECTION_URL, null, infoCollections, infoConfiguration);
-    assertQueryEquals("full=1&newer=5000.000",              r.collectionURI(true,  5000000L, -1,    null, null));
-    assertQueryEquals("newer=1230.000",                     r.collectionURI(false, 1230000L, -1,    null, null));
-    assertQueryEquals("newer=5000.000&limit=10",            r.collectionURI(false, 5000000L, 10,    null, null));
-    assertQueryEquals("full=1&newer=5000.000&sort=index",   r.collectionURI(true,  5000000L,  0, "index", null));
-    assertQueryEquals("full=1&ids=123,abc",                 r.collectionURI(true,       -1L, -1,    null, "123,abc"));
+    assertQueryEquals("full=1&newer=5000.000",              r.collectionURI(true,  5000000L, -1,    null, null, null));
+    assertQueryEquals("newer=1230.000",                     r.collectionURI(false, 1230000L, -1,    null, null, null));
+    assertQueryEquals("newer=5000.000&limit=10",            r.collectionURI(false, 5000000L, 10,    null, null, null));
+    assertQueryEquals("full=1&newer=5000.000&sort=index",   r.collectionURI(true,  5000000L,  0, "index", null, null));
+    assertQueryEquals("full=1&ids=123,abc",                 r.collectionURI(true,       -1L, -1,    null, "123,abc", null));
   }
 
   @Test
   public void testCollectionURI() throws URISyntaxException {
     Server11Repository noTrailingSlash = new Server11Repository(COLLECTION, COLLECTION_URL, null, infoCollections, infoConfiguration);
     Server11Repository trailingSlash = new Server11Repository(COLLECTION, COLLECTION_URL + "/", null, infoCollections, infoConfiguration);
     Assert.assertEquals("http://foo.com/1.1/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage/bookmarks", noTrailingSlash.collectionURI().toASCIIString());
     Assert.assertEquals("http://foo.com/1.1/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage/bookmarks", trailingSlash.collectionURI().toASCIIString());
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer11RepositorySession.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer11RepositorySession.java
@@ -186,17 +186,17 @@ public class TestServer11RepositorySessi
     };
     final JSONRecordFetcher countsFetcher = new JSONRecordFetcher(LOCAL_COUNTS_URL, getAuthHeaderProvider());
     String collection = "bookmarks";
     final SafeConstrainedServer11Repository remote = new SafeConstrainedServer11Repository(collection,
         getCollectionURL(collection),
         getAuthHeaderProvider(),
         infoCollections,
         infoConfiguration,
-        5000, "sortindex", countsFetcher);
+        5000, 5000, "sortindex", countsFetcher);
 
     data.startHTTPServer(server);
     final AtomicBoolean out = new AtomicBoolean(false);
 
     // Verify that shouldSkip returns true due to a fetch of too large counts,
     // rather than due to a timeout failure waiting to fetch counts.
     try {
       WaitHelper.getTestWaiter().performWait(
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/MockServer.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/MockServer.java
@@ -42,16 +42,17 @@ public class MockServer implements Conta
     response.setValue("Content-Type", contentType);
     response.setValue("Server", "HelloWorld/1.0 (Simple 4.0)");
     response.setDate("Date", time);
     response.setDate("Last-Modified", time);
 
     final String timestampHeader = Utils.millisecondsToDecimalSecondsString(time);
     response.setValue("X-Weave-Timestamp", timestampHeader);
     Logger.debug(LOG_TAG, "> X-Weave-Timestamp header: " + timestampHeader);
+    response.setValue("X-Last-Modified", "12345678");
     return bodyStream;
   }
 
   protected void handle(Request request, Response response, int code, String body) {
     try {
       Logger.debug(LOG_TAG, "Handling request...");
       PrintStream bodyStream = this.handleBasicHeaders(request, response, code, "application/json");
 
new file mode 100644
--- /dev/null
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
@@ -0,0 +1,186 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.downloaders;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.CryptoRecord;
+import org.mozilla.gecko.sync.HTTPFailureException;
+import org.mozilla.gecko.sync.InfoCollections;
+import org.mozilla.gecko.sync.InfoConfiguration;
+import org.mozilla.gecko.sync.net.SyncResponse;
+import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
+import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.repositories.Server11Repository;
+import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+import java.net.URI;
+import java.util.concurrent.ExecutorService;
+
+import ch.boye.httpclientandroidlib.ProtocolVersion;
+import ch.boye.httpclientandroidlib.client.ClientProtocolException;
+import ch.boye.httpclientandroidlib.message.BasicHttpResponse;
+import ch.boye.httpclientandroidlib.message.BasicStatusLine;
+
+import static org.junit.Assert.*;
+
+@RunWith(TestRunner.class)
+public class BatchingDownloaderDelegateTest {
+    private Server11Repository server11Repository;
+    private Server11RepositorySession repositorySession;
+    private MockDownloader mockDownloader;
+    private String DEFAULT_COLLECTION_URL = "http://dummy.url/";
+
+    class MockDownloader extends BatchingDownloader {
+        public boolean isSuccess = false;
+        public boolean isFetched = false;
+        public boolean isFailure = false;
+        public Exception ex;
+
+        public MockDownloader(Server11Repository repository, Server11RepositorySession repositorySession) {
+            super(repository, repositorySession);
+        }
+
+        @Override
+        public void onFetchCompleted(SyncStorageResponse response,
+                                     final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
+                                     final SyncStorageCollectionRequest request,
+                                     long l, long newerTimestamp, boolean full, String sort, String ids) {
+            this.isSuccess = true;
+        }
+
+        @Override
+        public void onFetchFailed(final Exception ex,
+                                  final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
+                                  final SyncStorageCollectionRequest request) {
+            this.isFailure = true;
+            this.ex = ex;
+        }
+
+        @Override
+        public void onFetchedRecord(CryptoRecord record,
+                                    RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
+            this.isFetched = true;
+        }
+    }
+
+    class SimpleSessionFetchRecordsDelegate implements RepositorySessionFetchRecordsDelegate {
+        @Override
+        public void onFetchFailed(Exception ex, Record record) {
+
+        }
+
+        @Override
+        public void onFetchedRecord(Record record) {
+
+        }
+
+        @Override
+        public void onFetchCompleted(long fetchEnd) {
+
+        }
+
+        @Override
+        public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
+            return null;
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        server11Repository = new Server11Repository(
+                "dummyCollection",
+                DEFAULT_COLLECTION_URL,
+                null,
+                new InfoCollections(),
+                new InfoConfiguration());
+        repositorySession = new Server11RepositorySession(server11Repository);
+        mockDownloader = new MockDownloader(server11Repository, repositorySession);
+    }
+
+    @Test
+    public void testIfUnmodifiedSince() throws Exception {
+        BatchingDownloader downloader = new BatchingDownloader(server11Repository, repositorySession);
+        RepositorySessionFetchRecordsDelegate delegate = new SimpleSessionFetchRecordsDelegate();
+        BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(downloader, delegate,
+                new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL)), 0, 0, true, null, null);
+        String lastModified = "12345678";
+        SyncStorageResponse response = makeSyncStorageResponse(200, lastModified);
+        downloaderDelegate.handleRequestSuccess(response);
+        assertEquals(lastModified, downloaderDelegate.ifUnmodifiedSince());
+    }
+
+    @Test
+    public void testSuccess() throws Exception {
+        BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(mockDownloader, null,
+                new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL)), 0, 0, true, null, null);
+        SyncStorageResponse response = makeSyncStorageResponse(200, "12345678");
+        downloaderDelegate.handleRequestSuccess(response);
+        assertTrue(mockDownloader.isSuccess);
+        assertFalse(mockDownloader.isFailure);
+        assertFalse(mockDownloader.isFetched);
+    }
+
+    @Test
+    public void testFailureMissingLMHeader() throws Exception {
+        BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(mockDownloader, null,
+                new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL)), 0, 0, true, null, null);
+        SyncStorageResponse response = makeSyncStorageResponse(200, null);
+        downloaderDelegate.handleRequestSuccess(response);
+        assertTrue(mockDownloader.isFailure);
+        assertEquals(IllegalStateException.class, mockDownloader.ex.getClass());
+        assertFalse(mockDownloader.isSuccess);
+        assertFalse(mockDownloader.isFetched);
+    }
+
+    @Test
+    public void testFailureHTTPException() throws Exception {
+        BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(mockDownloader, null,
+                new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL)), 0, 0, true, null, null);
+        SyncStorageResponse response = makeSyncStorageResponse(400, null);
+        downloaderDelegate.handleRequestFailure(response);
+        assertTrue(mockDownloader.isFailure);
+        assertEquals(HTTPFailureException.class, mockDownloader.ex.getClass());
+        assertFalse(mockDownloader.isSuccess);
+        assertFalse(mockDownloader.isFetched);
+    }
+
+    @Test
+    public void testFailureRequestError() throws Exception {
+        BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(mockDownloader, null,
+                new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL)), 0, 0, true, null, null);
+        downloaderDelegate.handleRequestError(new ClientProtocolException());
+        assertTrue(mockDownloader.isFailure);
+        assertEquals(ClientProtocolException.class, mockDownloader.ex.getClass());
+        assertFalse(mockDownloader.isSuccess);
+        assertFalse(mockDownloader.isFetched);
+    }
+
+    @Test
+    public void testFetchRecord() throws Exception {
+        BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(mockDownloader, null,
+                new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL)), 0, 0, true, null, null);
+        CryptoRecord record = new CryptoRecord();
+        downloaderDelegate.handleWBO(record);
+        assertTrue(mockDownloader.isFetched);
+        assertFalse(mockDownloader.isSuccess);
+        assertFalse(mockDownloader.isFailure);
+    }
+
+    private SyncStorageResponse makeSyncStorageResponse(int code, String lastModified) {
+        BasicHttpResponse response = new BasicHttpResponse(
+                new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), code, null));
+
+        if (lastModified != null) {
+            response.addHeader(SyncResponse.X_LAST_MODIFIED, lastModified);
+        }
+
+        return new SyncStorageResponse(response);
+    }
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
@@ -0,0 +1,543 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.downloaders;
+
+import android.support.annotation.NonNull;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.CryptoRecord;
+import org.mozilla.gecko.sync.InfoCollections;
+import org.mozilla.gecko.sync.InfoConfiguration;
+import org.mozilla.gecko.sync.net.AuthHeaderProvider;
+import org.mozilla.gecko.sync.net.SyncResponse;
+import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
+import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.repositories.Repository;
+import org.mozilla.gecko.sync.repositories.Server11Repository;
+import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.ExecutorService;
+
+import ch.boye.httpclientandroidlib.ProtocolVersion;
+import ch.boye.httpclientandroidlib.message.BasicHttpResponse;
+import ch.boye.httpclientandroidlib.message.BasicStatusLine;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(TestRunner.class)
+public class BatchingDownloaderTest {
+    private MockSever11Repository serverRepository;
+    private Server11RepositorySession repositorySession;
+    private MockSessionFetchRecordsDelegate sessionFetchRecordsDelegate;
+    private MockDownloader mockDownloader;
+    private String DEFAULT_COLLECTION_NAME = "dummyCollection";
+    private String DEFAULT_COLLECTION_URL = "http://dummy.url/";
+    private long DEFAULT_NEWER = 1;
+    private String DEFAULT_SORT = "index";
+    private String DEFAULT_IDS = "1";
+    private String DEFAULT_LMHEADER = "12345678";
+
+    class MockSessionFetchRecordsDelegate implements RepositorySessionFetchRecordsDelegate {
+        public boolean isFailure;
+        public boolean isFetched;
+        public boolean isSuccess;
+        public Exception ex;
+        public Record record;
+
+        @Override
+        public void onFetchFailed(Exception ex, Record record) {
+            this.isFailure = true;
+            this.ex = ex;
+            this.record = record;
+        }
+
+        @Override
+        public void onFetchedRecord(Record record) {
+            this.isFetched = true;
+            this.record = record;
+        }
+
+        @Override
+        public void onFetchCompleted(long fetchEnd) {
+            this.isSuccess = true;
+        }
+
+        @Override
+        public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
+            return null;
+        }
+    }
+
+    class MockRequest extends SyncStorageCollectionRequest {
+
+        public MockRequest(URI uri) {
+            super(uri);
+        }
+
+        @Override
+        public void get() {
+
+        }
+    }
+
+    class MockDownloader extends BatchingDownloader {
+        public long newer;
+        public long limit;
+        public boolean full;
+        public String sort;
+        public String ids;
+        public String offset;
+        public boolean abort;
+
+        public MockDownloader(Server11Repository repository, Server11RepositorySession repositorySession) {
+            super(repository, repositorySession);
+        }
+
+        @Override
+        public void fetchWithParameters(long newer,
+                                 long batchLimit,
+                                 boolean full,
+                                 String sort,
+                                 String ids,
+                                 SyncStorageCollectionRequest request,
+                                 RepositorySessionFetchRecordsDelegate fetchRecordsDelegate)
+                throws UnsupportedEncodingException, URISyntaxException {
+            this.newer = newer;
+            this.limit = batchLimit;
+            this.full = full;
+            this.sort = sort;
+            this.ids = ids;
+            MockRequest mockRequest = new MockRequest(new URI(DEFAULT_COLLECTION_URL));
+            super.fetchWithParameters(newer, batchLimit, full, sort, ids, mockRequest, fetchRecordsDelegate);
+        }
+
+        @Override
+        public void abortRequests() {
+            this.abort = true;
+        }
+
+        @Override
+        public SyncStorageCollectionRequest makeSyncStorageCollectionRequest(long newer,
+                      long batchLimit,
+                      boolean full,
+                      String sort,
+                      String ids,
+                      String offset)
+                throws URISyntaxException, UnsupportedEncodingException {
+            this.offset = offset;
+            return super.makeSyncStorageCollectionRequest(newer, batchLimit, full, sort, ids, offset);
+        }
+    }
+
+    class MockSever11Repository extends Server11Repository {
+        public MockSever11Repository(@NonNull String collection, @NonNull String storageURL,
+                                     AuthHeaderProvider authHeaderProvider, @NonNull InfoCollections infoCollections,
+                                     @NonNull InfoConfiguration infoConfiguration) throws URISyntaxException {
+            super(collection, storageURL, authHeaderProvider, infoCollections, infoConfiguration);
+        }
+
+        @Override
+        public long getDefaultTotalLimit() {
+            return 200;
+        }
+    }
+
+    class MockRepositorySession extends Server11RepositorySession {
+        public boolean abort;
+
+        public MockRepositorySession(Repository repository) {
+            super(repository);
+        }
+
+        @Override
+        public void abort() {
+            this.abort = true;
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        sessionFetchRecordsDelegate = new MockSessionFetchRecordsDelegate();
+
+        serverRepository = new MockSever11Repository(DEFAULT_COLLECTION_NAME, DEFAULT_COLLECTION_URL, null,
+                new InfoCollections(), new InfoConfiguration());
+        repositorySession = new Server11RepositorySession(serverRepository);
+        mockDownloader = new MockDownloader(serverRepository, repositorySession);
+    }
+
+    @Test
+    public void testFlattenId() {
+        String[] emptyGuid = new String[]{};
+        String flatten =  BatchingDownloader.flattenIDs(emptyGuid);
+        assertEquals("", flatten);
+
+        String guid0 = "123456789abc";
+        String[] singleGuid = new String[1];
+        singleGuid[0] = guid0;
+        flatten = BatchingDownloader.flattenIDs(singleGuid);
+        assertEquals("123456789abc", flatten);
+
+        String guid1 = "456789abc";
+        String guid2 = "789abc";
+        String[] multiGuid = new String[3];
+        multiGuid[0] = guid0;
+        multiGuid[1] = guid1;
+        multiGuid[2] = guid2;
+        flatten = BatchingDownloader.flattenIDs(multiGuid);
+        assertEquals("123456789abc,456789abc,789abc", flatten);
+    }
+
+    @Test
+    public void testEncodeParam() throws Exception {
+        String param = "123&123";
+        String encodedParam = mockDownloader.encodeParam(param);
+        assertEquals("123%26123", encodedParam);
+    }
+
+    @Test(expected=IllegalArgumentException.class)
+    public void testOverTotalLimit() throws Exception {
+        // Per-batch limits exceed total.
+        Server11Repository repository = new Server11Repository(DEFAULT_COLLECTION_NAME, DEFAULT_COLLECTION_URL,
+                null, new InfoCollections(), new InfoConfiguration()) {
+            @Override
+            public long getDefaultTotalLimit() {
+                return 100;
+            }
+            @Override
+            public long getDefaultBatchLimit() {
+                return 200;
+            }
+        };
+        MockDownloader mockDownloader = new MockDownloader(repository, repositorySession);
+
+        assertNull(mockDownloader.getLastModified());
+        mockDownloader.fetchSince(DEFAULT_NEWER, sessionFetchRecordsDelegate);
+    }
+
+    @Test
+    public void testTotalLimit() throws Exception {
+        // Total and per-batch limits are the same.
+        Server11Repository repository = new Server11Repository(DEFAULT_COLLECTION_NAME, DEFAULT_COLLECTION_URL,
+                null, new InfoCollections(), new InfoConfiguration()) {
+            @Override
+            public long getDefaultTotalLimit() {
+                return 100;
+            }
+            @Override
+            public long getDefaultBatchLimit() {
+                return 100;
+            }
+        };
+        MockDownloader mockDownloader = new MockDownloader(repository, repositorySession);
+
+        assertNull(mockDownloader.getLastModified());
+        mockDownloader.fetchSince(DEFAULT_NEWER, sessionFetchRecordsDelegate);
+
+        SyncStorageResponse response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, "100", "100");
+        SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
+        long limit = repository.getDefaultBatchLimit();
+        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request,
+                DEFAULT_NEWER, limit, true, DEFAULT_SORT, DEFAULT_IDS);
+
+        assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
+        assertTrue(sessionFetchRecordsDelegate.isSuccess);
+        assertFalse(sessionFetchRecordsDelegate.isFetched);
+        assertFalse(sessionFetchRecordsDelegate.isFailure);
+    }
+
+    @Test
+    public void testOverHalfOfTotalLimit() throws Exception {
+        // Per-batch limit is just a bit lower than total.
+        Server11Repository repository = new Server11Repository(DEFAULT_COLLECTION_NAME, DEFAULT_COLLECTION_URL,
+                null, new InfoCollections(), new InfoConfiguration()) {
+            @Override
+            public long getDefaultTotalLimit() {
+                return 100;
+            }
+            @Override
+            public long getDefaultBatchLimit() {
+                return 75;
+            }
+        };
+        MockDownloader mockDownloader = new MockDownloader(repository, repositorySession);
+
+        assertNull(mockDownloader.getLastModified());
+        mockDownloader.fetchSince(DEFAULT_NEWER, sessionFetchRecordsDelegate);
+
+        String offsetHeader = "75";
+        String recordsHeader = "75";
+        SyncStorageResponse response = makeSyncStorageResponse(200,  DEFAULT_LMHEADER, offsetHeader, recordsHeader);
+        SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
+        long limit = repository.getDefaultBatchLimit();
+
+        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
+                limit, true, DEFAULT_SORT, DEFAULT_IDS);
+
+        assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
+        // Verify the same parameters are used in the next fetch.
+        assertSameParameters(mockDownloader, limit);
+        assertEquals(offsetHeader, mockDownloader.offset);
+        assertFalse(sessionFetchRecordsDelegate.isSuccess);
+        assertFalse(sessionFetchRecordsDelegate.isFetched);
+        assertFalse(sessionFetchRecordsDelegate.isFailure);
+
+        // The next batch, we still have an offset token but we complete our fetch since we have reached the total limit.
+        offsetHeader = "150";
+        response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
+        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
+                limit, true, DEFAULT_SORT, DEFAULT_IDS);
+
+        assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
+        assertTrue(sessionFetchRecordsDelegate.isSuccess);
+        assertFalse(sessionFetchRecordsDelegate.isFetched);
+        assertFalse(sessionFetchRecordsDelegate.isFailure);
+    }
+
+    @Test
+    public void testHalfOfTotalLimit() throws Exception {
+        // Per-batch limit is half of total.
+        Server11Repository repository = new Server11Repository(DEFAULT_COLLECTION_NAME, DEFAULT_COLLECTION_URL,
+                null, new InfoCollections(), new InfoConfiguration()) {
+            @Override
+            public long getDefaultTotalLimit() {
+                return 100;
+            }
+            @Override
+            public long getDefaultBatchLimit() {
+                return 50;
+            }
+        };
+        mockDownloader = new MockDownloader(repository, repositorySession);
+
+        assertNull(mockDownloader.getLastModified());
+        mockDownloader.fetchSince(DEFAULT_NEWER, sessionFetchRecordsDelegate);
+
+        String offsetHeader = "50";
+        String recordsHeader = "50";
+        SyncStorageResponse response = makeSyncStorageResponse(200,  DEFAULT_LMHEADER, offsetHeader, recordsHeader);
+        SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
+        long limit = repository.getDefaultBatchLimit();
+        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
+                limit, true, DEFAULT_SORT, DEFAULT_IDS);
+
+        assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
+        // Verify the same parameters are used in the next fetch.
+        assertSameParameters(mockDownloader, limit);
+        assertEquals(offsetHeader, mockDownloader.offset);
+        assertFalse(sessionFetchRecordsDelegate.isSuccess);
+        assertFalse(sessionFetchRecordsDelegate.isFetched);
+        assertFalse(sessionFetchRecordsDelegate.isFailure);
+
+        // The next batch, we still have an offset token but we complete our fetch since we have reached the total limit.
+        offsetHeader = "100";
+        response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
+        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
+                limit, true, DEFAULT_SORT, DEFAULT_IDS);
+
+        assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
+        assertTrue(sessionFetchRecordsDelegate.isSuccess);
+        assertFalse(sessionFetchRecordsDelegate.isFetched);
+        assertFalse(sessionFetchRecordsDelegate.isFailure);
+    }
+
+    @Test
+    public void testFractionOfTotalLimit() throws Exception {
+        // Per-batch limit is a small fraction of the total.
+        Server11Repository repository = new Server11Repository(DEFAULT_COLLECTION_NAME, DEFAULT_COLLECTION_URL,
+                null, new InfoCollections(), new InfoConfiguration()) {
+            @Override
+            public long getDefaultTotalLimit() {
+                return 100;
+            }
+            @Override
+            public long getDefaultBatchLimit() {
+                return 25;
+            }
+        };
+        mockDownloader = new MockDownloader(repository, repositorySession);
+
+        assertNull(mockDownloader.getLastModified());
+        mockDownloader.fetchSince(DEFAULT_NEWER, sessionFetchRecordsDelegate);
+
+        String offsetHeader = "25";
+        String recordsHeader = "25";
+        SyncStorageResponse response = makeSyncStorageResponse(200,  DEFAULT_LMHEADER, offsetHeader, recordsHeader);
+        SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
+        long limit = repository.getDefaultBatchLimit();
+        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
+                limit, true, DEFAULT_SORT, DEFAULT_IDS);
+
+        assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
+        // Verify the same parameters are used in the next fetch.
+        assertSameParameters(mockDownloader, limit);
+        assertEquals(offsetHeader, mockDownloader.offset);
+        assertFalse(sessionFetchRecordsDelegate.isSuccess);
+        assertFalse(sessionFetchRecordsDelegate.isFetched);
+        assertFalse(sessionFetchRecordsDelegate.isFailure);
+
+        // The next batch, we still have an offset token and has not exceed the total limit.
+        offsetHeader = "50";
+        response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
+        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
+                limit, true, DEFAULT_SORT, DEFAULT_IDS);
+
+        assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
+        // Verify the same parameters are used in the next fetch.
+        assertSameParameters(mockDownloader, limit);
+        assertEquals(offsetHeader, mockDownloader.offset);
+        assertFalse(sessionFetchRecordsDelegate.isSuccess);
+        assertFalse(sessionFetchRecordsDelegate.isFetched);
+        assertFalse(sessionFetchRecordsDelegate.isFailure);
+
+        // The next batch, we still have an offset token and has not exceed the total limit.
+        offsetHeader = "75";
+        response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
+        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
+                limit, true, DEFAULT_SORT, DEFAULT_IDS);
+
+        assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
+        // Verify the same parameters are used in the next fetch.
+        assertSameParameters(mockDownloader, limit);
+        assertEquals(offsetHeader, mockDownloader.offset);
+        assertFalse(sessionFetchRecordsDelegate.isSuccess);
+        assertFalse(sessionFetchRecordsDelegate.isFetched);
+        assertFalse(sessionFetchRecordsDelegate.isFailure);
+
+        // The next batch, we still have an offset token but we complete our fetch since we have reached the total limit.
+        offsetHeader = "100";
+        response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
+        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
+                limit, true, DEFAULT_SORT, DEFAULT_IDS);
+
+        assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
+        assertTrue(sessionFetchRecordsDelegate.isSuccess);
+        assertFalse(sessionFetchRecordsDelegate.isFetched);
+        assertFalse(sessionFetchRecordsDelegate.isFailure);
+    }
+
+    @Test
+    public void testFailureLMChangedMultiBatch() throws Exception {
+        assertNull(mockDownloader.getLastModified());
+
+        String lmHeader = "12345678";
+        String offsetHeader = "100";
+        SyncStorageResponse response = makeSyncStorageResponse(200, lmHeader, offsetHeader, null);
+        SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI("http://dummy.url"));
+        long limit = 1;
+        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
+                limit, true, DEFAULT_SORT, DEFAULT_IDS);
+
+        assertEquals(lmHeader, mockDownloader.getLastModified());
+        // Verify the same parameters are used in the next fetch.
+        assertEquals(DEFAULT_NEWER, mockDownloader.newer);
+        assertEquals(limit, mockDownloader.limit);
+        assertTrue(mockDownloader.full);
+        assertEquals(DEFAULT_SORT, mockDownloader.sort);
+        assertEquals(DEFAULT_IDS, mockDownloader.ids);
+        assertEquals(offsetHeader, mockDownloader.offset);
+        assertFalse(sessionFetchRecordsDelegate.isSuccess);
+        assertFalse(sessionFetchRecordsDelegate.isFetched);
+        assertFalse(sessionFetchRecordsDelegate.isFailure);
+
+        // Last modified header somehow changed.
+        lmHeader = "10000000";
+        response = makeSyncStorageResponse(200, lmHeader, offsetHeader, null);
+        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
+                limit, true, DEFAULT_SORT, DEFAULT_IDS);
+
+        assertNotEquals(lmHeader, mockDownloader.getLastModified());
+        assertTrue(mockDownloader.abort);
+        assertFalse(sessionFetchRecordsDelegate.isSuccess);
+        assertFalse(sessionFetchRecordsDelegate.isFetched);
+        assertTrue(sessionFetchRecordsDelegate.isFailure);
+    }
+
+    @Test
+    public void testFailureMissingLMMultiBatch() throws Exception {
+        assertNull(mockDownloader.getLastModified());
+
+        String offsetHeader = "100";
+        SyncStorageResponse response = makeSyncStorageResponse(200, null, offsetHeader, null);
+        SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI("http://dummy.url"));
+        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
+                1, true, DEFAULT_SORT, DEFAULT_IDS);
+
+        // Last modified header somehow missing from response.
+        assertNull(null, mockDownloader.getLastModified());
+        assertTrue(mockDownloader.abort);
+        assertFalse(sessionFetchRecordsDelegate.isSuccess);
+        assertFalse(sessionFetchRecordsDelegate.isFetched);
+        assertTrue(sessionFetchRecordsDelegate.isFailure);
+    }
+
+    @Test
+    public void testFailureException() throws Exception {
+        Exception ex = new IllegalStateException();
+        SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI("http://dummy.url"));
+        mockDownloader.onFetchFailed(ex, sessionFetchRecordsDelegate, request);
+
+        assertFalse(sessionFetchRecordsDelegate.isSuccess);
+        assertFalse(sessionFetchRecordsDelegate.isFetched);
+        assertTrue(sessionFetchRecordsDelegate.isFailure);
+        assertEquals(ex.getClass(), sessionFetchRecordsDelegate.ex.getClass());
+        assertNull(sessionFetchRecordsDelegate.record);
+    }
+
+    @Test
+    public void testFetchRecord() {
+        CryptoRecord record = new CryptoRecord();
+        mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
+
+        assertTrue(sessionFetchRecordsDelegate.isFetched);
+        assertFalse(sessionFetchRecordsDelegate.isSuccess);
+        assertFalse(sessionFetchRecordsDelegate.isFailure);
+        assertEquals(record, sessionFetchRecordsDelegate.record);
+    }
+
+    @Test
+    public void testAbortRequests() {
+        MockRepositorySession mockRepositorySession = new MockRepositorySession(serverRepository);
+        BatchingDownloader downloader = new BatchingDownloader(serverRepository, mockRepositorySession);
+        assertFalse(mockRepositorySession.abort);
+        downloader.abortRequests();
+        assertTrue(mockRepositorySession.abort);
+    }
+
+    private void assertSameParameters(MockDownloader mockDownloader, long limit) {
+        assertEquals(DEFAULT_NEWER, mockDownloader.newer);
+        assertEquals(limit, mockDownloader.limit);
+        assertTrue(mockDownloader.full);
+        assertEquals(DEFAULT_SORT, mockDownloader.sort);
+        assertEquals(DEFAULT_IDS, mockDownloader.ids);
+    }
+
+    private SyncStorageResponse makeSyncStorageResponse(int code, String lastModified, String offset, String records) {
+        BasicHttpResponse response = new BasicHttpResponse(
+                new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), code, null));
+
+        if (lastModified != null) {
+            response.addHeader(SyncResponse.X_LAST_MODIFIED, lastModified);
+        }
+
+        if (offset != null) {
+            response.addHeader(SyncResponse.X_WEAVE_NEXT_OFFSET, offset);
+        }
+
+        if (records != null) {
+            response.addHeader(SyncResponse.X_WEAVE_RECORDS, records);
+        }
+
+        return new SyncStorageResponse(response);
+    }
+}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/test/TestSafeConstrainedServer11Repository.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/test/TestSafeConstrainedServer11Repository.java
@@ -83,17 +83,17 @@ public class TestSafeConstrainedServer11
       String countsURL = TEST_SERVER + TEST_BASE_PATH + "info/collection_counts";
       JSONRecordFetcher countFetcher = new JSONRecordFetcher(countsURL, getAuthHeaderProvider());
       String sort = "sortindex";
       String collection = "rotary";
 
       final int TEST_LIMIT = 1000;
       final SafeConstrainedServer11Repository repo = new SafeConstrainedServer11Repository(
           collection, getCollectionURL(collection), null, infoCollections, infoConfiguration,
-          TEST_LIMIT, sort, countFetcher);
+          TEST_LIMIT, TEST_LIMIT, sort, countFetcher);
 
       final AtomicBoolean shouldSkipLots = new AtomicBoolean(false);
       final AtomicBoolean shouldSkipFew = new AtomicBoolean(true);
       final AtomicBoolean shouldSkip503 = new AtomicBoolean (false);
 
       WaitHelper.getTestWaiter().performWait(2000, new Runnable() {
         @Override
         public void run() {