Bug 1291821 - Allow BatchingDownloader to resume downloads using offset r=rnewman
authorGrisha Kruglov <gkruglov@mozilla.com>
Thu, 19 Jan 2017 13:11:18 -0800
changeset 344875 5e0b1684de40494b4464d963253149080b120fa8
parent 344874 3c89bba23c2d344c514b2d59ca52c377d55c541f
child 344876 bd232d46a3967a174a399d4cf444b99391bd8014
push id37970
push usergkruglov@mozilla.com
push dateSat, 25 Feb 2017 01:09:28 +0000
treeherderautoland@bd232d46a396 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersrnewman
bugs1291821
milestone54.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1291821 - Allow BatchingDownloader to resume downloads using offset r=rnewman BatchingDownloader uses provided RepositoryStateProvider instance in order to track offset and high water mark as it performs batching. The state holder objects are initialized by individual ServerSyncStages, and prefixes are used to ensure keys won't clash. Two RepositoryStateProvider implementations are used: persistent and non-persistent. Non-persistent state provider does not allow for resuming after a sync restart, while persistent one does. Persistent state provider is used by the history stage. It is fetched oldest-first, and records are applied to live storage as they're downloaded. These conditions let use resume downloads. It's also possible to resume downloads for stages which use a persistent buffer, but currently we do not have any. Offset value and its context is reset if we hit a 412 error; it is maintained if we hit a sync deadline, allowing us to minimize number of records we'll redownload. BatchingDownloaderController owns resuming and context checking logic. High water mark (h.w.m.) is maintained across syncs and used instead of stage's "last-synced" timestamp if said stage is set to fetch oldest-first and explicitely allows use of a h.w.m. Server15RepositorySession provides correct timestamp to RecordsChannel, decoupling BatchingDownloader from this logic. MozReview-Commit-ID: IH28YrDU4vW
mobile/android/base/android-services.mozbuild
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConfigurableServer15Repository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/NonPersistentRepositoryStateProvider.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/PersistentRepositoryStateProvider.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositoryStateProvider.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15Repository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15RepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserHistoryRepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderController.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserRecentHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/net/test/TestServer15Repository.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderControllerTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
--- a/mobile/android/base/android-services.mozbuild
+++ b/mobile/android/base/android-services.mozbuild
@@ -992,37 +992,41 @@ sync_java_files = [TOPSRCDIR + '/mobile/
     'sync/repositories/domain/PasswordRecord.java',
     'sync/repositories/domain/PasswordRecordFactory.java',
     'sync/repositories/domain/Record.java',
     'sync/repositories/domain/RecordParseException.java',
     'sync/repositories/domain/TabsRecord.java',
     'sync/repositories/domain/TabsRecordFactory.java',
     'sync/repositories/domain/VersionConstants.java',
     'sync/repositories/downloaders/BatchingDownloader.java',
+    'sync/repositories/downloaders/BatchingDownloaderController.java',
     'sync/repositories/downloaders/BatchingDownloaderDelegate.java',
     'sync/repositories/FetchFailedException.java',
     'sync/repositories/HashSetStoreTracker.java',
     'sync/repositories/HistoryRepository.java',
     'sync/repositories/IdentityRecordFactory.java',
     'sync/repositories/InactiveSessionException.java',
     'sync/repositories/InvalidBookmarkTypeException.java',
     'sync/repositories/InvalidRequestException.java',
     'sync/repositories/InvalidSessionTransitionException.java',
     'sync/repositories/MultipleRecordsForGuidException.java',
     'sync/repositories/NoContentProviderException.java',
     'sync/repositories/NoGuidForIdException.java',
+    'sync/repositories/NonPersistentRepositoryStateProvider.java',
     'sync/repositories/NoStoreDelegateException.java',
     'sync/repositories/NullCursorException.java',
     'sync/repositories/ParentNotFoundException.java',
+    'sync/repositories/PersistentRepositoryStateProvider.java',
     'sync/repositories/ProfileDatabaseException.java',
     'sync/repositories/RecordFactory.java',
     'sync/repositories/RecordFilter.java',
     'sync/repositories/Repository.java',
     'sync/repositories/RepositorySession.java',
     'sync/repositories/RepositorySessionBundle.java',
+    'sync/repositories/RepositoryStateProvider.java',
     'sync/repositories/Server15Repository.java',
     'sync/repositories/Server15RepositorySession.java',
     'sync/repositories/StoreFailedException.java',
     'sync/repositories/StoreTracker.java',
     'sync/repositories/StoreTrackingRepositorySession.java',
     'sync/repositories/uploaders/BatchingUploader.java',
     'sync/repositories/uploaders/BatchMeta.java',
     'sync/repositories/uploaders/BufferSizeTracker.java',
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
@@ -153,21 +153,16 @@ import java.util.concurrent.Executors;
     }
 
     @Override
     public void setStoreDelegate(RepositorySessionStoreDelegate delegate) {
         inner.setStoreDelegate(delegate);
         this.storeDelegate = delegate;
     }
 
-    @Override
-    public long getHighWaterMarkTimestamp() {
-        return bufferStorage.latestModifiedTimestamp();
-    }
-
     private boolean mayProceedToMergeBuffer() {
         // If our buffer storage is not persistent, disallowing merging after buffer has been filled
         // means throwing away records only to re-download them later.
         // In this case allow merge to proceed even if we're past the deadline.
         if (!bufferStorage.isPersistent()) {
             return true;
         }
 
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
@@ -22,14 +22,10 @@ public interface BufferStorage {
     // NB: For a database-backed storage, "replace" happens at a transaction level.
     void addOrReplace(Record record);
 
     // For database-backed implementations, commits any records that came in up to this point.
     void flush();
 
     void clear();
 
-    // For buffers that are filled up oldest-first this is a high water mark, which enables resuming
-    // a sync.
-    long latestModifiedTimestamp();
-
     boolean isPersistent();
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
@@ -43,28 +43,9 @@ public class MemoryBufferStorage impleme
     public void flush() {
         // This is a no-op; flush intended for database-backed stores.
     }
 
     @Override
     public void clear() {
         recordBuffer.clear();
     }
-
-    @Override
-    public long latestModifiedTimestamp() {
-        long lastModified = 0;
-
-        synchronized (recordBuffer) {
-            if (recordBuffer.size() == 0) {
-                return lastModified;
-            }
-
-            for (Record record : recordBuffer.values()) {
-                if (record.lastModified > lastModified) {
-                    lastModified = record.lastModified;
-                }
-            }
-        }
-
-        return lastModified;
-    }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConfigurableServer15Repository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConfigurableServer15Repository.java
@@ -4,59 +4,81 @@
 
 package org.mozilla.gecko.sync.repositories;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.InfoCollections;
 import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
+import org.mozilla.gecko.sync.stage.ServerSyncStage;
 
 /**
- * A kind of Server15Repository that supports explicit setting of per-batch fetch limit,
- * batching mode (single batch vs multi-batch), and a sort order.
+ * A kind of Server15Repository that supports explicit setting of:
+ * - per-batch fetch limit
+ * - batching mode (single batch vs multi-batch)
+ * - sort order
+ * - repository state provider (persistent vs non-persistent)
+ * - whereas use of high-water-mark is allowed
  *
  * @author rnewman
  *
  */
 public class ConfigurableServer15Repository extends Server15Repository {
   private final String sortOrder;
   private final long batchLimit;
-  private final boolean allowMultipleBatches;
+  private final ServerSyncStage.MultipleBatches multipleBatches;
+  private final ServerSyncStage.HighWaterMark highWaterMark;
 
   public ConfigurableServer15Repository(
           String collection,
           long syncDeadline,
           String storageURL,
           AuthHeaderProvider authHeaderProvider,
           InfoCollections infoCollections,
           InfoConfiguration infoConfiguration,
           long batchLimit,
           String sort,
-          boolean allowMultipleBatches) throws URISyntaxException {
+          ServerSyncStage.MultipleBatches multipleBatches,
+          ServerSyncStage.HighWaterMark highWaterMark,
+          RepositoryStateProvider stateProvider) throws URISyntaxException {
     super(
             collection,
             syncDeadline,
             storageURL,
             authHeaderProvider,
             infoCollections,
-            infoConfiguration
+            infoConfiguration,
+            stateProvider
     );
     this.batchLimit = batchLimit;
     this.sortOrder  = sort;
-    this.allowMultipleBatches = allowMultipleBatches;
+    this.multipleBatches = multipleBatches;
+    this.highWaterMark = highWaterMark;
+
+    // Sanity check: let's ensure we're configured correctly. At this point in time, it doesn't make
+    // sense to use H.W.M. with a non-persistent state provider. This might change if we start retrying
+    // during a download in case of 412s.
+    if (!stateProvider.isPersistent() && highWaterMark.equals(ServerSyncStage.HighWaterMark.Enabled)) {
+      throw new IllegalArgumentException("Can not use H.W.M. with NonPersistentRepositoryStateProvider");
+    }
   }
 
   @Override
   public String getSortOrder() {
     return sortOrder;
   }
 
   @Override
   public Long getBatchLimit() {
     return batchLimit;
   }
 
   @Override
   public boolean getAllowMultipleBatches() {
-    return allowMultipleBatches;
+    return multipleBatches.equals(ServerSyncStage.MultipleBatches.Enabled);
+  }
+
+  @Override
+  public boolean getAllowHighWaterMark() {
+    return highWaterMark.equals(ServerSyncStage.HighWaterMark.Enabled);
   }
 }
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/NonPersistentRepositoryStateProvider.java
@@ -0,0 +1,76 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories;
+
+import android.support.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple non-persistent implementation of a repository state provider.
+ *
+ * Just like in the persistent implementation, changes to values are visible only after a commit.
+ *
+ * @author grisha
+ */
+public class NonPersistentRepositoryStateProvider implements RepositoryStateProvider {
+    // We'll have at least OFFSET and H.W.M. values set.
+    private final int INITIAL_CAPACITY = 2;
+    private final Map<String, Object> nonCommittedValuesMap = Collections.synchronizedMap(
+            new HashMap<String, Object>(INITIAL_CAPACITY)
+    );
+
+    // NB: Any changes are made by creating a new map instead of altering an existing one.
+    private volatile Map<String, Object> committedValuesMap = new HashMap<>(INITIAL_CAPACITY);
+
+    @Override
+    public boolean isPersistent() {
+        return false;
+    }
+
+    @Override
+    public boolean commit() {
+        committedValuesMap = new HashMap<>(nonCommittedValuesMap);
+        return true;
+    }
+
+    @Override
+    public NonPersistentRepositoryStateProvider clear(String key) {
+        nonCommittedValuesMap.remove(key);
+        return this;
+    }
+
+    @Override
+    public NonPersistentRepositoryStateProvider setString(String key, String value) {
+        nonCommittedValuesMap.put(key, value);
+        return this;
+    }
+
+    @Nullable
+    @Override
+    public String getString(String key) {
+        return (String) committedValuesMap.get(key);
+    }
+
+    @Override
+    public NonPersistentRepositoryStateProvider setLong(String key, Long value) {
+        nonCommittedValuesMap.put(key, value);
+        return this;
+    }
+
+    @Nullable
+    @Override
+    public Long getLong(String key) {
+        return (Long) committedValuesMap.get(key);
+    }
+
+    @Override
+    public boolean resetAndCommit() {
+        nonCommittedValuesMap.clear();
+        return commit();
+    }
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/PersistentRepositoryStateProvider.java
@@ -0,0 +1,83 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories;
+
+import android.support.annotation.Nullable;
+
+import org.mozilla.gecko.background.common.PrefsBranch;
+
+/**
+ * Simple persistent implementation of a repository state provider.
+ * Uses provided PrefsBranch object in order to persist values.
+ *
+ * Values must be committed before they become visible via getters.
+ * It is a caller's responsibility to perform a commit.
+ *
+ * @author grisha
+ */
+public class PersistentRepositoryStateProvider implements RepositoryStateProvider {
+    private final PrefsBranch prefs;
+
+    private final PrefsBranch.Editor editor;
+
+    public PersistentRepositoryStateProvider(PrefsBranch prefs) {
+        this.prefs = prefs;
+        // NB: It is a caller's responsibility to commit any changes it performs via setters.
+        this.editor = prefs.edit();
+    }
+
+    @Override
+    public boolean isPersistent() {
+        return true;
+    }
+
+    @Override
+    public boolean commit() {
+        return this.editor.commit();
+    }
+
+    @Override
+    public PersistentRepositoryStateProvider clear(String key) {
+        this.editor.remove(key);
+        return this;
+    }
+
+    @Override
+    public PersistentRepositoryStateProvider setString(String key, String value) {
+        this.editor.putString(key, value);
+        return this;
+    }
+
+    @Nullable
+    @Override
+    public String getString(String key) {
+        return this.prefs.getString(key, null);
+    }
+
+    @Override
+    public PersistentRepositoryStateProvider setLong(String key, Long value) {
+        this.editor.putLong(key, value);
+        return this;
+    }
+
+    @Nullable
+    @Override
+    public Long getLong(String key) {
+        if (!this.prefs.contains(key)) {
+            return null;
+        }
+        return this.prefs.getLong(key, 0);
+    }
+
+    @Override
+    public boolean resetAndCommit() {
+        return this.editor
+                .remove(KEY_HIGH_WATER_MARK)
+                .remove(KEY_OFFSET)
+                .remove(KEY_OFFSET_ORDER)
+                .remove(KEY_OFFSET_SINCE)
+                .commit();
+    }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
@@ -71,21 +71,16 @@ public abstract class RepositorySession 
 
   // The time that the last sync on this collection completed, in milliseconds since epoch.
   private long lastSyncTimestamp = 0;
 
   public long getLastSyncTimestamp() {
     return lastSyncTimestamp;
   }
 
-  // Override this in the buffering wrappers.
-  public long getHighWaterMarkTimestamp() {
-    return 0;
-  }
-
   public static long now() {
     return System.currentTimeMillis();
   }
 
   public RepositorySession(Repository repository) {
     this.repository = repository;
   }
 
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositoryStateProvider.java
@@ -0,0 +1,47 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories;
+
+import android.support.annotation.CheckResult;
+import android.support.annotation.Nullable;
+
+/**
+ * Interface describing a repository state provider.
+ * Repository's state might consist of a number of key-value pairs.
+ *
+ * Currently there are two types of implementations: persistent and non-persistent state.
+ * Persistent state survives between syncs, and is currently used by the BatchingDownloader to
+ * resume downloads in case of interruptions. Non-persistent state is used when resuming downloads
+ * is not possible.
+ *
+ * In order to safely use a persistent state provider for resuming downloads, a sync stage must match
+ * the following criteria:
+ * - records are downloaded with sort=oldest
+ * - records must be downloaded into a persistent buffer, or applied to live storage
+ *
+ * @author grisha
+ */
+public interface RepositoryStateProvider {
+    String KEY_HIGH_WATER_MARK = "highWaterMark";
+    String KEY_OFFSET = "offset";
+    String KEY_OFFSET_SINCE = "offsetSince";
+    String KEY_OFFSET_ORDER = "offsetOrder";
+
+    boolean isPersistent();
+
+    @CheckResult
+    boolean commit();
+
+    RepositoryStateProvider clear(String key);
+
+    RepositoryStateProvider setString(String key, String value);
+    @Nullable String getString(String key);
+
+    RepositoryStateProvider setLong(String key, Long value);
+    @Nullable Long getLong(String key);
+
+    @CheckResult
+    boolean resetAndCommit();
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15Repository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15Repository.java
@@ -26,16 +26,18 @@ public class Server15Repository extends 
   public final AuthHeaderProvider authHeaderProvider;
 
   private final long syncDeadlineMillis;
   /* package-local */ final URI collectionURI;
 
   protected final String collection;
   protected final InfoCollections infoCollections;
 
+  protected RepositoryStateProvider stateProvider;
+
   private final InfoConfiguration infoConfiguration;
   private final static String DEFAULT_SORT_ORDER = "oldest";
   private final static long DEFAULT_BATCH_LIMIT = 100;
 
   /**
    * Construct a new repository that fetches and stores against the Sync 1.5 API.
    *
    * @param collection name.
@@ -45,32 +47,34 @@ public class Server15Repository extends 
    * @throws URISyntaxException
    */
   public Server15Repository(
           @NonNull String collection,
           long syncDeadlineMillis,
           @NonNull String storageURL,
           AuthHeaderProvider authHeaderProvider,
           @NonNull InfoCollections infoCollections,
-          @NonNull InfoConfiguration infoConfiguration) throws URISyntaxException {
+          @NonNull InfoConfiguration infoConfiguration,
+          @NonNull RepositoryStateProvider stateProvider) throws URISyntaxException {
     if (collection == null) {
       throw new IllegalArgumentException("collection must not be null");
     }
     if (storageURL == null) {
       throw new IllegalArgumentException("storageURL must not be null");
     }
     if (infoCollections == null) {
       throw new IllegalArgumentException("infoCollections must not be null");
     }
     this.collection = collection;
     this.syncDeadlineMillis = syncDeadlineMillis;
     this.collectionURI = new URI(storageURL + (storageURL.endsWith("/") ? collection : "/" + collection));
     this.authHeaderProvider = authHeaderProvider;
     this.infoCollections = infoCollections;
     this.infoConfiguration = infoConfiguration;
+    this.stateProvider = stateProvider;
   }
 
   @Override
   public void createSession(RepositorySessionCreationDelegate delegate,
                             Context context) {
     delegate.onSessionCreated(new Server15RepositorySession(this));
   }
 
@@ -98,16 +102,20 @@ public class Server15Repository extends 
   public Long getBatchLimit() {
     return DEFAULT_BATCH_LIMIT;
   }
 
   public boolean getAllowMultipleBatches() {
     return true;
   }
 
+  public boolean getAllowHighWaterMark() {
+    return false;
+  }
+
   /**
    * A point in time by which this repository's session must complete fetch and store operations.
    * Particularly pertinent for batching downloads performed by the session (should we fetch
    * another batch?) and buffered repositories (do we have enough time to merge what we've downloaded?).
    */
   public long getSyncDeadline() {
     return syncDeadlineMillis;
   }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15RepositorySession.java
@@ -8,29 +8,37 @@ import android.net.Uri;
 
 import org.mozilla.gecko.background.common.log.Logger;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 import org.mozilla.gecko.sync.repositories.downloaders.BatchingDownloader;
+import org.mozilla.gecko.sync.repositories.downloaders.BatchingDownloaderController;
 import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader;
 
 public class Server15RepositorySession extends RepositorySession {
   public static final String LOG_TAG = "Server15RepositorySession";
 
   protected final Server15Repository serverRepository;
   private BatchingUploader uploader;
   private final BatchingDownloader downloader;
 
   public Server15RepositorySession(Repository repository) {
     super(repository);
     this.serverRepository = (Server15Repository) repository;
-    this.downloader = initializeDownloader(this);
+    this.downloader = new BatchingDownloader(
+            this.serverRepository.authHeaderProvider,
+            Uri.parse(this.serverRepository.collectionURI().toString()),
+            this.serverRepository.getSyncDeadline(),
+            this.serverRepository.getAllowMultipleBatches(),
+            this.serverRepository.getAllowHighWaterMark(),
+            this.serverRepository.stateProvider,
+            this);
   }
 
   @Override
   public void setStoreDelegate(RepositorySessionStoreDelegate storeDelegate) {
     super.setStoreDelegate(storeDelegate);
 
     // Now that we have the delegate, we can initialize our uploader.
     this.uploader = new BatchingUploader(
@@ -44,17 +52,19 @@ public class Server15RepositorySession e
                          RepositorySessionGuidsSinceDelegate delegate) {
     // TODO Auto-generated method stub
 
   }
 
   @Override
   public void fetchSince(long sinceTimestamp,
                          RepositorySessionFetchRecordsDelegate delegate) {
-    this.downloader.fetchSince(
+    BatchingDownloaderController.resumeFetchSinceIfPossible(
+            this.downloader,
+            this.serverRepository.stateProvider,
             delegate,
             sinceTimestamp,
             serverRepository.getBatchLimit(),
             serverRepository.getSortOrder()
     );
   }
 
   @Override
@@ -98,22 +108,37 @@ public class Server15RepositorySession e
     // If delegate was set, this shouldn't happen.
     if (uploader == null) {
       throw new IllegalStateException("Uploader haven't been initialized");
     }
 
     uploader.noMoreRecordsToUpload();
   }
 
+  /**
+   * @return Repository's high-water-mark if it's available, its use is allowed by the repository,
+   * repository is set to fetch oldest-first, and it's greater than collection's last-synced timestamp.
+   * Otherwise, returns repository's last-synced timestamp.
+   */
+  @Override
+  public long getLastSyncTimestamp() {
+    if (!serverRepository.getAllowHighWaterMark() || !serverRepository.getSortOrder().equals("oldest")) {
+      return super.getLastSyncTimestamp();
+    }
+
+    final Long highWaterMark = serverRepository.stateProvider.getLong(
+            RepositoryStateProvider.KEY_HIGH_WATER_MARK);
+
+    // After a successful sync we expect that last-synced timestamp for a collection will be greater
+    // than the high-water-mark. High-water-mark is mostly useful in case of resuming a sync,
+    // and if we're resuming we did not bump our last-sync timestamps during the previous sync.
+    if (highWaterMark == null || super.getLastSyncTimestamp() > highWaterMark) {
+      return super.getLastSyncTimestamp();
+    }
+
+    return highWaterMark;
+  }
+
   @Override
   public boolean dataAvailable() {
     return serverRepository.updateNeeded(getLastSyncTimestamp());
   }
-
-  protected static BatchingDownloader initializeDownloader(final Server15RepositorySession serverRepositorySession) {
-    return new BatchingDownloader(
-            serverRepositorySession.serverRepository.authHeaderProvider,
-            Uri.parse(serverRepositorySession.serverRepository.collectionURI().toString()),
-            serverRepositorySession.serverRepository.getSyncDeadline(),
-            serverRepositorySession.serverRepository.getAllowMultipleBatches(),
-            serverRepositorySession);
-  }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserHistoryRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserHistoryRepositorySession.java
@@ -186,16 +186,38 @@ public class AndroidBrowserHistoryReposi
     }
   }
 
   @Override
   public void storeDone() {
     storeDone(System.currentTimeMillis());
   }
 
+  /**
+   * We need to flush our internal buffer of records in case of any interruptions of record flow
+   * from our "source". Downloader might be maintaining a "high-water-mark" based on the records
+   * it tried to store, so it's pertinent that all of the records that were queued for storage
+   * are eventually persisted.
+   */
+  @Override
+  public void storeIncomplete() {
+    storeWorkQueue.execute(new Runnable() {
+      @Override
+      public void run() {
+        synchronized (recordsBufferMonitor) {
+          try {
+            flushNewRecords();
+          } catch (Exception e) {
+            Logger.warn(LOG_TAG, "Error flushing records to database.", e);
+          }
+        }
+      }
+    });
+  }
+
   @Override
   public void storeDone(final long end) {
     storeWorkQueue.execute(new Runnable() {
       @Override
       public void run() {
         synchronized (recordsBufferMonitor) {
           try {
             flushNewRecords();
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
@@ -15,16 +15,17 @@ import org.mozilla.gecko.sync.CryptoReco
 import org.mozilla.gecko.sync.DelayedWorkTracker;
 import org.mozilla.gecko.sync.SyncDeadlineReachedException;
 import org.mozilla.gecko.sync.Utils;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.net.SyncResponse;
 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -37,17 +38,17 @@ import java.util.concurrent.TimeUnit;
  * - Per-batch limit, which specified how many records may be fetched in an individual GET request.
  * - allowMultipleBatches, which determines if downloader is allowed to perform more than one fetch.
  *
  * Batching is implemented via specifying a 'limit' GET parameter, and looking for an 'offset' token
  * in the response. If offset token is present, this indicates that there are more records than what
  * we've received so far, and we perform an additional fetch, if we're allowed to do so by our
  * configuration. Batching stops when offset token is no longer present (indicating that we're done).
  *
- * If we are not allowed to perform multiple batches, we consider batching to be succesfully complete
+ * If we are not allowed to perform multiple batches, we consider batching to be successfully completed
  * after fist fetch request succeeds. Similarly, a trivial case of collection having less records than
  * the batch limit will also successfully complete in one fetch.
  *
  * In between batches, we maintain a Last-Modified timestamp, based off the value returned in the header
  * of the first response. Every response will have a Last-Modified header, indicating when the collection
  * was modified last. We pass along this header in our subsequent requests in a X-If-Unmodified-Since
  * header. Server will ensure that our collection did not change while we are batching, if it did it will
  * fail our fetch with a 412 error. Additionally, we perform the same checks locally.
@@ -56,35 +57,42 @@ public class BatchingDownloader {
     public static final String LOG_TAG = "BatchingDownloader";
     private static final String DEFAULT_SORT_ORDER = "index";
 
     private final RepositorySession repositorySession;
     private final DelayedWorkTracker workTracker = new DelayedWorkTracker();
     private final Uri baseCollectionUri;
     private final long fetchDeadline;
     private final boolean allowMultipleBatches;
+    private final boolean keepTrackOfHighWaterMark;
+
+    private RepositoryStateProvider stateProvider;
 
     /* package-local */ final AuthHeaderProvider authHeaderProvider;
 
     // Used to track outstanding requests, so that we can abort them as needed.
     @VisibleForTesting
     protected final Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>());
     /* @GuardedBy("this") */ private String lastModified;
 
     public BatchingDownloader(
             AuthHeaderProvider authHeaderProvider,
             Uri baseCollectionUri,
             long fetchDeadline,
             boolean allowMultipleBatches,
+            boolean keepTrackOfHighWaterMark,
+            RepositoryStateProvider stateProvider,
             RepositorySession repositorySession) {
         this.repositorySession = repositorySession;
         this.authHeaderProvider = authHeaderProvider;
         this.baseCollectionUri = baseCollectionUri;
         this.allowMultipleBatches = allowMultipleBatches;
+        this.keepTrackOfHighWaterMark = keepTrackOfHighWaterMark;
         this.fetchDeadline = fetchDeadline;
+        this.stateProvider = stateProvider;
     }
 
     @VisibleForTesting
     protected static String flattenIDs(String[] guids) {
         // Consider using Utils.toDelimitedString if and when the signature changes
         // to Collection<String> guids.
         if (guids.length == 0) {
             return "";
@@ -125,21 +133,16 @@ public class BatchingDownloader {
                                                   String offset)
             throws URISyntaxException, UnsupportedEncodingException {
         final URI collectionURI = buildCollectionURI(baseCollectionUri, full, newer, batchLimit, sort, ids, offset);
         Logger.debug(LOG_TAG, collectionURI.toString());
 
         return new SyncStorageCollectionRequest(collectionURI);
     }
 
-    public void fetchSince(RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, long timestamp, long batchLimit, String sortOrder) {
-        this.fetchSince(fetchRecordsDelegate, timestamp, batchLimit, sortOrder, null);
-    }
-
-    @VisibleForTesting
     public void fetchSince(RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, long timestamp, long batchLimit, String sortOrder, String offset) {
         try {
             SyncStorageCollectionRequest request = makeSyncStorageCollectionRequest(timestamp,
                     batchLimit, true, sortOrder, null, offset);
             this.fetchWithParameters(timestamp, batchLimit, true, sortOrder, null, request, fetchRecordsDelegate);
         } catch (URISyntaxException | UnsupportedEncodingException e) {
             fetchRecordsDelegate.onFetchFailed(e);
         }
@@ -194,26 +197,49 @@ public class BatchingDownloader {
         }
 
         // If we can (or must) stop batching at this point, let the delegate know that we're all done!
         final String offset = response.weaveOffset();
         if (offset == null || !allowMultipleBatches) {
             final long normalizedTimestamp = response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED);
             Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp);
 
+            // This isn't great, but shouldn't be too problematic - but do see notes below.
+            // Failing to reset a resume context after we're done with batching means that on next
+            // sync we'll erroneously try to resume downloading. If resume proceeds, we will fetch
+            // from an older timestamp, but offset by the amount of records we've fetched prior.
+            // Since we're diligent about setting a X-I-U-S header, any remote collection changes
+            // will be caught and we'll receive a 412.
+            if (!BatchingDownloaderController.resetResumeContextAndCommit(this.stateProvider)) {
+                Logger.warn(LOG_TAG, "Failed to reset resume context while completing a batch");
+            }
+
             this.workTracker.delayWorkItem(new Runnable() {
                 @Override
                 public void run() {
                     Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
                     fetchRecordsDelegate.onFetchCompleted(normalizedTimestamp);
                 }
             });
             return;
         }
 
+        // This is unfortunate, but largely just means that in case we need to resume later on, it
+        // either won't be possible (and we'll fetch w/o resuming), or won't be as efficient (i.e.
+        // we'll download more records than necessary).
+        if (BatchingDownloaderController.isResumeContextSet(this.stateProvider)) {
+            if (!BatchingDownloaderController.updateResumeContextAndCommit(this.stateProvider, offset)) {
+                Logger.warn(LOG_TAG, "Failed to update resume context while processing a batch.");
+            }
+        } else {
+            if (!BatchingDownloaderController.setInitialResumeContextAndCommit(this.stateProvider, offset, newer, sort)) {
+                Logger.warn(LOG_TAG, "Failed to set initial resume context while processing a batch.");
+            }
+        }
+
         // We need to make another batching request!
         // Let the delegate know that a batch fetch just completed before we proceed.
         // This operation needs to run after every call to onFetchedRecord for this batch has been
         // processed, hence the delayWorkItem call.
         this.workTracker.delayWorkItem(new Runnable() {
             @Override
             public void run() {
                 Logger.debug(LOG_TAG, "Running onBatchCompleted.");
@@ -228,16 +254,19 @@ public class BatchingDownloader {
         }
 
         // Create and execute new batch request.
         try {
             final SyncStorageCollectionRequest newRequest = makeSyncStorageCollectionRequest(newer,
                     limit, full, sort, ids, offset);
             this.fetchWithParameters(newer, limit, full, sort, ids, newRequest, fetchRecordsDelegate);
         } catch (final URISyntaxException | UnsupportedEncodingException e) {
+            if (!this.stateProvider.commit()) {
+                Logger.warn(LOG_TAG, "Failed to commit repository state while handling request creation error");
+            }
             this.workTracker.delayWorkItem(new Runnable() {
                 @Override
                 public void run() {
                     Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
                     fetchRecordsDelegate.onFetchFailed(e);
                 }
             });
         }
@@ -248,30 +277,54 @@ public class BatchingDownloader {
         handleFetchFailed(fetchRecordsDelegate, ex, null);
     }
 
     /* package-local */ void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
                               final Exception ex,
                               @Nullable final SyncStorageCollectionRequest request) {
         this.removeRequestFromPending(request);
         this.abortRequests();
+
+        // Resume context is not discarded if we failed because of reaching our deadline. In this case,
+        // we keep it allowing us to resume our download exactly where we left off.
+        // Discard resume context for all other failures: 412 (concurrent modification), HTTP errors, ...
+        if (!(ex instanceof SyncDeadlineReachedException)) {
+            // Failing to reset context means that we will try to resume once we re-sync current stage.
+            // This won't affect X-I-U-S logic in case of 412 (it's set separately from resume context),
+            // and same notes apply after failing to reset context in onFetchCompleted (see above).
+            if (!BatchingDownloaderController.resetResumeContextAndCommit(stateProvider)) {
+                Logger.warn(LOG_TAG, "Failed to reset resume context while processing a non-deadline exception");
+            }
+        } else {
+            // Failing to commit the context here means that we didn't commit the latest high-water-mark,
+            // and won't be as efficient once we re-sync. That is, we might download more records than necessary.
+            if (!this.stateProvider.commit()) {
+                Logger.warn(LOG_TAG, "Failed to commit resume context while processing a deadline exception");
+            }
+        }
+
         this.workTracker.delayWorkItem(new Runnable() {
             @Override
             public void run() {
                 Logger.debug(LOG_TAG, "Running onFetchFailed.");
                 fetchRecordsDelegate.onFetchFailed(ex);
             }
         });
     }
 
     public void onFetchedRecord(CryptoRecord record,
                                 RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
         this.workTracker.incrementOutstanding();
+
         try {
             fetchRecordsDelegate.onFetchedRecord(record);
+            // NB: changes to stateProvider are committed in either onFetchCompleted or handleFetchFailed.
+            if (this.keepTrackOfHighWaterMark) {
+                this.stateProvider.setLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK, record.lastModified);
+            }
         } catch (Exception ex) {
             Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex);
             throw new RuntimeException(ex);
         } finally {
             this.workTracker.decrementOutstanding();
         }
     }
 
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderController.java
@@ -0,0 +1,118 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.downloaders;
+
+import android.support.annotation.CheckResult;
+import android.util.Log;
+
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
+
+/**
+ * Encapsulates logic for resuming batching downloads.
+ *
+ * It's possible to resume a batching download if we have an offset token and the context in which
+ * we obtained the offset token did not change. Namely, we ensure that `since` and `order` parameters
+ * remain the same if offset is being used. See Bug 1330839 for a discussion on this.
+ *
+ * @author grisha
+ */
+public class BatchingDownloaderController {
+    private final static String LOG_TAG = "BatchingDownloaderCtrl";
+
+    private BatchingDownloaderController() {}
+
+    private static class ResumeContext {
+        private final String offset;
+        private final Long since;
+        private final String order;
+
+        private ResumeContext(String offset, Long since, String order) {
+            this.offset = offset;
+            this.since = since;
+            this.order = order;
+        }
+    }
+
+    private static ResumeContext getResumeContext(RepositoryStateProvider stateProvider, Long since, String order) {
+        // Build a "default" context around passed-in values if no context is available.
+        if (!isResumeContextSet(stateProvider)) {
+            return new ResumeContext(null, since, order);
+        }
+
+        final String offset = stateProvider.getString(RepositoryStateProvider.KEY_OFFSET);
+        final Long offsetSince = stateProvider.getLong(RepositoryStateProvider.KEY_OFFSET_SINCE);
+        final String offsetOrder = stateProvider.getString(RepositoryStateProvider.KEY_OFFSET_ORDER);
+
+        // If context is still valid, we can use it!
+        if (order.equals(offsetOrder)) {
+            return new ResumeContext(offset, offsetSince, offsetOrder);
+        }
+
+        // Build a "default" context around passed-in values.
+        return new ResumeContext(null, since, order);
+    }
+
+    /**
+     * Resumes a fetch if there is an offset present, and offset's context matches provided values.
+     * Otherwise, performs a regular fetch.
+     */
+    public static void resumeFetchSinceIfPossible(
+            BatchingDownloader downloader,
+            RepositoryStateProvider stateProvider,
+            RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
+            long since, long limit, String order) {
+        ResumeContext resumeContext = getResumeContext(stateProvider, since, order);
+
+        downloader.fetchSince(
+                fetchRecordsDelegate,
+                resumeContext.since,
+                limit,
+                resumeContext.order,
+                resumeContext.offset
+        );
+    }
+
+    @CheckResult
+    /* package-local */ static boolean setInitialResumeContextAndCommit(RepositoryStateProvider stateProvider, String offset, long since, String order) {
+        if (isResumeContextSet(stateProvider)) {
+            throw new IllegalStateException("Not allowed to set resume context more than once. Use update instead.");
+        }
+
+        return stateProvider
+                .setString(RepositoryStateProvider.KEY_OFFSET, offset)
+                .setLong(RepositoryStateProvider.KEY_OFFSET_SINCE, since)
+                .setString(RepositoryStateProvider.KEY_OFFSET_ORDER, order)
+                .commit();
+    }
+
+    @CheckResult
+    /* package-local */ static boolean updateResumeContextAndCommit(RepositoryStateProvider stateProvider, String offset) {
+        if (!isResumeContextSet(stateProvider)) {
+            throw new IllegalStateException("Tried to update resume context before it was set.");
+        }
+
+        return stateProvider
+                .setString(RepositoryStateProvider.KEY_OFFSET, offset)
+                .commit();
+    }
+
+    @CheckResult
+    /* package-local */ static boolean resetResumeContextAndCommit(RepositoryStateProvider stateProvider) {
+        return stateProvider
+                .clear(RepositoryStateProvider.KEY_OFFSET)
+                .clear(RepositoryStateProvider.KEY_OFFSET_SINCE)
+                .clear(RepositoryStateProvider.KEY_OFFSET_ORDER)
+                .commit();
+    }
+
+    /*package-local */ static boolean isResumeContextSet(RepositoryStateProvider stateProvider) {
+        final String offset = stateProvider.getString(RepositoryStateProvider.KEY_OFFSET);
+        final Long offsetSince = stateProvider.getLong(RepositoryStateProvider.KEY_OFFSET_SINCE);
+        final String offsetOrder = stateProvider.getString(RepositoryStateProvider.KEY_OFFSET_ORDER);
+
+        return offset != null && offsetSince != null && offsetOrder != null;
+    }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
@@ -5,16 +5,17 @@
 package org.mozilla.gecko.sync.stage;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.MetaGlobalException;
 import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
 import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.AndroidBrowserBookmarksRepository;
 import org.mozilla.gecko.sync.repositories.domain.BookmarkRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class AndroidBrowserBookmarksServerSyncStage extends ServerSyncStage {
   protected static final String LOG_TAG = "BookmarksStage";
@@ -34,28 +35,52 @@ public class AndroidBrowserBookmarksServ
     return "bookmarks";
   }
 
   @Override
   public Integer getStorageVersion() {
     return VersionConstants.BOOKMARKS_ENGINE_VERSION;
   }
 
+  /**
+   * We're downloading records into a non-persistent buffer for safety, so we can't use a H.W.M.
+   * Once this stage is using a persistent buffer, this should change. See Bug 1318515.
+   *
+   * @return HighWaterMark.Disabled
+   */
+  @Override
+  protected HighWaterMark getAllowedToUseHighWaterMark() {
+    return HighWaterMark.Disabled;
+  }
+
+  /**
+   * Full batching is allowed, because we want all of the records.
+   *
+   * @return MultipleBatches.Enabled
+   */
+  @Override
+  protected MultipleBatches getAllowedMultipleBatches() {
+    return MultipleBatches.Enabled;
+  }
+
   @Override
   protected Repository getRemoteRepository() throws URISyntaxException {
     return new ConfigurableServer15Repository(
             getCollection(),
             session.getSyncDeadline(),
             session.config.storageURL(),
             session.getAuthHeaderProvider(),
             session.config.infoCollections,
             session.config.infoConfiguration,
             BOOKMARKS_BATCH_LIMIT,
             BOOKMARKS_SORT,
-            true /* allow multiple batches */);
+            getAllowedMultipleBatches(),
+            getAllowedToUseHighWaterMark(),
+            getRepositoryStateProvider()
+    );
   }
 
   @Override
   protected Repository getLocalRepository() {
     return new BufferingMiddlewareRepository(
             session.getSyncDeadline(),
             new MemoryBufferStorage(),
             new AndroidBrowserBookmarksRepository()
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
@@ -3,18 +3,20 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.MetaGlobalException;
 import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
+import org.mozilla.gecko.sync.repositories.PersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.android.AndroidBrowserHistoryRepository;
 import org.mozilla.gecko.sync.repositories.domain.HistoryRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class AndroidBrowserHistoryServerSyncStage extends ServerSyncStage {
   protected static final String LOG_TAG = "HistoryStage";
 
   // Eventually this kind of sync stage will be data-driven,
@@ -37,28 +39,67 @@ public class AndroidBrowserHistoryServer
     return VersionConstants.HISTORY_ENGINE_VERSION;
   }
 
   @Override
   protected Repository getLocalRepository() {
     return new AndroidBrowserHistoryRepository();
   }
 
+  /**
+   * We use a persistent state provider for this stage, because it lets us resume interrupted
+   * syncs more efficiently.
+   * We are able to do this because we match criteria described in {@link RepositoryStateProvider}.
+   *
+   * @return Persistent repository state provider.
+     */
+  @Override
+  protected RepositoryStateProvider getRepositoryStateProvider() {
+    return new PersistentRepositoryStateProvider(
+            session.config.getBranch(statePreferencesPrefix())
+    );
+  }
+
+  /**
+   * We're downloading records oldest-first directly into live storage, forgoing any buffering other
+   * than AndroidBrowserHistoryRepository's internal records queue. These conditions allow us to use
+   * high-water-mark to resume downloads in case of interruptions.
+   *
+   * @return HighWaterMark.Enabled
+   */
+  @Override
+  protected HighWaterMark getAllowedToUseHighWaterMark() {
+    return HighWaterMark.Enabled;
+  }
+
+  /**
+   * Full batching is allowed, because we want all of the records.
+   *
+   * @return MultipleBatches.Enabled
+   */
+  @Override
+  protected MultipleBatches getAllowedMultipleBatches() {
+    return MultipleBatches.Enabled;
+  }
+
   @Override
   protected Repository getRemoteRepository() throws URISyntaxException {
     return new ConfigurableServer15Repository(
             getCollection(),
             session.getSyncDeadline(),
             session.config.storageURL(),
             session.getAuthHeaderProvider(),
             session.config.infoCollections,
             session.config.infoConfiguration,
             HISTORY_BATCH_LIMIT,
             HISTORY_SORT,
-            true /* allow multiple batches */);
+            getAllowedMultipleBatches(),
+            getAllowedToUseHighWaterMark(),
+            getRepositoryStateProvider()
+    );
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
     return new HistoryRecordFactory();
   }
 
   @Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserRecentHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserRecentHistoryServerSyncStage.java
@@ -5,45 +5,79 @@
 package org.mozilla.gecko.sync.stage;
 
 import org.mozilla.gecko.sync.MetaGlobalException;
 import org.mozilla.gecko.sync.NonObjectJSONException;
 import org.mozilla.gecko.sync.SynchronizerConfiguration;
 import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
 import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
+import org.mozilla.gecko.sync.repositories.PersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.Repository;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.android.AndroidBrowserHistoryRepository;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
 
 /**
  * History sync stage which is limited to just recent history, and will only run if the full history
  * sync stage did not complete yet. Its purpose is to give users with a lot of history in their
  * profiles a good experience during a large collection sync.
  *
  * @author grisha
  */
 public class AndroidBrowserRecentHistoryServerSyncStage extends AndroidBrowserHistoryServerSyncStage {
     protected static final String LOG_TAG = "RecentHistoryStage";
 
-    // TODO: Bug 1316110 tracks follow up work to make this stage more efficient.
+    // Bug 1316110 tracks follow up work to generalize this stage and make it more efficient.
     private static final int HISTORY_BATCH_LIMIT = 50;
     // We need a custom configuration bundle name for this stage, because we want to track last-synced
     // timestamp for this stage separately from that of a full history sync stage, yet their collection
     // names are the same.
     private static final String BUNDLE_NAME = "recentHistory.";
     private static final String HISTORY_SORT = "newest";
 
     @Override
     public String bundlePrefix() {
         return BUNDLE_NAME;
     }
 
+    /**
+     * We use a non-persistent state provider for this stage, as it's designed to just run once.
+     *
+     * @return Non-persistent repository state provider.
+     */
+    @Override
+    protected RepositoryStateProvider getRepositoryStateProvider() {
+        return new NonPersistentRepositoryStateProvider();
+    }
+
+    /**
+     * Force download to be limited to a single batch.
+     * We just to want fetch a batch-worth of records for this stage.
+     *
+     * @return MultipleBatches.Disabled
+     */
+    @Override
+    protected MultipleBatches getAllowedMultipleBatches() {
+        return MultipleBatches.Disabled;
+    }
+
+    /**
+     * Right now this stage is designed to run just once, when there's no history data available.
+     *
+     * @return HighWaterMark.Disabled
+     */
+    @Override
+    protected HighWaterMark getAllowedToUseHighWaterMark() {
+        return HighWaterMark.Disabled;
+    }
+
     @Override
     protected Repository getLocalRepository() {
         return new BufferingMiddlewareRepository(
                 session.getSyncDeadline(),
                 new MemoryBufferStorage(),
                 new AndroidBrowserHistoryRepository()
         );
     }
@@ -54,17 +88,19 @@ public class AndroidBrowserRecentHistory
                 getCollection(),
                 session.getSyncDeadline(),
                 session.config.storageURL(),
                 session.getAuthHeaderProvider(),
                 session.config.infoCollections,
                 session.config.infoConfiguration,
                 HISTORY_BATCH_LIMIT,
                 HISTORY_SORT,
-                false /* force single batch only */);
+                getAllowedMultipleBatches(),
+                getAllowedToUseHighWaterMark(),
+                getRepositoryStateProvider());
     }
 
     /**
      * This stage is only enabled if full history session is enabled and did not complete a sync yet.
      */
     @Override
     public boolean isEnabled() throws MetaGlobalException {
         final boolean historyStageEnabled = super.isEnabled();
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
@@ -5,16 +5,17 @@
 package org.mozilla.gecko.sync.stage;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.CryptoRecord;
 import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
 import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.FormHistoryRepositorySession;
 import org.mozilla.gecko.sync.repositories.domain.FormHistoryRecord;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class FormHistoryServerSyncStage extends ServerSyncStage {
@@ -34,29 +35,53 @@ public class FormHistoryServerSyncStage 
     return "forms";
   }
 
   @Override
   public Integer getStorageVersion() {
     return VersionConstants.FORMS_ENGINE_VERSION;
   }
 
+  /**
+   * We're downloading records into a non-persistent buffer for safety, so we can't use a H.W.M.
+   * Once this stage is using a persistent buffer, this should change.
+   *
+   * @return HighWaterMark.Disabled
+   */
+  @Override
+  protected HighWaterMark getAllowedToUseHighWaterMark() {
+    return HighWaterMark.Disabled;
+  }
+
+  /**
+   * Full batching is allowed, because we want all of the records.
+   *
+   * @return MultipleBatches.Enabled
+   */
+  @Override
+  protected MultipleBatches getAllowedMultipleBatches() {
+    return MultipleBatches.Enabled;
+  }
+
   @Override
   protected Repository getRemoteRepository() throws URISyntaxException {
     String collection = getCollection();
     return new ConfigurableServer15Repository(
             collection,
             session.getSyncDeadline(),
             session.config.storageURL(),
             session.getAuthHeaderProvider(),
             session.config.infoCollections,
             session.config.infoConfiguration,
             FORM_HISTORY_BATCH_LIMIT,
             FORM_HISTORY_SORT,
-            true /* allow multiple batches */);
+            getAllowedMultipleBatches(),
+            getAllowedToUseHighWaterMark(),
+            getRepositoryStateProvider()
+    );
   }
 
   @Override
   protected Repository getLocalRepository() {
     return new BufferingMiddlewareRepository(
             session.getSyncDeadline(),
             new MemoryBufferStorage(),
             new FormHistoryRepositorySession.FormHistoryRepository()
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
@@ -22,20 +22,22 @@ import org.mozilla.gecko.sync.delegates.
 import org.mozilla.gecko.sync.middleware.Crypto5MiddlewareRepository;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.net.BaseResource;
 import org.mozilla.gecko.sync.net.SyncStorageRequest;
 import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
 import org.mozilla.gecko.sync.repositories.InactiveSessionException;
 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
 import org.mozilla.gecko.sync.synchronizer.ServerLocalSynchronizer;
 import org.mozilla.gecko.sync.synchronizer.Synchronizer;
 import org.mozilla.gecko.sync.synchronizer.SynchronizerDelegate;
@@ -56,16 +58,30 @@ import java.util.concurrent.ExecutorServ
 public abstract class ServerSyncStage extends AbstractSessionManagingSyncStage implements SynchronizerDelegate {
 
   protected static final String LOG_TAG = "ServerSyncStage";
 
   protected long stageStartTimestamp = -1;
   protected long stageCompleteTimestamp = -1;
 
   /**
+   * Poor-man's boolean typing.
+   * These enums are used to configure {@link org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository}.
+   */
+  public enum HighWaterMark {
+    Enabled,
+    Disabled
+  }
+
+  public enum MultipleBatches {
+    Enabled,
+    Disabled
+  }
+
+  /**
    * Override these in your subclasses.
    *
    * @return true if this stage should be executed.
    * @throws MetaGlobalException
    */
   protected boolean isEnabled() throws MetaGlobalException {
     EngineSettings engineSettings = null;
     try {
@@ -136,25 +152,54 @@ public abstract class ServerSyncStage ex
     return new EngineSettings(config.syncID, version);
   }
 
   protected abstract String getCollection();
   protected abstract String getEngineName();
   protected abstract Repository getLocalRepository();
   protected abstract RecordFactory getRecordFactory();
 
+  /**
+   * Used to configure a {@link org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository}.
+   * Override this if you need a persistent repository state provider.
+   *
+   * @return Non-persistent state provider.
+   */
+  protected RepositoryStateProvider getRepositoryStateProvider() {
+    return new NonPersistentRepositoryStateProvider();
+  }
+
+  /**
+   * Used to configure a {@link org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository}.
+   * Override this if you want to restrict downloader to just a single batch.
+   */
+  protected MultipleBatches getAllowedMultipleBatches() {
+    return MultipleBatches.Enabled;
+  }
+
+  /**
+   * Used to configure a {@link org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository}.
+   * Override this if you want to allow resuming record downloads from a high-water-mark.
+   * Ensure you're using a {@link org.mozilla.gecko.sync.repositories.PersistentRepositoryStateProvider}
+   * to persist high-water-mark across syncs.
+   */
+  protected HighWaterMark getAllowedToUseHighWaterMark() {
+    return HighWaterMark.Disabled;
+  }
+
   // Override this in subclasses.
   protected Repository getRemoteRepository() throws URISyntaxException {
     String collection = getCollection();
     return new Server15Repository(collection,
                                   session.getSyncDeadline(),
                                   session.config.storageURL(),
                                   session.getAuthHeaderProvider(),
                                   session.config.infoCollections,
-                                  session.config.infoConfiguration);
+                                  session.config.infoConfiguration,
+                                  new NonPersistentRepositoryStateProvider());
   }
 
   /**
    * Return a Crypto5Middleware-wrapped Server15Repository.
    *
    * @throws NoCollectionKeysSetException
    * @throws URISyntaxException
    */
@@ -165,16 +210,20 @@ public abstract class ServerSyncStage ex
     cryptoRepo.recordFactory = getRecordFactory();
     return cryptoRepo;
   }
 
   protected String bundlePrefix() {
     return this.getCollection() + ".";
   }
 
+  protected String statePreferencesPrefix() {
+    return this.getCollection() + ".state.";
+  }
+
   protected SynchronizerConfiguration getConfig() throws NonObjectJSONException, IOException {
     return new SynchronizerConfiguration(session.config.getBranch(bundlePrefix()));
   }
 
   protected void persistConfig(SynchronizerConfiguration synchronizerConfiguration) {
     synchronizerConfiguration.persist(session.config.getBranch(bundlePrefix()));
   }
 
@@ -185,21 +234,31 @@ public abstract class ServerSyncStage ex
     synchronizer.repositoryA = remote;
     synchronizer.repositoryB = this.getLocalRepository();
     synchronizer.load(getConfig());
 
     return synchronizer;
   }
 
   /**
-   * Reset timestamps.
+   * Reset timestamps and any repository state.
    */
   @Override
   protected void resetLocal() {
     resetLocalWithSyncID(null);
+    if (!getRepositoryStateProvider().resetAndCommit()) {
+      // At the very least, we can log this.
+      // Failing to reset at this point means that we'll have lingering state for any stages using a
+      // persistent provider. In certain cases this might negatively affect first sync of this stage
+      // in the future.
+      // Our timestamp resetting code in `persistConfig` is affected by the same problem.
+      // A way to work around this is to further prefix our persisted SharedPreferences with
+      // clientID/syncID, ensuring a very defined scope for any persisted state. See Bug 1332431.
+      Logger.warn(LOG_TAG, "Failed to reset repository state");
+    }
   }
 
   /**
    * Reset timestamps and possibly set syncID.
    * @param syncID if non-null, new syncID to persist.
    */
   protected void resetLocalWithSyncID(String syncID) {
     // Clear both timestamps.
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -1,16 +1,17 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.synchronizer;
 
 import android.support.annotation.NonNull;
 import android.support.annotation.Nullable;
+import android.support.annotation.VisibleForTesting;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.mozilla.gecko.background.common.log.Logger;
 import org.mozilla.gecko.sync.ReflowIsNecessaryException;
 import org.mozilla.gecko.sync.ThreadPool;
@@ -171,26 +172,17 @@ public class RecordsChannel implements
     numFetched.set(0);
     numFetchFailed.set(0);
     numStored.set(0);
     numStoreFailed.set(0);
     // Start a consumer thread.
     this.consumer = new ConcurrentRecordConsumer(this);
     ThreadPool.run(this.consumer);
     waitingForQueueDone = true;
-
-    // Fetch all records that were modified since our previous flow. If our previous flow succeeded,
-    // we will use source's last-sync timestamp. If our previous flow didn't complete, resume it,
-    // starting from sink's high water mark timestamp.
-    // If there was no previous flow (first sync, or data was cleared...), fetch everything.
-    // Resuming a flow is supported for buffered RepositorySessions. We degrade gracefully otherwise.
-    final long highWaterMark = sink.getHighWaterMarkTimestamp();
-    final long lastSync = source.getLastSyncTimestamp();
-    final long sinceTimestamp = Math.max(highWaterMark, lastSync);
-    source.fetchSince(sinceTimestamp, this);
+    source.fetchSince(source.getLastSyncTimestamp(), this);
   }
 
   /**
    * Begin both sessions, invoking flow() when done.
    * @throws InvalidSessionTransitionException
    */
   public void beginAndFlow() throws InvalidSessionTransitionException {
     Logger.trace(LOG_TAG, "Beginning source.");
@@ -210,19 +202,19 @@ public class RecordsChannel implements
 
   @Override
   public void onFetchFailed(Exception ex) {
     Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
     numFetchFailed.incrementAndGet();
     if (ex instanceof ReflowIsNecessaryException) {
       setReflowException((ReflowIsNecessaryException) ex);
     }
+    delegate.onFlowFetchFailed(this, ex);
     // Sink will be informed once consumer finishes.
     this.consumer.halt();
-    delegate.onFlowFetchFailed(this, ex);
   }
 
   @Override
   public void onFetchedRecord(Record record) {
     numFetched.incrementAndGet();
     this.toProcess.add(record);
     this.consumer.doNotify();
   }
@@ -313,16 +305,17 @@ public class RecordsChannel implements
 
     // Prevent "once consumer is done..." actions from taking place. They already have (case 2), or
     // we don't need them (case 1).
     waitingForQueueDone = false;
 
     // If consumer is still going at it, tell it to stop.
     this.consumer.halt();
 
+    delegate.onFlowStoreFailed(this, ex, null);
     delegate.onFlowCompleted(this, fetchEnd, System.currentTimeMillis());
   }
 
   @Override
   public void onBeginFailed(Exception ex) {
     delegate.onFlowBeginFailed(this, ex);
   }
 
@@ -358,17 +351,18 @@ public class RecordsChannel implements
 
   @Override
   public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
     // Lie outright. We know that all of our fetch methods are safe.
     return this;
   }
 
   @Nullable
-  /* package-local */ synchronized ReflowIsNecessaryException getReflowException() {
+  @VisibleForTesting
+  public synchronized ReflowIsNecessaryException getReflowException() {
     return reflowException;
   }
 
   private synchronized void setReflowException(@NonNull ReflowIsNecessaryException e) {
     // It is a mistake to set reflow exception multiple times.
     if (reflowException != null) {
       throw new IllegalStateException("Reflow exception already set: " + reflowException);
     }
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/net/test/TestServer15Repository.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/net/test/TestServer15Repository.java
@@ -6,36 +6,39 @@ package org.mozilla.android.sync.net.tes
 import android.os.SystemClock;
 
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
 import org.mozilla.gecko.sync.InfoCollections;
 import org.mozilla.gecko.sync.InfoConfiguration;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.concurrent.TimeUnit;
 
 @RunWith(TestRunner.class)
 public class TestServer15Repository {
 
   private static final String COLLECTION = "bookmarks";
   private static final String COLLECTION_URL = "http://foo.com/1.5/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage";
   private static final long SYNC_DEADLINE = SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30);
 
   protected final InfoCollections infoCollections = new InfoCollections();
   protected final InfoConfiguration infoConfiguration = new InfoConfiguration();
+  protected final RepositoryStateProvider stateProvider = new NonPersistentRepositoryStateProvider();
 
   public static void assertQueryEquals(String expected, URI u) {
     Assert.assertEquals(expected, u.getRawQuery());
   }
 
   @Test
   public void testCollectionURI() throws URISyntaxException {
-    Server15Repository noTrailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL, null, infoCollections, infoConfiguration);
-    Server15Repository trailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL + "/", null, infoCollections, infoConfiguration);
+    Server15Repository noTrailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL, null, infoCollections, infoConfiguration, stateProvider);
+    Server15Repository trailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL + "/", null, infoCollections, infoConfiguration, stateProvider);
     Assert.assertEquals("http://foo.com/1.5/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage/bookmarks", noTrailingSlash.collectionURI().toASCIIString());
     Assert.assertEquals("http://foo.com/1.5/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage/bookmarks", trailingSlash.collectionURI().toASCIIString());
   }
 }
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
@@ -60,23 +60,21 @@ public class BufferingMiddlewareReposito
 
         MockRecord record = new MockRecord("guid1", null, 1, false);
         bufferingSession.store(record);
         assertEquals(1, bufferStorage.all().size());
 
         MockRecord record1 = new MockRecord("guid2", null, 1, false);
         bufferingSession.store(record1);
         assertEquals(2, bufferStorage.all().size());
-        assertEquals(1, bufferStorage.latestModifiedTimestamp());
 
         // record2 must replace record.
         MockRecord record2 = new MockRecord("guid1", null, 2, false);
         bufferingSession.store(record2);
         assertEquals(2, bufferStorage.all().size());
-        assertEquals(2, bufferStorage.latestModifiedTimestamp());
 
         // Ensure inner session doesn't see incoming records.
         verify(innerRepositorySession, never()).store(record);
         verify(innerRepositorySession, never()).store(record1);
         verify(innerRepositorySession, never()).store(record2);
     }
 
     @Test
@@ -172,32 +170,9 @@ public class BufferingMiddlewareReposito
     }
 
     @Test
     public void setStoreDelegate() throws Exception {
         RepositorySessionStoreDelegate delegate = mock(RepositorySessionStoreDelegate.class);
         bufferingSession.setStoreDelegate(delegate);
         verify(innerRepositorySession).setStoreDelegate(delegate);
     }
-
-    @Test
-    public void getHighWaterMarkTimestamp() throws Exception {
-        // Trivial case, empty buffer.
-        assertEquals(0, bufferingSession.getHighWaterMarkTimestamp());
-
-        MockRecord record = new MockRecord("guid1", null, 1, false);
-        bufferingSession.store(record);
-        assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
-
-        MockRecord record3 = new MockRecord("guid3", null, 5, false);
-        bufferingSession.store(record3);
-        assertEquals(5, bufferingSession.getHighWaterMarkTimestamp());
-
-        // NB: same guid as above.
-        MockRecord record4 = new MockRecord("guid3", null, -1, false);
-        bufferingSession.store(record4);
-        assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
-
-        MockRecord record2 = new MockRecord("guid2", null, 13, false);
-        bufferingSession.store(record2);
-        assertEquals(13, bufferingSession.getHighWaterMarkTimestamp());
-    }
 }
\ No newline at end of file
new file mode 100644
--- /dev/null
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderControllerTest.java
@@ -0,0 +1,91 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories.downloaders;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.InfoCollections;
+import org.mozilla.gecko.sync.InfoConfiguration;
+import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
+
+import static org.junit.Assert.*;
+
+@RunWith(TestRunner.class)
+public class BatchingDownloaderControllerTest {
+    private BatchingDownloaderTest.MockSever15Repository serverRepository;
+    private Server15RepositorySession repositorySession;
+    private BatchingDownloaderTest.MockSessionFetchRecordsDelegate sessionFetchRecordsDelegate;
+    private BatchingDownloaderTest.MockDownloader mockDownloader;
+    private BatchingDownloaderTest.CountingShadowRepositoryState repositoryStateProvider;
+
+    @Before
+    public void setUp() throws Exception {
+        sessionFetchRecordsDelegate = new BatchingDownloaderTest.MockSessionFetchRecordsDelegate();
+
+        serverRepository = new BatchingDownloaderTest.MockSever15Repository(
+                "dummyCollection", "http://dummy.url/", null,
+                new InfoCollections(), new InfoConfiguration());
+        repositorySession = new Server15RepositorySession(serverRepository);
+        repositoryStateProvider = new BatchingDownloaderTest.CountingShadowRepositoryState();
+        mockDownloader = new BatchingDownloaderTest.MockDownloader(repositorySession, true, true, repositoryStateProvider);
+    }
+
+    @Test
+    public void resumeFetchSinceIfPossible() throws Exception {
+        assertTrue(BatchingDownloaderController.setInitialResumeContextAndCommit(repositoryStateProvider, "offset1", 2L, "oldest"));
+
+        // Test that we'll resume from offset if context is correct.
+        BatchingDownloaderController.resumeFetchSinceIfPossible(
+                mockDownloader, repositoryStateProvider, sessionFetchRecordsDelegate, 3L, 25L, "oldest");
+        assertEquals("offset1", mockDownloader.offset);
+        // Ensure we'll use context-provided since value.
+        assertEquals(2L, mockDownloader.newer);
+        assertEquals("oldest", mockDownloader.sort);
+
+        assertTrue(BatchingDownloaderController.updateResumeContextAndCommit(repositoryStateProvider, "offset2"));
+
+        // Test that we won't resume on context mismatch. Ensure that we use new context.
+        BatchingDownloaderController.resumeFetchSinceIfPossible(
+                mockDownloader, repositoryStateProvider, sessionFetchRecordsDelegate, 1L, 25L, "newest");
+        assertEquals(null, mockDownloader.offset);
+        assertEquals(1L, mockDownloader.newer);
+        assertEquals("newest", mockDownloader.sort);
+
+        assertTrue(BatchingDownloaderController.updateResumeContextAndCommit(repositoryStateProvider, "offset3"));
+
+        // Test that we may fetch with a different limit and still resume since our context is valid.
+        BatchingDownloaderController.resumeFetchSinceIfPossible(
+                mockDownloader, repositoryStateProvider, sessionFetchRecordsDelegate, 3L, 50L, "oldest");
+        assertEquals("offset3", mockDownloader.offset);
+        assertEquals("oldest", mockDownloader.sort);
+        assertEquals(2L, mockDownloader.newer);
+    }
+
+    @Test
+    public void testInitialSetAndUpdateOfContext() throws Exception {
+        assertFalse(BatchingDownloaderController.isResumeContextSet(repositoryStateProvider));
+
+        // Test that we can't update context which wasn't set yet.
+        try {
+            assertFalse(BatchingDownloaderController.updateResumeContextAndCommit(repositoryStateProvider, "offset2"));
+            fail();
+        } catch (IllegalStateException e) {}
+
+        // Test that we can set context and check that it's set.
+        assertTrue(BatchingDownloaderController.setInitialResumeContextAndCommit(repositoryStateProvider, "offset1", 1L, "newest"));
+        assertTrue(BatchingDownloaderController.isResumeContextSet(repositoryStateProvider));
+
+        // Test that we can't set context after it was already set.
+        try {
+            assertFalse(BatchingDownloaderController.setInitialResumeContextAndCommit(repositoryStateProvider, "offset1", 1L, "newest"));
+            fail();
+        } catch (IllegalStateException e) {}
+
+        // Test that we can update context after it was set.
+        assertTrue(BatchingDownloaderController.updateResumeContextAndCommit(repositoryStateProvider, "offset2"));
+    }
+}
\ No newline at end of file
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
@@ -14,16 +14,17 @@ import org.mozilla.gecko.background.test
 import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
 import org.mozilla.gecko.sync.CryptoRecord;
 import org.mozilla.gecko.sync.HTTPFailureException;
 import org.mozilla.gecko.sync.InfoCollections;
 import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.net.SyncResponse;
 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 
 import java.net.URI;
 import java.util.concurrent.ExecutorService;
@@ -49,16 +50,18 @@ public class BatchingDownloaderDelegateT
         public Exception ex;
 
         public MockDownloader(RepositorySession repositorySession) {
             super(
                     null,
                     Uri.EMPTY,
                     SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
                     true,
+                    true,
+                    new NonPersistentRepositoryStateProvider(),
                     repositorySession
             );
         }
 
         @Override
         public void onFetchCompleted(SyncStorageResponse response,
                                      final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
                                      final SyncStorageCollectionRequest request,
@@ -111,28 +114,31 @@ public class BatchingDownloaderDelegateT
     @Before
     public void setUp() throws Exception {
         repositorySession = new Server15RepositorySession(new Server15Repository(
                 "dummyCollection",
                 SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
                 DEFAULT_COLLECTION_URL,
                 null,
                 new InfoCollections(),
-                new InfoConfiguration())
+                new InfoConfiguration(),
+                new NonPersistentRepositoryStateProvider())
         );
         mockDownloader = new MockDownloader(repositorySession);
     }
 
     @Test
     public void testIfUnmodifiedSince() throws Exception {
         BatchingDownloader downloader = new BatchingDownloader(
                 null,
                 Uri.EMPTY,
                 SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
                 true,
+                true,
+                new NonPersistentRepositoryStateProvider(),
                 repositorySession
         );
         RepositorySessionFetchRecordsDelegate delegate = new SimpleSessionFetchRecordsDelegate();
         BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(downloader, delegate,
                 new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL)), 0, 0, true, null, null);
         String lastModified = "12345678";
         SyncStorageResponse response = makeSyncStorageResponse(200, lastModified);
         downloaderDelegate.handleRequestSuccess(response);
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
@@ -2,62 +2,128 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.repositories.downloaders;
 
 import android.net.Uri;
 import android.os.SystemClock;
 import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
 import org.mozilla.gecko.sync.CryptoRecord;
 import org.mozilla.gecko.sync.InfoCollections;
 import org.mozilla.gecko.sync.InfoConfiguration;
+import org.mozilla.gecko.sync.SyncDeadlineReachedException;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.net.SyncResponse;
 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import ch.boye.httpclientandroidlib.ProtocolVersion;
 import ch.boye.httpclientandroidlib.message.BasicHttpResponse;
 import ch.boye.httpclientandroidlib.message.BasicStatusLine;
 
 import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
 
 @RunWith(TestRunner.class)
 public class BatchingDownloaderTest {
     private MockSever15Repository serverRepository;
     private Server15RepositorySession repositorySession;
     private MockSessionFetchRecordsDelegate sessionFetchRecordsDelegate;
     private MockDownloader mockDownloader;
     private String DEFAULT_COLLECTION_NAME = "dummyCollection";
-    private String DEFAULT_COLLECTION_URL = "http://dummy.url/";
+    private static String DEFAULT_COLLECTION_URL = "http://dummy.url/";
     private long DEFAULT_NEWER = 1;
     private String DEFAULT_SORT = "oldest";
     private String DEFAULT_IDS = "1";
     private String DEFAULT_LMHEADER = "12345678";
 
-    class MockSessionFetchRecordsDelegate implements RepositorySessionFetchRecordsDelegate {
+    private CountingShadowRepositoryState repositoryStateProvider;
+
+    // Memory-backed state implementation which keeps a shadow of current values, so that they can be
+    // queried by the tests. Classes under test do not have access to the shadow, and follow regular
+    // non-persistent state semantics (value changes visible only after commit).
+    static class CountingShadowRepositoryState extends NonPersistentRepositoryStateProvider {
+        private AtomicInteger commitCount = new AtomicInteger(0);
+        private final Map<String, Object> shadowMap = Collections.synchronizedMap(new HashMap<String, Object>(2));
+
+        @Override
+        public boolean commit() {
+            commitCount.incrementAndGet();
+            return super.commit();
+        }
+
+        int getCommitCount() {
+            return commitCount.get();
+        }
+
+        @Nullable
+        public Long getShadowedLong(String key) {
+            return (Long) shadowMap.get(key);
+        }
+
+        @Nullable
+        public String getShadowedString(String key) {
+            return (String) shadowMap.get(key);
+        }
+
+        @Override
+        public CountingShadowRepositoryState setLong(String key, Long value) {
+            shadowMap.put(key, value);
+            super.setLong(key, value);
+            return this;
+        }
+
+        @Override
+        public CountingShadowRepositoryState setString(String key, String value) {
+            shadowMap.put(key, value);
+            super.setString(key, value);
+            return this;
+        }
+
+        @Override
+        public CountingShadowRepositoryState clear(String key) {
+            shadowMap.remove(key);
+            super.clear(key);
+            return this;
+        }
+
+        @Override
+        public boolean resetAndCommit() {
+            shadowMap.clear();
+            return super.resetAndCommit();
+        }
+    }
+
+    static class MockSessionFetchRecordsDelegate implements RepositorySessionFetchRecordsDelegate {
         public boolean isFailure;
         public boolean isFetched;
         public boolean isSuccess;
         public int batchesCompleted;
         public Exception ex;
         public Record record;
 
         @Override
@@ -83,39 +149,40 @@ public class BatchingDownloaderTest {
         }
 
         @Override
         public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
             return null;
         }
     }
 
-    class MockRequest extends SyncStorageCollectionRequest {
+    static class MockRequest extends SyncStorageCollectionRequest {
 
-        public MockRequest(URI uri) {
+        MockRequest(URI uri) {
             super(uri);
         }
 
         @Override
         public void get() {
 
         }
     }
 
-    class MockDownloader extends BatchingDownloader {
+    static class MockDownloader extends BatchingDownloader {
         public long newer;
         public long limit;
         public boolean full;
         public String sort;
         public String ids;
         public String offset;
         public boolean abort;
 
-        public MockDownloader(RepositorySession repositorySession, boolean allowMultipleBatches) {
-            super(null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30), allowMultipleBatches, repositorySession);
+        MockDownloader(RepositorySession repositorySession, boolean allowMultipleBatches, boolean keepTrackOfHighWaterMark, RepositoryStateProvider repositoryStateProvider) {
+            super(null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
+                    allowMultipleBatches, keepTrackOfHighWaterMark, repositoryStateProvider, repositorySession);
         }
 
         @Override
         public void fetchWithParameters(long newer,
                                  long batchLimit,
                                  boolean full,
                                  String sort,
                                  String ids,
@@ -144,45 +211,48 @@ public class BatchingDownloaderTest {
                       String ids,
                       String offset)
                 throws URISyntaxException, UnsupportedEncodingException {
             this.offset = offset;
             return super.makeSyncStorageCollectionRequest(newer, batchLimit, full, sort, ids, offset);
         }
     }
 
-    class MockSever15Repository extends Server15Repository {
-        public MockSever15Repository(@NonNull String collection, @NonNull String storageURL,
+    static class MockSever15Repository extends Server15Repository {
+        MockSever15Repository(@NonNull String collection, @NonNull String storageURL,
                                      AuthHeaderProvider authHeaderProvider, @NonNull InfoCollections infoCollections,
                                      @NonNull InfoConfiguration infoConfiguration) throws URISyntaxException {
-            super(collection, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30), storageURL, authHeaderProvider, infoCollections, infoConfiguration);
+            super(collection, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
+                    storageURL, authHeaderProvider, infoCollections, infoConfiguration,
+                    new NonPersistentRepositoryStateProvider());
         }
     }
 
-    class MockRepositorySession extends Server15RepositorySession {
+    static class MockRepositorySession extends Server15RepositorySession {
         public boolean abort;
 
-        public MockRepositorySession(Repository repository) {
+        MockRepositorySession(Repository repository) {
             super(repository);
         }
 
         @Override
         public void abort() {
             this.abort = true;
         }
     }
 
     @Before
     public void setUp() throws Exception {
         sessionFetchRecordsDelegate = new MockSessionFetchRecordsDelegate();
 
         serverRepository = new MockSever15Repository(DEFAULT_COLLECTION_NAME, DEFAULT_COLLECTION_URL, null,
                 new InfoCollections(), new InfoConfiguration());
         repositorySession = new Server15RepositorySession(serverRepository);
-        mockDownloader = new MockDownloader(repositorySession, true);
+        repositoryStateProvider = new CountingShadowRepositoryState();
+        mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
     }
 
     @Test
     public void testFlattenId() {
         String[] emptyGuid = new String[]{};
         String flatten =  BatchingDownloader.flattenIDs(emptyGuid);
         assertEquals("", flatten);
 
@@ -199,122 +269,135 @@ public class BatchingDownloaderTest {
         multiGuid[1] = guid1;
         multiGuid[2] = guid2;
         flatten = BatchingDownloader.flattenIDs(multiGuid);
         assertEquals("123456789abc,456789abc,789abc", flatten);
     }
 
     @Test
     public void testBatchingTrivial() throws Exception {
-        MockDownloader mockDownloader = new MockDownloader(repositorySession, true);
+        MockDownloader mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
 
         assertNull(mockDownloader.getLastModified());
         // Number of records == batch limit.
         final long BATCH_LIMIT = 100;
-        mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT);
+        mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT, null);
 
         SyncStorageResponse response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, null, "100");
         SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
         mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request,
                 DEFAULT_NEWER, BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         assertTrue(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
         assertEquals(0, sessionFetchRecordsDelegate.batchesCompleted);
+
+        // NB: we set highWaterMark as part of onFetchedRecord, so we don't expect it to be set here.
+        // Expect no offset to be persisted.
+        ensureOffsetContextIsNull(repositoryStateProvider);
+        assertEquals(1, repositoryStateProvider.getCommitCount());
     }
 
     @Test
     public void testBatchingSingleBatchMode() throws Exception {
-        MockDownloader mockDownloader = new MockDownloader(repositorySession, false);
+        MockDownloader mockDownloader = new MockDownloader(repositorySession, false, true, repositoryStateProvider);
 
         assertNull(mockDownloader.getLastModified());
         // Number of records > batch limit. But, we're only allowed to make one batch request.
         final long BATCH_LIMIT = 100;
-        mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT);
+        mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT, null);
 
         String offsetHeader = "25";
         String recordsHeader = "500";
         SyncStorageResponse response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
         SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
         mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request,
                 DEFAULT_NEWER, BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         assertTrue(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
         assertEquals(0, sessionFetchRecordsDelegate.batchesCompleted);
+
+        // We don't care about the offset in a single batch mode.
+        ensureOffsetContextIsNull(repositoryStateProvider);
+        assertEquals(1, repositoryStateProvider.getCommitCount());
     }
 
     @Test
     public void testBatching() throws Exception {
         final long BATCH_LIMIT = 25;
-        mockDownloader = new MockDownloader(repositorySession, true);
+        mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
 
         assertNull(mockDownloader.getLastModified());
-        mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT);
+        mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT, null);
 
-        String offsetHeader = "25";
-        String recordsHeader = "25";
-        SyncStorageResponse response = makeSyncStorageResponse(200,  DEFAULT_LMHEADER, offsetHeader, recordsHeader);
-        SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
-        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
-                BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
+        final String recordsHeader = "25";
+        performOnFetchCompleted("25", recordsHeader, BATCH_LIMIT);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         // Verify the same parameters are used in the next fetch.
         assertSameParameters(mockDownloader, BATCH_LIMIT);
-        assertEquals(offsetHeader, mockDownloader.offset);
+        assertEquals("25", mockDownloader.offset);
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
         assertEquals(1, sessionFetchRecordsDelegate.batchesCompleted);
 
+        // Offset context set.
+        ensureOffsetContextIs(repositoryStateProvider, "25", "oldest", 1L);
+        assertEquals(1, repositoryStateProvider.getCommitCount());
+
         // The next batch, we still have an offset token and has not exceed the total limit.
-        offsetHeader = "50";
-        response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
-        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
-                BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
+        performOnFetchCompleted("50", recordsHeader, BATCH_LIMIT);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         // Verify the same parameters are used in the next fetch.
         assertSameParameters(mockDownloader, BATCH_LIMIT);
-        assertEquals(offsetHeader, mockDownloader.offset);
+        assertEquals("50", mockDownloader.offset);
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
         assertEquals(2, sessionFetchRecordsDelegate.batchesCompleted);
 
+        // Offset context updated.
+        ensureOffsetContextIs(repositoryStateProvider, "50", "oldest", 1L);
+        assertEquals(2, repositoryStateProvider.getCommitCount());
+
         // The next batch, we still have an offset token and has not exceed the total limit.
-        offsetHeader = "75";
-        response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
-        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
-                BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
+        performOnFetchCompleted("75", recordsHeader, BATCH_LIMIT);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         // Verify the same parameters are used in the next fetch.
         assertSameParameters(mockDownloader, BATCH_LIMIT);
-        assertEquals(offsetHeader, mockDownloader.offset);
+        assertEquals("75", mockDownloader.offset);
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
         assertEquals(3, sessionFetchRecordsDelegate.batchesCompleted);
 
+        // Offset context updated.
+        ensureOffsetContextIs(repositoryStateProvider, "75", "oldest", 1L);
+        assertEquals(3, repositoryStateProvider.getCommitCount());
+
         // No more offset token, so we complete batching.
-        response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, null, recordsHeader);
-        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
-                BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
+        performOnFetchCompleted(null, recordsHeader, BATCH_LIMIT);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         assertTrue(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
         assertEquals(3, sessionFetchRecordsDelegate.batchesCompleted);
+
+        // Offset context cleared since we finished batching, and committed.
+        ensureOffsetContextIsNull(repositoryStateProvider);
+        assertEquals(4, repositoryStateProvider.getCommitCount());
     }
 
     @Test
     public void testFailureLMChangedMultiBatch() throws Exception {
         assertNull(mockDownloader.getLastModified());
 
         String lmHeader = "12345678";
         String offsetHeader = "100";
@@ -331,57 +414,140 @@ public class BatchingDownloaderTest {
         assertTrue(mockDownloader.full);
         assertEquals(DEFAULT_SORT, mockDownloader.sort);
         assertEquals(DEFAULT_IDS, mockDownloader.ids);
         assertEquals(offsetHeader, mockDownloader.offset);
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
 
+        // Offset context set.
+        ensureOffsetContextIs(repositoryStateProvider, "100", "oldest", 1L);
+        assertEquals(1, repositoryStateProvider.getCommitCount());
+
         // Last modified header somehow changed.
         lmHeader = "10000000";
         response = makeSyncStorageResponse(200, lmHeader, offsetHeader, null);
         mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
                 limit, true, DEFAULT_SORT, DEFAULT_IDS);
 
         assertNotEquals(lmHeader, mockDownloader.getLastModified());
         assertTrue(mockDownloader.abort);
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertTrue(sessionFetchRecordsDelegate.isFailure);
+
+        // Since we failed due to a remotely modified collection, we expect offset context to be reset.
+        ensureOffsetContextIsNull(repositoryStateProvider);
+        assertEquals(2, repositoryStateProvider.getCommitCount());
     }
 
     @Test
     public void testFailureException() throws Exception {
         Exception ex = new IllegalStateException();
         SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI("http://dummy.url"));
         mockDownloader.handleFetchFailed(sessionFetchRecordsDelegate, ex, request);
 
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertTrue(sessionFetchRecordsDelegate.isFailure);
         assertEquals(ex.getClass(), sessionFetchRecordsDelegate.ex.getClass());
         assertNull(sessionFetchRecordsDelegate.record);
     }
 
     @Test
+    public void testOffsetNotResetAfterDeadline() throws Exception {
+        final long BATCH_LIMIT = 25;
+        mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
+        mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT, null);
+
+        SyncStorageCollectionRequest request = performOnFetchCompleted("25", "25", 25);
+
+        // Offset context set.
+        ensureOffsetContextIs(repositoryStateProvider, "25", "oldest", 1L);
+        assertEquals(1, repositoryStateProvider.getCommitCount());
+
+        Exception ex = new SyncDeadlineReachedException();
+        mockDownloader.handleFetchFailed(sessionFetchRecordsDelegate, ex, request);
+
+        // Offset context not reset.
+        ensureOffsetContextIs(repositoryStateProvider, "25", "oldest", 1L);
+        assertEquals(2, repositoryStateProvider.getCommitCount());
+    }
+
+    @Test
+    public void testOffsetResetAfterConcurrentModification() throws Exception {
+        final long BATCH_LIMIT = 25;
+        mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
+        mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT, null);
+
+        SyncStorageCollectionRequest request = performOnFetchCompleted("25", "25", 25);
+
+        // Offset context set.
+        ensureOffsetContextIs(repositoryStateProvider, "25", "oldest", 1L);
+        assertEquals(1, repositoryStateProvider.getCommitCount());
+
+        Exception ex = new CollectionConcurrentModificationException();
+        mockDownloader.handleFetchFailed(sessionFetchRecordsDelegate, ex, request);
+
+        // Offset is reset.
+        ensureOffsetContextIsNull(repositoryStateProvider);
+        assertEquals(2, repositoryStateProvider.getCommitCount());
+    }
+
+    @Test
     public void testFetchRecord() {
         CryptoRecord record = new CryptoRecord();
         mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
 
         assertTrue(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
         assertEquals(record, sessionFetchRecordsDelegate.record);
     }
 
     @Test
+    public void testHighWaterMarkTracking() {
+        CryptoRecord record = new CryptoRecord();
+
+        // HWM enabled
+        mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
+
+        record.lastModified = 1L;
+        mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
+        assertEquals(Long.valueOf(1), repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+
+        record.lastModified = 5L;
+        mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
+        assertEquals(Long.valueOf(5), repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+
+        // NB: Currently nothing is preventing HWM from "going down".
+        record.lastModified = 4L;
+        mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
+        assertEquals(Long.valueOf(4), repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+
+        // HWM disabled
+        mockDownloader = new MockDownloader(repositorySession, true, false, repositoryStateProvider);
+        assertTrue(repositoryStateProvider.resetAndCommit());
+
+        record.lastModified = 4L;
+        mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
+        assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+
+        record.lastModified = 5L;
+        mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
+        assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+    }
+
+    @Test
     public void testAbortRequests() {
         MockRepositorySession mockRepositorySession = new MockRepositorySession(serverRepository);
-        BatchingDownloader downloader = new BatchingDownloader(null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30), true, mockRepositorySession);
+        BatchingDownloader downloader = new BatchingDownloader(
+                null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
+                true, true, new NonPersistentRepositoryStateProvider(), mockRepositorySession);
         assertFalse(mockRepositorySession.abort);
         downloader.abortRequests();
         assertTrue(mockRepositorySession.abort);
     }
 
     @Test
     public void testBuildCollectionURI() {
         try {
@@ -393,16 +559,25 @@ public class BatchingDownloaderTest {
 
             final Uri baseUri = Uri.parse("https://moztest.org/collection/");
             assertEquals(baseUri + "?full=1&ids=123%2Cabc&offset=1234", BatchingDownloader.buildCollectionURI(baseUri, true, -1L, -1, null, "123,abc", "1234").toString());
         } catch (URISyntaxException e) {
             fail();
         }
     }
 
+    private SyncStorageCollectionRequest performOnFetchCompleted(String offsetHeader, String recordsHeader, long batchLimit) throws URISyntaxException {
+        SyncStorageResponse response = makeSyncStorageResponse(200,  DEFAULT_LMHEADER, offsetHeader, recordsHeader);
+        SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
+        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
+                batchLimit, true, DEFAULT_SORT, DEFAULT_IDS);
+
+        return request;
+    }
+
     private void assertSameParameters(MockDownloader mockDownloader, long limit) {
         assertEquals(DEFAULT_NEWER, mockDownloader.newer);
         assertEquals(limit, mockDownloader.limit);
         assertTrue(mockDownloader.full);
         assertEquals(DEFAULT_SORT, mockDownloader.sort);
         assertEquals(DEFAULT_IDS, mockDownloader.ids);
     }
 
@@ -419,9 +594,21 @@ public class BatchingDownloaderTest {
         }
 
         if (records != null) {
             response.addHeader(SyncResponse.X_WEAVE_RECORDS, records);
         }
 
         return new SyncStorageResponse(response);
     }
+
+    private void ensureOffsetContextIsNull(CountingShadowRepositoryState stateProvider) {
+        assertNull(stateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
+        assertNull(stateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET_ORDER));
+        assertNull(stateProvider.getShadowedLong(RepositoryStateProvider.KEY_OFFSET_SINCE));
+    }
+
+    private void ensureOffsetContextIs(CountingShadowRepositoryState stateProvider, String offset, String order, Long since) {
+        assertEquals(offset, stateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
+        assertEquals(since, stateProvider.getShadowedLong(RepositoryStateProvider.KEY_OFFSET_SINCE));
+        assertEquals(order, stateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET_ORDER));
+    }
 }
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
@@ -15,16 +15,17 @@ import org.junit.runner.RunWith;
 import org.mozilla.gecko.background.testhelpers.MockRecord;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
 import org.mozilla.gecko.sync.ExtendedJSONObject;
 import org.mozilla.gecko.sync.InfoCollections;
 import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.Utils;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
 
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -589,17 +590,18 @@ public class BatchingUploaderTest {
 
         try {
             return new Server15Repository(
                     "dummyCollection",
                     SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
                     "http://dummy.url/",
                     null,
                     infoCollections,
-                    infoConfiguration
+                    infoConfiguration,
+                    new NonPersistentRepositoryStateProvider()
             );
         } catch (URISyntaxException e) {
             // Won't throw, and this won't happen.
             return null;
         }
     }
 
     static abstract class TestRunnableWithTarget<T> {