Bug 1291821 - Allow BatchingDownloader to resume downloads using offset or high water mark r=rnewman draft
authorGrisha Kruglov <gkruglov@mozilla.com>
Tue, 29 Nov 2016 21:53:03 -0800
changeset 445752 7fb9f3be33549ee0f78ed0661e280702a2afb792
parent 445751 c49abe8e93423cd3c52de26a6cb8ab3419e0aed0
child 445753 2c28e52a991f039f32dbaa1d63c507e285022381
push id37599
push usergkruglov@mozilla.com
push dateWed, 30 Nov 2016 06:33:43 +0000
reviewersrnewman
bugs1291821
milestone53.0a1
Bug 1291821 - Allow BatchingDownloader to resume downloads using offset or high water mark r=rnewman BatchingDownloader uses provided RepositoryStateProvider instance in order to track offset and high water mark as it performs batching. These objects are initialized by individual ServerSyncStages, and prefixes are used to ensure keys won't clash. Two RepositoryStateProvider implementations are used: persistent and non-persistent. Non-persistent state provider does not allow for resuming after a sync restart, while persistent one does. Persistent state provider is used by history stage. It is fetched oldest-first, and records are applied to live storage as they're downloaded. These conditions let use resume downloads. It's also possible to resume downloads for stages which use a persistent buffer, but currently we don't have any. Offset value is reset if we hit a 412 error; it is maintained if we hit a sync deadline, allowing us to minimize number of records we'll redownload. High water mark is maintained across syncs and used instead of stage's "last-synced" timestamp. MozReview-Commit-ID: IH28YrDU4vW
mobile/android/base/android-services.mozbuild
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConfigurableServer15Repository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/NonPersistentRepositoryStateProvider.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/PersistentRepositoryStateProvider.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositoryStateProvider.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15Repository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15RepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserRecentHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
mobile/android/tests/background/junit4/src/org/mozilla/android/sync/net/test/TestServer15Repository.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
--- a/mobile/android/base/android-services.mozbuild
+++ b/mobile/android/base/android-services.mozbuild
@@ -1004,25 +1004,28 @@ sync_java_files = [TOPSRCDIR + '/mobile/
     'sync/repositories/IdentityRecordFactory.java',
     'sync/repositories/InactiveSessionException.java',
     'sync/repositories/InvalidBookmarkTypeException.java',
     'sync/repositories/InvalidRequestException.java',
     'sync/repositories/InvalidSessionTransitionException.java',
     'sync/repositories/MultipleRecordsForGuidException.java',
     'sync/repositories/NoContentProviderException.java',
     'sync/repositories/NoGuidForIdException.java',
+    'sync/repositories/NonPersistentRepositoryStateProvider.java',
     'sync/repositories/NoStoreDelegateException.java',
     'sync/repositories/NullCursorException.java',
     'sync/repositories/ParentNotFoundException.java',
+    'sync/repositories/PersistentRepositoryStateProvider.java',
     'sync/repositories/ProfileDatabaseException.java',
     'sync/repositories/RecordFactory.java',
     'sync/repositories/RecordFilter.java',
     'sync/repositories/Repository.java',
     'sync/repositories/RepositorySession.java',
     'sync/repositories/RepositorySessionBundle.java',
+    'sync/repositories/RepositoryStateProvider.java',
     'sync/repositories/Server15Repository.java',
     'sync/repositories/Server15RepositorySession.java',
     'sync/repositories/StoreFailedException.java',
     'sync/repositories/StoreTracker.java',
     'sync/repositories/StoreTrackingRepositorySession.java',
     'sync/repositories/uploaders/BatchingUploader.java',
     'sync/repositories/uploaders/BatchMeta.java',
     'sync/repositories/uploaders/BufferSizeTracker.java',
@@ -1051,17 +1054,16 @@ sync_java_files = [TOPSRCDIR + '/mobile/
     'sync/stage/FetchMetaGlobalStage.java',
     'sync/stage/FormHistoryServerSyncStage.java',
     'sync/stage/GlobalSyncStage.java',
     'sync/stage/NoSuchStageException.java',
     'sync/stage/PasswordsServerSyncStage.java',
     'sync/stage/ServerSyncStage.java',
     'sync/stage/SyncClientsEngineStage.java',
     'sync/stage/UploadMetaGlobalStage.java',
-    'sync/Sync11Configuration.java',
     'sync/SyncConfiguration.java',
     'sync/SyncConfigurationException.java',
     'sync/SyncConstants.java',
     'sync/SyncDeadlineReachedException.java',
     'sync/SyncException.java',
     'sync/synchronizer/ConcurrentRecordConsumer.java',
     'sync/synchronizer/RecordConsumer.java',
     'sync/synchronizer/RecordsChannel.java',
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
@@ -152,21 +152,16 @@ import java.util.concurrent.Executors;
     }
 
     @Override
     public void setStoreDelegate(RepositorySessionStoreDelegate delegate) {
         inner.setStoreDelegate(delegate);
         this.storeDelegate = delegate;
     }
 
-    @Override
-    public long getHighWaterMarkTimestamp() {
-        return bufferStorage.latestModifiedTimestamp();
-    }
-
     private boolean mayProceedToMergeBuffer() {
         // While actual runtime of a merge operation is a function of record type, buffer size, etc.,
         // let's do a simple thing for now and say that we may proceed if we have couple of minutes
         // of runtime left. That surely is enough, right?
         final long timeLeftMillis = syncDeadlineMillis - SystemClock.elapsedRealtime();
         return timeLeftMillis > 1000 * 60 * 2;
     }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
@@ -21,13 +21,9 @@ public interface BufferStorage {
     // what's already present in the storage layer.
     // NB: For a database-backed storage, "replace" happens at a transaction level.
     void addOrReplace(Record record);
 
     // For database-backed implementations, commits any records that came in up to this point.
     void flush();
 
     void clear();
-
-    // For buffers that are filled up oldest-first this is a high water mark, which enables resuming
-    // a sync.
-    long latestModifiedTimestamp();
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
@@ -38,28 +38,9 @@ public class MemoryBufferStorage impleme
     public void flush() {
         // This is a no-op; flush intended for database-backed stores.
     }
 
     @Override
     public void clear() {
         recordBuffer.clear();
     }
-
-    @Override
-    public long latestModifiedTimestamp() {
-        long lastModified = 0;
-
-        synchronized (recordBuffer) {
-            if (recordBuffer.size() == 0) {
-                return lastModified;
-            }
-
-            for (Record record : recordBuffer.values()) {
-                if (record.lastModified > lastModified) {
-                    lastModified = record.lastModified;
-                }
-            }
-        }
-
-        return lastModified;
-    }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConfigurableServer15Repository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/ConfigurableServer15Repository.java
@@ -26,24 +26,26 @@ public class ConfigurableServer15Reposit
           String collection,
           long syncDeadline,
           String storageURL,
           AuthHeaderProvider authHeaderProvider,
           InfoCollections infoCollections,
           InfoConfiguration infoConfiguration,
           long batchLimit,
           String sort,
-          boolean allowMultipleBatches) throws URISyntaxException {
+          boolean allowMultipleBatches,
+          RepositoryStateProvider stateProvider) throws URISyntaxException {
     super(
             collection,
             syncDeadline,
             storageURL,
             authHeaderProvider,
             infoCollections,
-            infoConfiguration
+            infoConfiguration,
+            stateProvider
     );
     this.batchLimit = batchLimit;
     this.sortOrder  = sort;
     this.allowMultipleBatches = allowMultipleBatches;
   }
 
   @Override
   public String getSortOrder() {
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/NonPersistentRepositoryStateProvider.java
@@ -0,0 +1,60 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories;
+
+import android.support.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple non-persistent implementation of a repository state provider.
+ *
+ * Just like in the persistent implementation, changes to values are visible only after a commit.
+ *
+ * @author grisha
+ */
+public class NonPersistentRepositoryStateProvider implements RepositoryStateProvider {
+    // We'll have at least OFFSET and H.W.M. values set.
+    private final int INITIAL_CAPACITY = 2;
+    private final Map<String, Object> nonCommittedValuesMap = Collections.synchronizedMap(new HashMap<String, Object>(INITIAL_CAPACITY));
+
+    // NB: Any changes are made by creating a new map instead of altering existing one.
+    private volatile Map<String, Object> committedValuesMap = new HashMap<>(INITIAL_CAPACITY);
+
+    @Override
+    public void commit() {
+        // No-op for non-persistent repository.
+        committedValuesMap = new HashMap<>(nonCommittedValuesMap);
+    }
+
+    @Override
+    public void clear(String key) {
+        nonCommittedValuesMap.remove(key);
+    }
+
+    @Override
+    public void setString(String key, String value) {
+        nonCommittedValuesMap.put(key, value);
+    }
+
+    @Nullable
+    @Override
+    public String getString(String key) {
+        return (String) committedValuesMap.get(key);
+    }
+
+    @Override
+    public void setLong(String key, Long value) {
+        nonCommittedValuesMap.put(key, value);
+    }
+
+    @Nullable
+    @Override
+    public Long getLong(String key) {
+        return (Long) committedValuesMap.get(key);
+    }
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/PersistentRepositoryStateProvider.java
@@ -0,0 +1,63 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories;
+
+import android.support.annotation.Nullable;
+
+import org.mozilla.gecko.background.common.PrefsBranch;
+
+/**
+ * Simple persistent implementation of a repository state provider.
+ * Uses provided PrefsBranch object in order to persist values.
+ *
+ * Values must be committed before they become visible via getters.
+ *
+ * @author grisha
+ */
+public class PersistentRepositoryStateProvider implements RepositoryStateProvider {
+    private final PrefsBranch prefs;
+
+    private final PrefsBranch.Editor editor;
+
+    public PersistentRepositoryStateProvider(PrefsBranch prefs) {
+        this.prefs = prefs;
+        this.editor = prefs.edit();
+    }
+
+    @Override
+    public void commit() {
+        this.editor.apply();
+    }
+
+    @Override
+    public void clear(String key) {
+        this.editor.remove(key);
+    }
+
+    @Override
+    public void setString(String key, String value) {
+        this.editor.putString(key, value);
+    }
+
+    @Nullable
+    @Override
+    public String getString(String key) {
+        return this.prefs.getString(key, null);
+    }
+
+    @Override
+    public void setLong(String key, Long value) {
+        this.editor.putLong(key, value);
+    }
+
+    @Nullable
+    @Override
+    public Long getLong(String key) {
+        if (!this.prefs.contains(key)) {
+            return null;
+        }
+        return this.prefs.getLong(key, 0);
+    }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
@@ -71,21 +71,16 @@ public abstract class RepositorySession 
 
   // The time that the last sync on this collection completed, in milliseconds since epoch.
   private long lastSyncTimestamp = 0;
 
   public long getLastSyncTimestamp() {
     return lastSyncTimestamp;
   }
 
-  // Override this in the buffering wrappers.
-  public long getHighWaterMarkTimestamp() {
-    return 0;
-  }
-
   public static long now() {
     return System.currentTimeMillis();
   }
 
   public RepositorySession(Repository repository) {
     this.repository = repository;
   }
 
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositoryStateProvider.java
@@ -0,0 +1,33 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.repositories;
+
+import android.support.annotation.Nullable;
+
+/**
+ * Interface describing a repository state provider.
+ * Repository's state might consist of a number of key-value pairs.
+ *
+ * Currently there are two types of implementations: persistent and non-persistent state.
+ * Persistent state survives between syncs, and is currently used by the BatchingDownloader to
+ * resume downloads in case of interruptions. Non-persistent state is used when resuming downloads
+ * is not possible.
+ *
+ * @author grisha
+ */
+public interface RepositoryStateProvider {
+    String KEY_OFFSET = "offset";
+    String KEY_HIGH_WATER_MARK = "highWaterMark";
+
+    void commit();
+
+    void clear(String key);
+
+    void setString(String key, String value);
+    @Nullable String getString(String key);
+
+    void setLong(String key, Long value);
+    @Nullable Long getLong(String key);
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15Repository.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15Repository.java
@@ -26,16 +26,18 @@ public class Server15Repository extends 
   public final AuthHeaderProvider authHeaderProvider;
 
   private final long syncDeadlineMillis;
   /* package-local */ final URI collectionURI;
 
   protected final String collection;
   protected final InfoCollections infoCollections;
 
+  protected RepositoryStateProvider stateProvider;
+
   private final InfoConfiguration infoConfiguration;
   private final static String DEFAULT_SORT_ORDER = "oldest";
   private final static long DEFAULT_BATCH_LIMIT = 100;
 
   /**
    * Construct a new repository that fetches and stores against the Sync 1.5 API.
    *
    * @param collection name.
@@ -45,32 +47,34 @@ public class Server15Repository extends 
    * @throws URISyntaxException
    */
   public Server15Repository(
           @NonNull String collection,
           long syncDeadlineMillis,
           @NonNull String storageURL,
           AuthHeaderProvider authHeaderProvider,
           @NonNull InfoCollections infoCollections,
-          @NonNull InfoConfiguration infoConfiguration) throws URISyntaxException {
+          @NonNull InfoConfiguration infoConfiguration,
+          @NonNull RepositoryStateProvider stateProvider) throws URISyntaxException {
     if (collection == null) {
       throw new IllegalArgumentException("collection must not be null");
     }
     if (storageURL == null) {
       throw new IllegalArgumentException("storageURL must not be null");
     }
     if (infoCollections == null) {
       throw new IllegalArgumentException("infoCollections must not be null");
     }
     this.collection = collection;
     this.syncDeadlineMillis = syncDeadlineMillis;
     this.collectionURI = new URI(storageURL + (storageURL.endsWith("/") ? collection : "/" + collection));
     this.authHeaderProvider = authHeaderProvider;
     this.infoCollections = infoCollections;
     this.infoConfiguration = infoConfiguration;
+    this.stateProvider = stateProvider;
   }
 
   @Override
   public void createSession(RepositorySessionCreationDelegate delegate,
                             Context context) {
     delegate.onSessionCreated(new Server15RepositorySession(this));
   }
 
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server15RepositorySession.java
@@ -20,17 +20,23 @@ public class Server15RepositorySession e
 
   protected final Server15Repository serverRepository;
   private BatchingUploader uploader;
   private final BatchingDownloader downloader;
 
   public Server15RepositorySession(Repository repository) {
     super(repository);
     this.serverRepository = (Server15Repository) repository;
-    this.downloader = initializeDownloader(this);
+    this.downloader = new BatchingDownloader(
+            this.serverRepository.authHeaderProvider,
+            Uri.parse(this.serverRepository.collectionURI().toString()),
+            this.serverRepository.getSyncDeadline(),
+            this.serverRepository.getAllowMultipleBatches(),
+            this.serverRepository.stateProvider,
+            this);
   }
 
   @Override
   public void setStoreDelegate(RepositorySessionStoreDelegate storeDelegate) {
     super.setStoreDelegate(storeDelegate);
 
     // Now that we have the delegate, we can initialize our uploader.
     this.uploader = new BatchingUploader(
@@ -102,18 +108,9 @@ public class Server15RepositorySession e
 
     uploader.noMoreRecordsToUpload();
   }
 
   @Override
   public boolean dataAvailable() {
     return serverRepository.updateNeeded(getLastSyncTimestamp());
   }
-
-  protected static BatchingDownloader initializeDownloader(final Server15RepositorySession serverRepositorySession) {
-    return new BatchingDownloader(
-            serverRepositorySession.serverRepository.authHeaderProvider,
-            Uri.parse(serverRepositorySession.serverRepository.collectionURI().toString()),
-            serverRepositorySession.serverRepository.getSyncDeadline(),
-            serverRepositorySession.serverRepository.getAllowMultipleBatches(),
-            serverRepositorySession);
-  }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
@@ -15,16 +15,17 @@ import org.mozilla.gecko.sync.CryptoReco
 import org.mozilla.gecko.sync.DelayedWorkTracker;
 import org.mozilla.gecko.sync.SyncDeadlineReachedException;
 import org.mozilla.gecko.sync.Utils;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.net.SyncResponse;
 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -57,34 +58,38 @@ public class BatchingDownloader {
     private static final String DEFAULT_SORT_ORDER = "index";
 
     private final RepositorySession repositorySession;
     private final DelayedWorkTracker workTracker = new DelayedWorkTracker();
     private final Uri baseCollectionUri;
     private final long fetchDeadline;
     private final boolean allowMultipleBatches;
 
+    private RepositoryStateProvider stateProvider;
+
     /* package-local */ final AuthHeaderProvider authHeaderProvider;
 
     // Used to track outstanding requests, so that we can abort them as needed.
     @VisibleForTesting
     protected final Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>());
     /* @GuardedBy("this") */ private String lastModified;
 
     public BatchingDownloader(
             AuthHeaderProvider authHeaderProvider,
             Uri baseCollectionUri,
             long fetchDeadline,
             boolean allowMultipleBatches,
+            RepositoryStateProvider stateProvider,
             RepositorySession repositorySession) {
         this.repositorySession = repositorySession;
         this.authHeaderProvider = authHeaderProvider;
         this.baseCollectionUri = baseCollectionUri;
         this.allowMultipleBatches = allowMultipleBatches;
         this.fetchDeadline = fetchDeadline;
+        this.stateProvider = stateProvider;
     }
 
     @VisibleForTesting
     protected static String flattenIDs(String[] guids) {
         // Consider using Utils.toDelimitedString if and when the signature changes
         // to Collection<String> guids.
         if (guids.length == 0) {
             return "";
@@ -125,18 +130,37 @@ public class BatchingDownloader {
                                                   String offset)
             throws URISyntaxException, UnsupportedEncodingException {
         final URI collectionURI = buildCollectionURI(baseCollectionUri, full, newer, batchLimit, sort, ids, offset);
         Logger.debug(LOG_TAG, collectionURI.toString());
 
         return new SyncStorageCollectionRequest(collectionURI);
     }
 
-    public void fetchSince(RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, long timestamp, long batchLimit, String sortOrder) {
-        this.fetchSince(fetchRecordsDelegate, timestamp, batchLimit, sortOrder, null);
+    public void fetchSince(RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, long sinceTimestamp, long batchLimit, String sortOrder) {
+        // If stateProvider is non-persistent, these will always be null at the start.
+        // If stateProvider is persistent, these might still be null, if they haven't been set during
+        // a previous sync.
+        final Long highWaterMark = stateProvider.getLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK);
+        final String offset = stateProvider.getString(RepositoryStateProvider.KEY_OFFSET);
+
+        // We are trying to eliminate need to re-download records if we already have them.
+        // If offset is available, use it to skip over records that were already provided to us.
+        if (offset != null) {
+            this.fetchSince(fetchRecordsDelegate, sinceTimestamp, batchLimit, sortOrder, offset);
+
+        // If offset is not available, try to fallback on highWaterMark to skip most of the records
+        // we already have. We always fetch from our current highWaterMark if sortOrder is 'oldest'.
+        } else if (highWaterMark != null && sortOrder.equals("oldest")) {
+            this.fetchSince(fetchRecordsDelegate, highWaterMark, batchLimit, sortOrder, null);
+
+        // Fallback on the provided timestamp if all fails.
+        } else {
+            this.fetchSince(fetchRecordsDelegate, sinceTimestamp, batchLimit, sortOrder, null);
+        }
     }
 
     @VisibleForTesting
     public void fetchSince(RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, long timestamp, long batchLimit, String sortOrder, String offset) {
         try {
             SyncStorageCollectionRequest request = makeSyncStorageCollectionRequest(timestamp,
                     batchLimit, true, sortOrder, null, offset);
             this.fetchWithParameters(timestamp, batchLimit, true, sortOrder, null, request, fetchRecordsDelegate);
@@ -194,26 +218,31 @@ public class BatchingDownloader {
         }
 
         // If we can (or must) stop batching at this point, let the delegate know that we're all done!
         final String offset = response.weaveOffset();
         if (offset == null || !allowMultipleBatches) {
             final long normalizedTimestamp = response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED);
             Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp);
 
+            this.stateProvider.clear(RepositoryStateProvider.KEY_OFFSET);
+            this.stateProvider.commit();
+
             this.workTracker.delayWorkItem(new Runnable() {
                 @Override
                 public void run() {
                     Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
                     fetchRecordsDelegate.onFetchCompleted(normalizedTimestamp);
                 }
             });
             return;
         }
 
+        this.stateProvider.setString(RepositoryStateProvider.KEY_OFFSET, offset);
+
         // We need to make another batching request!
         // Let the delegate know that a batch fetch just completed before we proceed.
         // This operation needs to run after every call to onFetchedRecord for this batch has been
         // processed, hence the delayWorkItem call.
         this.workTracker.delayWorkItem(new Runnable() {
             @Override
             public void run() {
                 Logger.debug(LOG_TAG, "Running onBatchCompleted.");
@@ -228,16 +257,17 @@ public class BatchingDownloader {
         }
 
         // Create and execute new batch request.
         try {
             final SyncStorageCollectionRequest newRequest = makeSyncStorageCollectionRequest(newer,
                     limit, full, sort, ids, offset);
             this.fetchWithParameters(newer, limit, full, sort, ids, newRequest, fetchRecordsDelegate);
         } catch (final URISyntaxException | UnsupportedEncodingException e) {
+            this.stateProvider.commit();
             this.workTracker.delayWorkItem(new Runnable() {
                 @Override
                 public void run() {
                     Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
                     fetchRecordsDelegate.onFetchFailed(e);
                 }
             });
         }
@@ -248,30 +278,41 @@ public class BatchingDownloader {
         handleFetchFailed(fetchRecordsDelegate, ex, null);
     }
 
     /* package-local */ void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
                               final Exception ex,
                               @Nullable final SyncStorageCollectionRequest request) {
         this.removeRequestFromPending(request);
         this.abortRequests();
+
+        // We might fail due to reaching a deadline, in which case we need to keep the offset so
+        // that we can hopefully resume our download where we left off.
+        if (!(ex instanceof SyncDeadlineReachedException)) {
+            this.stateProvider.clear(RepositoryStateProvider.KEY_OFFSET);
+        }
+
+        this.stateProvider.commit();
+
         this.workTracker.delayWorkItem(new Runnable() {
             @Override
             public void run() {
                 Logger.debug(LOG_TAG, "Running onFetchFailed.");
                 fetchRecordsDelegate.onFetchFailed(ex);
             }
         });
     }
 
     public void onFetchedRecord(CryptoRecord record,
                                 RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
         this.workTracker.incrementOutstanding();
+
         try {
             fetchRecordsDelegate.onFetchedRecord(record);
+            this.stateProvider.setLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK, record.lastModified);
         } catch (Exception ex) {
             Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex);
             throw new RuntimeException(ex);
         } finally {
             this.workTracker.decrementOutstanding();
         }
     }
 
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
@@ -5,16 +5,17 @@
 package org.mozilla.gecko.sync.stage;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.MetaGlobalException;
 import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
 import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.AndroidBrowserBookmarksRepository;
 import org.mozilla.gecko.sync.repositories.domain.BookmarkRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class AndroidBrowserBookmarksServerSyncStage extends ServerSyncStage {
   protected static final String LOG_TAG = "BookmarksStage";
@@ -45,17 +46,18 @@ public class AndroidBrowserBookmarksServ
             getCollection(),
             session.getSyncDeadline(),
             session.config.storageURL(),
             session.getAuthHeaderProvider(),
             session.config.infoCollections,
             session.config.infoConfiguration,
             BOOKMARKS_BATCH_LIMIT,
             BOOKMARKS_SORT,
-            true /* allow multiple batches */);
+            true /* allow multiple batches */,
+            new NonPersistentRepositoryStateProvider());
   }
 
   @Override
   protected Repository getLocalRepository() {
     return new BufferingMiddlewareRepository(
             session.getSyncDeadline(),
             new MemoryBufferStorage(),
             new AndroidBrowserBookmarksRepository()
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
@@ -5,16 +5,17 @@
 package org.mozilla.gecko.sync.stage;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.MetaGlobalException;
 import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
 import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
+import org.mozilla.gecko.sync.repositories.PersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.AndroidBrowserHistoryRepository;
 import org.mozilla.gecko.sync.repositories.domain.HistoryRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class AndroidBrowserHistoryServerSyncStage extends ServerSyncStage {
   protected static final String LOG_TAG = "HistoryStage";
@@ -54,17 +55,19 @@ public class AndroidBrowserHistoryServer
             getCollection(),
             session.getSyncDeadline(),
             session.config.storageURL(),
             session.getAuthHeaderProvider(),
             session.config.infoCollections,
             session.config.infoConfiguration,
             HISTORY_BATCH_LIMIT,
             HISTORY_SORT,
-            true /* allow multiple batches */);
+            true /* allow multiple batches */,
+            new PersistentRepositoryStateProvider(session.config.getBranch(getCollection() + ".state."))
+    );
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
     return new HistoryRecordFactory();
   }
 
   @Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserRecentHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserRecentHistoryServerSyncStage.java
@@ -3,16 +3,17 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
 import org.mozilla.gecko.sync.MetaGlobalException;
 import org.mozilla.gecko.sync.NonObjectJSONException;
 import org.mozilla.gecko.sync.SynchronizerConfiguration;
 import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.Repository;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
 
 /**
  * History sync stage which is limited to just recent history, and will only run if the full history
  * sync stage did not complete yet. Its purpose is to give users with a lot of history in their
@@ -42,17 +43,18 @@ public class AndroidBrowserRecentHistory
                 getCollection(),
                 session.getSyncDeadline(),
                 session.config.storageURL(),
                 session.getAuthHeaderProvider(),
                 session.config.infoCollections,
                 session.config.infoConfiguration,
                 HISTORY_BATCH_LIMIT,
                 HISTORY_SORT,
-                false /* force single batch only */);
+                false /* force single batch only */,
+                new NonPersistentRepositoryStateProvider());
     }
 
     /**
      * This stage is only enabled if full history session is enabled and did not complete a sync yet.
      */
     @Override
     public boolean isEnabled() throws MetaGlobalException {
         final boolean historyStageEnabled = super.isEnabled();
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
@@ -5,16 +5,17 @@
 package org.mozilla.gecko.sync.stage;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.CryptoRecord;
 import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
 import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.FormHistoryRepositorySession;
 import org.mozilla.gecko.sync.repositories.domain.FormHistoryRecord;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class FormHistoryServerSyncStage extends ServerSyncStage {
@@ -46,17 +47,18 @@ public class FormHistoryServerSyncStage 
             collection,
             session.getSyncDeadline(),
             session.config.storageURL(),
             session.getAuthHeaderProvider(),
             session.config.infoCollections,
             session.config.infoConfiguration,
             FORM_HISTORY_BATCH_LIMIT,
             FORM_HISTORY_SORT,
-            true /* allow multiple batches */);
+            true /* allow multiple batches */,
+            new NonPersistentRepositoryStateProvider());
   }
 
   @Override
   protected Repository getLocalRepository() {
     return new BufferingMiddlewareRepository(
             session.getSyncDeadline(),
             new MemoryBufferStorage(),
             new FormHistoryRepositorySession.FormHistoryRepository()
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java
@@ -22,16 +22,17 @@ import org.mozilla.gecko.sync.delegates.
 import org.mozilla.gecko.sync.middleware.Crypto5MiddlewareRepository;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.net.BaseResource;
 import org.mozilla.gecko.sync.net.SyncStorageRequest;
 import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
 import org.mozilla.gecko.sync.repositories.InactiveSessionException;
 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate;
@@ -144,17 +145,18 @@ public abstract class ServerSyncStage ex
   // Override this in subclasses.
   protected Repository getRemoteRepository() throws URISyntaxException {
     String collection = getCollection();
     return new Server15Repository(collection,
                                   session.getSyncDeadline(),
                                   session.config.storageURL(),
                                   session.getAuthHeaderProvider(),
                                   session.config.infoCollections,
-                                  session.config.infoConfiguration);
+                                  session.config.infoConfiguration,
+                                  new NonPersistentRepositoryStateProvider());
   }
 
   /**
    * Return a Crypto5Middleware-wrapped Server15Repository.
    *
    * @throws NoCollectionKeysSetException
    * @throws URISyntaxException
    */
@@ -165,16 +167,22 @@ public abstract class ServerSyncStage ex
     cryptoRepo.recordFactory = getRecordFactory();
     return cryptoRepo;
   }
 
   protected String bundlePrefix() {
     return this.getCollection() + ".";
   }
 
+  // TODO is this the right place for this??
+  // The right prefix even?
+  protected String statePrefix() {
+    return this.getCollection() + ".state.";
+  }
+
   protected SynchronizerConfiguration getConfig() throws NonObjectJSONException, IOException {
     return new SynchronizerConfiguration(session.config.getBranch(bundlePrefix()));
   }
 
   protected void persistConfig(SynchronizerConfiguration synchronizerConfiguration) {
     synchronizerConfiguration.persist(session.config.getBranch(bundlePrefix()));
   }
 
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -1,16 +1,17 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.synchronizer;
 
 import android.support.annotation.NonNull;
 import android.support.annotation.Nullable;
+import android.support.annotation.VisibleForTesting;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.mozilla.gecko.background.common.log.Logger;
 import org.mozilla.gecko.sync.ReflowIsNecessaryException;
 import org.mozilla.gecko.sync.ThreadPool;
@@ -171,26 +172,17 @@ public class RecordsChannel implements
     numFetched.set(0);
     numFetchFailed.set(0);
     numStored.set(0);
     numStoreFailed.set(0);
     // Start a consumer thread.
     this.consumer = new ConcurrentRecordConsumer(this);
     ThreadPool.run(this.consumer);
     waitingForQueueDone = true;
-
-    // Fetch all records that were modified since our previous flow. If our previous flow succeeded,
-    // we will use source's last-sync timestamp. If our previous flow didn't complete, resume it,
-    // starting from sink's high water mark timestamp.
-    // If there was no previous flow (first sync, or data was cleared...), fetch everything.
-    // Resuming a flow is supported for buffered RepositorySessions. We degrade gracefully otherwise.
-    final long highWaterMark = sink.getHighWaterMarkTimestamp();
-    final long lastSync = source.getLastSyncTimestamp();
-    final long sinceTimestamp = Math.max(highWaterMark, lastSync);
-    source.fetchSince(sinceTimestamp, this);
+    source.fetchSince(source.getLastSyncTimestamp(), this);
   }
 
   /**
    * Begin both sessions, invoking flow() when done.
    * @throws InvalidSessionTransitionException
    */
   public void beginAndFlow() throws InvalidSessionTransitionException {
     Logger.trace(LOG_TAG, "Beginning source.");
@@ -210,19 +202,19 @@ public class RecordsChannel implements
 
   @Override
   public void onFetchFailed(Exception ex) {
     Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
     numFetchFailed.incrementAndGet();
     if (ex instanceof ReflowIsNecessaryException) {
       setReflowException((ReflowIsNecessaryException) ex);
     }
+    delegate.onFlowFetchFailed(this, ex);
     // Sink will be informed once consumer finishes.
     this.consumer.halt();
-    delegate.onFlowFetchFailed(this, ex);
   }
 
   @Override
   public void onFetchedRecord(Record record) {
     numFetched.incrementAndGet();
     this.toProcess.add(record);
     this.consumer.doNotify();
   }
@@ -313,16 +305,17 @@ public class RecordsChannel implements
 
     // Prevent "once consumer is done..." actions from taking place. They already have (case 2), or
     // we don't need them (case 1).
     waitingForQueueDone = false;
 
     // If consumer is still going at it, tell it to stop.
     this.consumer.halt();
 
+    delegate.onFlowStoreFailed(this, ex, null);
     delegate.onFlowCompleted(this, fetchEnd, System.currentTimeMillis());
   }
 
   @Override
   public void onBeginFailed(Exception ex) {
     delegate.onFlowBeginFailed(this, ex);
   }
 
@@ -358,17 +351,18 @@ public class RecordsChannel implements
 
   @Override
   public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
     // Lie outright. We know that all of our fetch methods are safe.
     return this;
   }
 
   @Nullable
-  /* package-local */ synchronized ReflowIsNecessaryException getReflowException() {
+  @VisibleForTesting
+  public synchronized ReflowIsNecessaryException getReflowException() {
     return reflowException;
   }
 
   private synchronized void setReflowException(@NonNull ReflowIsNecessaryException e) {
     // It is a mistake to set reflow exception multiple times.
     if (reflowException != null) {
       throw new IllegalStateException("Reflow exception already set: " + reflowException);
     }
--- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/net/test/TestServer15Repository.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/net/test/TestServer15Repository.java
@@ -6,36 +6,39 @@ package org.mozilla.android.sync.net.tes
 import android.os.SystemClock;
 
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
 import org.mozilla.gecko.sync.InfoCollections;
 import org.mozilla.gecko.sync.InfoConfiguration;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.concurrent.TimeUnit;
 
 @RunWith(TestRunner.class)
 public class TestServer15Repository {
 
   private static final String COLLECTION = "bookmarks";
   private static final String COLLECTION_URL = "http://foo.com/1.5/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage";
   private static final long SYNC_DEADLINE = SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30);
 
   protected final InfoCollections infoCollections = new InfoCollections();
   protected final InfoConfiguration infoConfiguration = new InfoConfiguration();
+  protected final RepositoryStateProvider stateProvider = new NonPersistentRepositoryStateProvider();
 
   public static void assertQueryEquals(String expected, URI u) {
     Assert.assertEquals(expected, u.getRawQuery());
   }
 
   @Test
   public void testCollectionURI() throws URISyntaxException {
-    Server15Repository noTrailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL, null, infoCollections, infoConfiguration);
-    Server15Repository trailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL + "/", null, infoCollections, infoConfiguration);
+    Server15Repository noTrailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL, null, infoCollections, infoConfiguration, stateProvider);
+    Server15Repository trailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL + "/", null, infoCollections, infoConfiguration, stateProvider);
     Assert.assertEquals("http://foo.com/1.5/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage/bookmarks", noTrailingSlash.collectionURI().toASCIIString());
     Assert.assertEquals("http://foo.com/1.5/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage/bookmarks", trailingSlash.collectionURI().toASCIIString());
   }
 }
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
@@ -60,23 +60,21 @@ public class BufferingMiddlewareReposito
 
         MockRecord record = new MockRecord("guid1", null, 1, false);
         bufferingSession.store(record);
         assertEquals(1, bufferStorage.all().size());
 
         MockRecord record1 = new MockRecord("guid2", null, 1, false);
         bufferingSession.store(record1);
         assertEquals(2, bufferStorage.all().size());
-        assertEquals(1, bufferStorage.latestModifiedTimestamp());
 
         // record2 must replace record.
         MockRecord record2 = new MockRecord("guid1", null, 2, false);
         bufferingSession.store(record2);
         assertEquals(2, bufferStorage.all().size());
-        assertEquals(2, bufferStorage.latestModifiedTimestamp());
 
         // Ensure inner session doesn't see incoming records.
         verify(innerRepositorySession, never()).store(record);
         verify(innerRepositorySession, never()).store(record1);
         verify(innerRepositorySession, never()).store(record2);
     }
 
     @Test
@@ -172,32 +170,9 @@ public class BufferingMiddlewareReposito
     }
 
     @Test
     public void setStoreDelegate() throws Exception {
         RepositorySessionStoreDelegate delegate = mock(RepositorySessionStoreDelegate.class);
         bufferingSession.setStoreDelegate(delegate);
         verify(innerRepositorySession).setStoreDelegate(delegate);
     }
-
-    @Test
-    public void getHighWaterMarkTimestamp() throws Exception {
-        // Trivial case, empty buffer.
-        assertEquals(0, bufferingSession.getHighWaterMarkTimestamp());
-
-        MockRecord record = new MockRecord("guid1", null, 1, false);
-        bufferingSession.store(record);
-        assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
-
-        MockRecord record3 = new MockRecord("guid3", null, 5, false);
-        bufferingSession.store(record3);
-        assertEquals(5, bufferingSession.getHighWaterMarkTimestamp());
-
-        // NB: same guid as above.
-        MockRecord record4 = new MockRecord("guid3", null, -1, false);
-        bufferingSession.store(record4);
-        assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
-
-        MockRecord record2 = new MockRecord("guid2", null, 13, false);
-        bufferingSession.store(record2);
-        assertEquals(13, bufferingSession.getHighWaterMarkTimestamp());
-    }
 }
\ No newline at end of file
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java
@@ -14,16 +14,17 @@ import org.mozilla.gecko.background.test
 import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
 import org.mozilla.gecko.sync.CryptoRecord;
 import org.mozilla.gecko.sync.HTTPFailureException;
 import org.mozilla.gecko.sync.InfoCollections;
 import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.net.SyncResponse;
 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 
 import java.net.URI;
 import java.util.concurrent.ExecutorService;
@@ -49,16 +50,17 @@ public class BatchingDownloaderDelegateT
         public Exception ex;
 
         public MockDownloader(RepositorySession repositorySession) {
             super(
                     null,
                     Uri.EMPTY,
                     SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
                     true,
+                    new NonPersistentRepositoryStateProvider(),
                     repositorySession
             );
         }
 
         @Override
         public void onFetchCompleted(SyncStorageResponse response,
                                      final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
                                      final SyncStorageCollectionRequest request,
@@ -111,28 +113,30 @@ public class BatchingDownloaderDelegateT
     @Before
     public void setUp() throws Exception {
         repositorySession = new Server15RepositorySession(new Server15Repository(
                 "dummyCollection",
                 SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
                 DEFAULT_COLLECTION_URL,
                 null,
                 new InfoCollections(),
-                new InfoConfiguration())
+                new InfoConfiguration(),
+                new NonPersistentRepositoryStateProvider())
         );
         mockDownloader = new MockDownloader(repositorySession);
     }
 
     @Test
     public void testIfUnmodifiedSince() throws Exception {
         BatchingDownloader downloader = new BatchingDownloader(
                 null,
                 Uri.EMPTY,
                 SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
                 true,
+                new NonPersistentRepositoryStateProvider(),
                 repositorySession
         );
         RepositorySessionFetchRecordsDelegate delegate = new SimpleSessionFetchRecordsDelegate();
         BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(downloader, delegate,
                 new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL)), 0, 0, true, null, null);
         String lastModified = "12345678";
         SyncStorageResponse response = makeSyncStorageResponse(200, lastModified);
         downloaderDelegate.handleRequestSuccess(response);
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java
@@ -2,40 +2,49 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.repositories.downloaders;
 
 import android.net.Uri;
 import android.os.SystemClock;
 import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
+import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
 import org.mozilla.gecko.sync.CryptoRecord;
 import org.mozilla.gecko.sync.InfoCollections;
 import org.mozilla.gecko.sync.InfoConfiguration;
+import org.mozilla.gecko.sync.SyncDeadlineReachedException;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.net.SyncResponse;
 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import ch.boye.httpclientandroidlib.ProtocolVersion;
 import ch.boye.httpclientandroidlib.message.BasicHttpResponse;
 import ch.boye.httpclientandroidlib.message.BasicStatusLine;
 
 import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
 
@@ -47,16 +56,64 @@ public class BatchingDownloaderTest {
     private MockDownloader mockDownloader;
     private String DEFAULT_COLLECTION_NAME = "dummyCollection";
     private String DEFAULT_COLLECTION_URL = "http://dummy.url/";
     private long DEFAULT_NEWER = 1;
     private String DEFAULT_SORT = "oldest";
     private String DEFAULT_IDS = "1";
     private String DEFAULT_LMHEADER = "12345678";
 
+    private CountingShadowRepositoryState repositoryStateProvider;
+
+    // Memory-backed state implementation which keeps a shadow of current values, so that they can be
+    // queried by the tests. Classes under test do not have access to the shadow, and follow regular
+    // non-persistent state semantics (value changes visible only after commit).
+    class CountingShadowRepositoryState extends NonPersistentRepositoryStateProvider {
+        private AtomicInteger commitCount = new AtomicInteger(0);
+        private final Map<String, Object> shadowMap = Collections.synchronizedMap(new HashMap<String, Object>(2));
+
+        @Override
+        public void commit() {
+            commitCount.incrementAndGet();
+            super.commit();
+        }
+
+        int getCommitCount() {
+            return commitCount.get();
+        }
+
+        @Nullable
+        public Long getShadowedLong(String key) {
+            return (Long) shadowMap.get(key);
+        }
+
+        @Nullable
+        public String getShadowedString(String key) {
+            return (String) shadowMap.get(key);
+        }
+
+        @Override
+        public void setLong(String key, Long value) {
+            shadowMap.put(key, value);
+            super.setLong(key, value);
+        }
+
+        @Override
+        public void setString(String key, String value) {
+            shadowMap.put(key, value);
+            super.setString(key, value);
+        }
+
+        @Override
+        public void clear(String key) {
+            shadowMap.remove(key);
+            super.clear(key);
+        }
+    }
+
     class MockSessionFetchRecordsDelegate implements RepositorySessionFetchRecordsDelegate {
         public boolean isFailure;
         public boolean isFetched;
         public boolean isSuccess;
         public int batchesCompleted;
         public Exception ex;
         public Record record;
 
@@ -104,18 +161,19 @@ public class BatchingDownloaderTest {
         public long newer;
         public long limit;
         public boolean full;
         public String sort;
         public String ids;
         public String offset;
         public boolean abort;
 
-        public MockDownloader(RepositorySession repositorySession, boolean allowMultipleBatches) {
-            super(null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30), allowMultipleBatches, repositorySession);
+        public MockDownloader(RepositorySession repositorySession, boolean allowMultipleBatches, RepositoryStateProvider repositoryStateProvider) {
+            super(null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
+                    allowMultipleBatches, repositoryStateProvider, repositorySession);
         }
 
         @Override
         public void fetchWithParameters(long newer,
                                  long batchLimit,
                                  boolean full,
                                  String sort,
                                  String ids,
@@ -148,17 +206,19 @@ public class BatchingDownloaderTest {
             return super.makeSyncStorageCollectionRequest(newer, batchLimit, full, sort, ids, offset);
         }
     }
 
     class MockSever15Repository extends Server15Repository {
         public MockSever15Repository(@NonNull String collection, @NonNull String storageURL,
                                      AuthHeaderProvider authHeaderProvider, @NonNull InfoCollections infoCollections,
                                      @NonNull InfoConfiguration infoConfiguration) throws URISyntaxException {
-            super(collection, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30), storageURL, authHeaderProvider, infoCollections, infoConfiguration);
+            super(collection, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
+                    storageURL, authHeaderProvider, infoCollections, infoConfiguration,
+                    new NonPersistentRepositoryStateProvider());
         }
     }
 
     class MockRepositorySession extends Server15RepositorySession {
         public boolean abort;
 
         public MockRepositorySession(Repository repository) {
             super(repository);
@@ -172,17 +232,18 @@ public class BatchingDownloaderTest {
 
     @Before
     public void setUp() throws Exception {
         sessionFetchRecordsDelegate = new MockSessionFetchRecordsDelegate();
 
         serverRepository = new MockSever15Repository(DEFAULT_COLLECTION_NAME, DEFAULT_COLLECTION_URL, null,
                 new InfoCollections(), new InfoConfiguration());
         repositorySession = new Server15RepositorySession(serverRepository);
-        mockDownloader = new MockDownloader(repositorySession, true);
+        repositoryStateProvider = new CountingShadowRepositoryState();
+        mockDownloader = new MockDownloader(repositorySession, true, repositoryStateProvider);
     }
 
     @Test
     public void testFlattenId() {
         String[] emptyGuid = new String[]{};
         String flatten =  BatchingDownloader.flattenIDs(emptyGuid);
         assertEquals("", flatten);
 
@@ -199,38 +260,44 @@ public class BatchingDownloaderTest {
         multiGuid[1] = guid1;
         multiGuid[2] = guid2;
         flatten = BatchingDownloader.flattenIDs(multiGuid);
         assertEquals("123456789abc,456789abc,789abc", flatten);
     }
 
     @Test
     public void testBatchingTrivial() throws Exception {
-        MockDownloader mockDownloader = new MockDownloader(repositorySession, true);
+        MockDownloader mockDownloader = new MockDownloader(repositorySession, true, repositoryStateProvider);
 
         assertNull(mockDownloader.getLastModified());
         // Number of records == batch limit.
         final long BATCH_LIMIT = 100;
         mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT);
 
         SyncStorageResponse response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, null, "100");
         SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
         mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request,
                 DEFAULT_NEWER, BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         assertTrue(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
         assertEquals(0, sessionFetchRecordsDelegate.batchesCompleted);
+
+        // NB: we set highWaterMark as part of onFetchedRecord, so we don't expect it to be set here.
+        // Expect no offset to be persisted.
+        assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+        assertNull(repositoryStateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
+        assertEquals(1, repositoryStateProvider.getCommitCount());
     }
 
     @Test
     public void testBatchingSingleBatchMode() throws Exception {
-        MockDownloader mockDownloader = new MockDownloader(repositorySession, false);
+        MockDownloader mockDownloader = new MockDownloader(repositorySession, false, repositoryStateProvider);
 
         assertNull(mockDownloader.getLastModified());
         // Number of records > batch limit. But, we're only allowed to make one batch request.
         final long BATCH_LIMIT = 100;
         mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT);
 
         String offsetHeader = "25";
         String recordsHeader = "500";
@@ -239,82 +306,95 @@ public class BatchingDownloaderTest {
         mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request,
                 DEFAULT_NEWER, BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         assertTrue(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
         assertEquals(0, sessionFetchRecordsDelegate.batchesCompleted);
+
+        assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+        // We don't care about the offset in a single batch mode.
+        assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_OFFSET));
+        assertEquals(1, repositoryStateProvider.getCommitCount());
     }
 
     @Test
     public void testBatching() throws Exception {
         final long BATCH_LIMIT = 25;
-        mockDownloader = new MockDownloader(repositorySession, true);
+        mockDownloader = new MockDownloader(repositorySession, true, repositoryStateProvider);
 
         assertNull(mockDownloader.getLastModified());
         mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT);
 
-        String offsetHeader = "25";
-        String recordsHeader = "25";
-        SyncStorageResponse response = makeSyncStorageResponse(200,  DEFAULT_LMHEADER, offsetHeader, recordsHeader);
-        SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
-        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
-                BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
+        final String recordsHeader = "25";
+        performOnFetchCompleted("25", recordsHeader, BATCH_LIMIT);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         // Verify the same parameters are used in the next fetch.
         assertSameParameters(mockDownloader, BATCH_LIMIT);
-        assertEquals(offsetHeader, mockDownloader.offset);
+        assertEquals("25", mockDownloader.offset);
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
         assertEquals(1, sessionFetchRecordsDelegate.batchesCompleted);
 
+        // Offset set, but not yet committed.
+        assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+        assertEquals("25", repositoryStateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
+        assertEquals(0, repositoryStateProvider.getCommitCount());
+
         // The next batch, we still have an offset token and has not exceed the total limit.
-        offsetHeader = "50";
-        response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
-        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
-                BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
+        performOnFetchCompleted("50", recordsHeader, BATCH_LIMIT);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         // Verify the same parameters are used in the next fetch.
         assertSameParameters(mockDownloader, BATCH_LIMIT);
-        assertEquals(offsetHeader, mockDownloader.offset);
+        assertEquals("50", mockDownloader.offset);
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
         assertEquals(2, sessionFetchRecordsDelegate.batchesCompleted);
 
+        // Offset updated, but not yet committed.
+        assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+        assertEquals("50", repositoryStateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
+        assertEquals(0, repositoryStateProvider.getCommitCount());
+
         // The next batch, we still have an offset token and has not exceed the total limit.
-        offsetHeader = "75";
-        response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
-        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
-                BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
+        performOnFetchCompleted("75", recordsHeader, BATCH_LIMIT);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         // Verify the same parameters are used in the next fetch.
         assertSameParameters(mockDownloader, BATCH_LIMIT);
-        assertEquals(offsetHeader, mockDownloader.offset);
+        assertEquals("75", mockDownloader.offset);
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
         assertEquals(3, sessionFetchRecordsDelegate.batchesCompleted);
 
+        // Offset updated, but not yet committed.
+        assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+        assertEquals("75", repositoryStateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
+        assertEquals(0, repositoryStateProvider.getCommitCount());
+
         // No more offset token, so we complete batching.
-        response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, null, recordsHeader);
-        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
-                BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
+        performOnFetchCompleted(null, recordsHeader, BATCH_LIMIT);
 
         assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
         assertTrue(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
         assertEquals(3, sessionFetchRecordsDelegate.batchesCompleted);
+
+        // Offset cleared since we finished batching, and committed.
+        assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+        assertNull(repositoryStateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
+        assertEquals(1, repositoryStateProvider.getCommitCount());
     }
 
     @Test
     public void testFailureLMChangedMultiBatch() throws Exception {
         assertNull(mockDownloader.getLastModified());
 
         String lmHeader = "12345678";
         String offsetHeader = "100";
@@ -331,57 +411,172 @@ public class BatchingDownloaderTest {
         assertTrue(mockDownloader.full);
         assertEquals(DEFAULT_SORT, mockDownloader.sort);
         assertEquals(DEFAULT_IDS, mockDownloader.ids);
         assertEquals(offsetHeader, mockDownloader.offset);
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
 
+        // Offset set, not yet committed.
+        assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+        assertEquals("100", repositoryStateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
+        assertEquals(0, repositoryStateProvider.getCommitCount());
+
         // Last modified header somehow changed.
         lmHeader = "10000000";
         response = makeSyncStorageResponse(200, lmHeader, offsetHeader, null);
         mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
                 limit, true, DEFAULT_SORT, DEFAULT_IDS);
 
         assertNotEquals(lmHeader, mockDownloader.getLastModified());
         assertTrue(mockDownloader.abort);
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertTrue(sessionFetchRecordsDelegate.isFailure);
+
+        // Since we failed due to a remotely modified collection, we expect offset to be reset.
+        assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+        assertNull(repositoryStateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
+        assertEquals(1, repositoryStateProvider.getCommitCount());
     }
 
     @Test
     public void testFailureException() throws Exception {
         Exception ex = new IllegalStateException();
         SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI("http://dummy.url"));
         mockDownloader.handleFetchFailed(sessionFetchRecordsDelegate, ex, request);
 
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFetched);
         assertTrue(sessionFetchRecordsDelegate.isFailure);
         assertEquals(ex.getClass(), sessionFetchRecordsDelegate.ex.getClass());
         assertNull(sessionFetchRecordsDelegate.record);
     }
 
     @Test
+    public void testOffsetPersistedResetAfterDeadline() throws Exception {
+        final long BATCH_LIMIT = 25;
+        mockDownloader = new MockDownloader(repositorySession, true, repositoryStateProvider);
+        mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT);
+
+        SyncStorageCollectionRequest request = performOnFetchCompleted("25", "25", 25);
+
+        // Offset set, but not yet committed.
+        assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+        assertEquals("25", repositoryStateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
+        assertEquals(0, repositoryStateProvider.getCommitCount());
+
+        Exception ex = new SyncDeadlineReachedException();
+        mockDownloader.handleFetchFailed(sessionFetchRecordsDelegate, ex, request);
+
+        // Offset is committed.
+        assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+        assertEquals("25", repositoryStateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
+        assertEquals(1, repositoryStateProvider.getCommitCount());
+    }
+
+    @Test
+    public void testOffsetResetAfterConcurrentModification() throws Exception {
+        final long BATCH_LIMIT = 25;
+        mockDownloader = new MockDownloader(repositorySession, true, repositoryStateProvider);
+        mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT);
+
+        SyncStorageCollectionRequest request = performOnFetchCompleted("25", "25", 25);
+
+        Exception ex = new CollectionConcurrentModificationException();
+        mockDownloader.handleFetchFailed(sessionFetchRecordsDelegate, ex, request);
+
+        // Offset is committed.
+        assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+        assertNull(repositoryStateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
+        assertEquals(1, repositoryStateProvider.getCommitCount());
+    }
+
+    @Test
+    public void testResumeWithOffset() throws Exception {
+        repositoryStateProvider.setString(RepositoryStateProvider.KEY_OFFSET, "offset1");
+        repositoryStateProvider.setLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK, 5L);
+        repositoryStateProvider.commit();
+        assertEquals("offset1", repositoryStateProvider.getString(RepositoryStateProvider.KEY_OFFSET));
+        assertEquals(Long.valueOf(5L), repositoryStateProvider.getLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+
+        mockDownloader = new MockDownloader(repositorySession, true, repositoryStateProvider);
+
+        // Test that we'll resume from offset regardless of sort value.
+        mockDownloader.fetchSince(sessionFetchRecordsDelegate, 3L, 25, "oldest");
+        assertEquals("offset1", mockDownloader.offset);
+        assertEquals(3L, mockDownloader.newer);
+
+        repositoryStateProvider.setString(RepositoryStateProvider.KEY_OFFSET, "offset2");
+        repositoryStateProvider.commit();
+
+        mockDownloader.fetchSince(sessionFetchRecordsDelegate, 3L, 25, "newest");
+        assertEquals("offset2", mockDownloader.offset);
+        assertEquals(3L, mockDownloader.newer);
+
+        repositoryStateProvider.setString(RepositoryStateProvider.KEY_OFFSET, "offset3");
+        repositoryStateProvider.commit();
+
+        mockDownloader.fetchSince(sessionFetchRecordsDelegate, 3L, 25, "index");
+        assertEquals("offset3", mockDownloader.offset);
+        assertEquals(3L, mockDownloader.newer);
+    }
+
+    @Test
+    public void testResumeWithHighWaterMark() throws Exception {
+        repositoryStateProvider.setLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK, 7L);
+        repositoryStateProvider.commit();
+        assertEquals(Long.valueOf(7L), repositoryStateProvider.getLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+
+        mockDownloader = new MockDownloader(repositorySession, true, repositoryStateProvider);
+
+        // Test that we'll use h.w.m. when sorting is by oldest.
+        mockDownloader.fetchSince(sessionFetchRecordsDelegate, 5L, 25, "oldest");
+        assertEquals(null, mockDownloader.offset);
+        assertEquals(7L, mockDownloader.newer);
+
+        // Test that we won't use h.w.m. when sorting is not oldest
+        mockDownloader.fetchSince(sessionFetchRecordsDelegate, 5L, 25, "newest");
+        assertEquals(null, mockDownloader.offset);
+        assertEquals(5L, mockDownloader.newer);
+
+        mockDownloader.fetchSince(sessionFetchRecordsDelegate, 5L, 25, "index");
+        assertEquals(null, mockDownloader.offset);
+        assertEquals(5L, mockDownloader.newer);
+    }
+
+    @Test
+    public void testNoResume() throws Exception {
+        assertNull(repositoryStateProvider.getString(RepositoryStateProvider.KEY_OFFSET));
+        assertNull(repositoryStateProvider.getLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
+
+        mockDownloader = new MockDownloader(repositorySession, true, repositoryStateProvider);
+        mockDownloader.fetchSince(sessionFetchRecordsDelegate, 3L, 25, DEFAULT_SORT);
+        assertNull(mockDownloader.offset);
+        assertEquals(3L, mockDownloader.newer);
+    }
+
+    @Test
     public void testFetchRecord() {
         CryptoRecord record = new CryptoRecord();
         mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
 
         assertTrue(sessionFetchRecordsDelegate.isFetched);
         assertFalse(sessionFetchRecordsDelegate.isSuccess);
         assertFalse(sessionFetchRecordsDelegate.isFailure);
         assertEquals(record, sessionFetchRecordsDelegate.record);
     }
 
     @Test
     public void testAbortRequests() {
         MockRepositorySession mockRepositorySession = new MockRepositorySession(serverRepository);
-        BatchingDownloader downloader = new BatchingDownloader(null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30), true, mockRepositorySession);
+        BatchingDownloader downloader = new BatchingDownloader(
+                null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
+                true, new NonPersistentRepositoryStateProvider(), mockRepositorySession);
         assertFalse(mockRepositorySession.abort);
         downloader.abortRequests();
         assertTrue(mockRepositorySession.abort);
     }
 
     @Test
     public void testBuildCollectionURI() {
         try {
@@ -393,16 +588,25 @@ public class BatchingDownloaderTest {
 
             final Uri baseUri = Uri.parse("https://moztest.org/collection/");
             assertEquals(baseUri + "?full=1&ids=123%2Cabc&offset=1234", BatchingDownloader.buildCollectionURI(baseUri, true, -1L, -1, null, "123,abc", "1234").toString());
         } catch (URISyntaxException e) {
             fail();
         }
     }
 
+    private SyncStorageCollectionRequest performOnFetchCompleted(String offsetHeader, String recordsHeader, long batchLimit) throws URISyntaxException {
+        SyncStorageResponse response = makeSyncStorageResponse(200,  DEFAULT_LMHEADER, offsetHeader, recordsHeader);
+        SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
+        mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
+                batchLimit, true, DEFAULT_SORT, DEFAULT_IDS);
+
+        return request;
+    }
+
     private void assertSameParameters(MockDownloader mockDownloader, long limit) {
         assertEquals(DEFAULT_NEWER, mockDownloader.newer);
         assertEquals(limit, mockDownloader.limit);
         assertTrue(mockDownloader.full);
         assertEquals(DEFAULT_SORT, mockDownloader.sort);
         assertEquals(DEFAULT_IDS, mockDownloader.ids);
     }
 
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
@@ -13,16 +13,17 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.gecko.background.testhelpers.MockRecord;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
 import org.mozilla.gecko.sync.ExtendedJSONObject;
 import org.mozilla.gecko.sync.InfoCollections;
 import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.Utils;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
 
 import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.AbstractExecutorService;
@@ -458,16 +459,17 @@ public class BatchingUploaderTest {
     private Server15Repository makeConstrainedRepository(ExtendedJSONObject infoConfigurationJSON) {
         try {
             return new Server15Repository(
                     "dummyCollection",
                     SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
                     "http://dummy.url/",
                     null,
                     new InfoCollections(),
-                    new InfoConfiguration(infoConfigurationJSON)
+                    new InfoConfiguration(infoConfigurationJSON),
+                    new NonPersistentRepositoryStateProvider()
             );
         } catch (URISyntaxException e) {
             // Won't throw, and this won't happen.
             return null;
         }
     }
 }
\ No newline at end of file
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
@@ -11,16 +11,17 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mozilla.gecko.background.testhelpers.TestRunner;
 import org.mozilla.gecko.sync.HTTPFailureException;
 import org.mozilla.gecko.sync.InfoCollections;
 import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.NonObjectJSONException;
 import org.mozilla.gecko.sync.net.SyncResponse;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
 
 import java.io.ByteArrayInputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -88,17 +89,18 @@ public class PayloadUploadDelegateTest {
     @Before
     public void setUp() throws Exception {
         Server15Repository server15Repository = new Server15Repository(
                 "dummyCollection",
                 SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
                 "http://dummy.url/",
                 null,
                 new InfoCollections(),
-                new InfoConfiguration()
+                new InfoConfiguration(),
+                new NonPersistentRepositoryStateProvider()
         );
         batchingUploader = new MockUploader(
                 new Server15RepositorySession(server15Repository),
                 null,
                 null
         );
     }