Bug 1291821 - Buffering repository middleware r=rnewman
authorGrisha Kruglov <gkruglov@mozilla.com>
Thu, 19 Jan 2017 13:11:41 -0800
changeset 373888 86bc523c8c6024f4f8737813498e994c53625a87
parent 373887 ff7ff02d0edfcf053d1dd0b9a7a2c983aec311ef
child 373889 e41e4bbf72ab1283f7ab13d0d8e1b20b7dadcbe5
push id10863
push userjlorenzo@mozilla.com
push dateMon, 06 Mar 2017 23:02:23 +0000
treeherdermozilla-aurora@0931190cd725 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersrnewman
bugs1291821
milestone54.0a1
Bug 1291821 - Buffering repository middleware r=rnewman MozReview-Commit-ID: GS3M7k670Po
mobile/android/base/android-services.mozbuild
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
--- a/mobile/android/base/android-services.mozbuild
+++ b/mobile/android/base/android-services.mozbuild
@@ -896,20 +896,24 @@ sync_java_files = [TOPSRCDIR + '/mobile/
     'sync/InfoConfiguration.java',
     'sync/InfoCounts.java',
     'sync/JSONRecordFetcher.java',
     'sync/KeyBundleProvider.java',
     'sync/MetaGlobal.java',
     'sync/MetaGlobalException.java',
     'sync/MetaGlobalMissingEnginesException.java',
     'sync/MetaGlobalNotSetException.java',
+    'sync/middleware/BufferingMiddlewareRepository.java',
+    'sync/middleware/BufferingMiddlewareRepositorySession.java',
     'sync/middleware/Crypto5MiddlewareRepository.java',
     'sync/middleware/Crypto5MiddlewareRepositorySession.java',
     'sync/middleware/MiddlewareRepository.java',
     'sync/middleware/MiddlewareRepositorySession.java',
+    'sync/middleware/storage/BufferStorage.java',
+    'sync/middleware/storage/MemoryBufferStorage.java',
     'sync/net/AbstractBearerTokenAuthHeaderProvider.java',
     'sync/net/AuthHeaderProvider.java',
     'sync/net/BaseResource.java',
     'sync/net/BaseResourceDelegate.java',
     'sync/net/BasicAuthHeaderProvider.java',
     'sync/net/BearerAuthHeaderProvider.java',
     'sync/net/BrowserIDAuthHeaderProvider.java',
     'sync/net/ConnectionMonitorThread.java',
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepository.java
@@ -0,0 +1,60 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.middleware;
+
+import android.content.Context;
+
+import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
+import org.mozilla.gecko.sync.repositories.Repository;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
+
+/**
+ * A buffering-enabled middleware which is intended to wrap local repositories. Configurable with
+ * a sync deadline, buffer storage implementation and a consistency checker implementation.
+ *
+ * @author grisha
+ */
+public class BufferingMiddlewareRepository extends MiddlewareRepository {
+    private final long syncDeadline;
+    private final Repository inner;
+    private final BufferStorage bufferStorage;
+
+    private class BufferingMiddlewareRepositorySessionCreationDelegate extends MiddlewareRepository.SessionCreationDelegate {
+        private final BufferingMiddlewareRepository repository;
+        private final RepositorySessionCreationDelegate outerDelegate;
+
+        private BufferingMiddlewareRepositorySessionCreationDelegate(BufferingMiddlewareRepository repository, RepositorySessionCreationDelegate outerDelegate) {
+            this.repository = repository;
+            this.outerDelegate = outerDelegate;
+        }
+
+        @Override
+        public void onSessionCreateFailed(Exception ex) {
+            this.outerDelegate.onSessionCreateFailed(ex);
+        }
+
+        @Override
+        public void onSessionCreated(RepositorySession session) {
+            outerDelegate.onSessionCreated(new BufferingMiddlewareRepositorySession(
+                    session, this.repository, syncDeadline, bufferStorage
+            ));
+        }
+    }
+
+    public BufferingMiddlewareRepository(long syncDeadline, BufferStorage bufferStore, Repository wrappedRepository) {
+        this.syncDeadline = syncDeadline;
+        this.inner = wrappedRepository;
+        this.bufferStorage = bufferStore;
+    }
+
+    @Override
+    public void createSession(RepositorySessionCreationDelegate delegate, Context context) {
+        this.inner.createSession(
+                new BufferingMiddlewareRepositorySessionCreationDelegate(this, delegate),
+                context
+        );
+    }
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
@@ -0,0 +1,193 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.middleware;
+
+import android.os.SystemClock;
+import android.support.annotation.VisibleForTesting;
+
+import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
+import org.mozilla.gecko.sync.repositories.InactiveSessionException;
+import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Buffering middleware which is intended to wrap local RepositorySessions.
+ *
+ * Configure it:
+ *  - with an appropriate BufferStore (in-memory, record-type-aware database-backed, etc).
+ *
+ *  Fetch is pass-through, store is buffered.
+ *
+ * @author grisha
+ */
+/* package-local */ class BufferingMiddlewareRepositorySession extends MiddlewareRepositorySession {
+    private final BufferStorage bufferStorage;
+    private final long syncDeadlineMillis;
+
+    private ExecutorService storeDelegateExecutor = Executors.newSingleThreadExecutor();
+
+    private volatile boolean storeMarkedIncomplete = false;
+
+    /* package-local */ BufferingMiddlewareRepositorySession(
+            RepositorySession repositorySession, MiddlewareRepository repository,
+            long syncDeadlineMillis, BufferStorage bufferStorage) {
+        super(repositorySession, repository);
+        this.syncDeadlineMillis = syncDeadlineMillis;
+        this.bufferStorage = bufferStorage;
+    }
+
+    @Override
+    public void fetchSince(long timestamp, RepositorySessionFetchRecordsDelegate delegate) {
+        this.inner.fetchSince(timestamp, delegate);
+    }
+
+    @Override
+    public void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException {
+        this.inner.fetch(guids, delegate);
+    }
+
+    @Override
+    public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) {
+        this.inner.fetchAll(delegate);
+    }
+
+    /**
+     * Will be called when this repository is acting as a `source`, and a flow of records into `sink`
+     * was completed. That is, we've uploaded merged records to the server, so now is a good time
+     * to clean up our buffer for this repository.
+     */
+    @Override
+    public void performCleanup() {
+        bufferStorage.clear();
+    }
+
+    @Override
+    public void store(Record record) throws NoStoreDelegateException {
+        bufferStorage.addOrReplace(record);
+    }
+
+    @Override
+    public void storeIncomplete() {
+        storeMarkedIncomplete = true;
+    }
+
+    @Override
+    public void storeDone() {
+        storeDone(System.currentTimeMillis());
+    }
+
+    @Override
+    public void storeFlush() {
+        bufferStorage.flush();
+    }
+
+    @Override
+    public void storeDone(final long end) {
+        doStoreDonePrepare();
+
+        // Determine if we have enough time to perform consistency checks on the buffered data and
+        // then store it. If we don't have enough time now, we keep our buffer and try again later.
+        // We don't store results of a buffer consistency check anywhere, so we can't treat it
+        // separately from storage.
+        if (storeMarkedIncomplete || !mayProceedToMergeBuffer()) {
+            super.abort();
+            storeDelegate.deferredStoreDelegate(storeDelegateExecutor).onStoreCompleted(end);
+            return;
+        }
+
+        // Separate actual merge, so that it may be tested without involving system clock.
+        doStoreDone(end);
+    }
+
+    @VisibleForTesting
+    public void doStoreDonePrepare() {
+        // Now that records stopped flowing, persist them.
+        bufferStorage.flush();
+    }
+
+    @VisibleForTesting
+    public void doStoreDone(final long end) {
+        final Collection<Record> buffer = bufferStorage.all();
+
+        // Trivial case of an empty buffer.
+        if (buffer.isEmpty()) {
+            super.storeDone(end);
+            return;
+        }
+
+        // Flush our buffer to the wrapped local repository. Data goes live!
+        try {
+            for (Record record : buffer) {
+                this.inner.store(record);
+            }
+        } catch (NoStoreDelegateException e) {
+            // At this point we should have a delegate, so this won't happen.
+        }
+
+        // And, we're done!
+        super.storeDone(end);
+    }
+
+    /**
+     * When source fails to provide more records, we need to decide what to do with the buffer.
+     * We might fail because of a network partition, or because of a concurrent modification of a
+     * collection. Either way we do not clear the buffer in a general case. If a collection has been
+     * modified, affected records' last-modified timestamps will be bumped, and we will receive those
+     * records during the next sync. If we already have them in our buffer, we replace our now-old
+     * copy. Otherwise, they are new records and we just append them.
+     *
+     * We depend on GUIDs to be a primary key for incoming records.
+     *
+     * @param e indicates reason of failure.
+     */
+    @Override
+    public void sourceFailed(Exception e) {
+        bufferStorage.flush();
+        super.sourceFailed(e);
+    }
+
+    /**
+     * Session abnormally aborted. This doesn't mean our so-far buffered data is invalid.
+     * Clean up after ourselves, if there's anything to clean up.
+     */
+    @Override
+    public void abort() {
+        bufferStorage.flush();
+        super.abort();
+    }
+
+    @Override
+    public void setStoreDelegate(RepositorySessionStoreDelegate delegate) {
+        inner.setStoreDelegate(delegate);
+        this.storeDelegate = delegate;
+    }
+
+    @Override
+    public long getHighWaterMarkTimestamp() {
+        return bufferStorage.latestModifiedTimestamp();
+    }
+
+    private boolean mayProceedToMergeBuffer() {
+        // If our buffer storage is not persistent, disallowing merging after buffer has been filled
+        // means throwing away records only to re-download them later.
+        // In this case allow merge to proceed even if we're past the deadline.
+        if (!bufferStorage.isPersistent()) {
+            return true;
+        }
+
+        // While actual runtime of a merge operation is a function of record type, buffer size, etc.,
+        // let's do a simple thing for now and say that we may proceed if we have couple of minutes
+        // of runtime left. That surely is enough, right?
+        final long timeLeftMillis = syncDeadlineMillis - SystemClock.elapsedRealtime();
+        return timeLeftMillis > 1000 * 60 * 2;
+    }
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
@@ -0,0 +1,35 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.middleware.storage;
+
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+import java.util.Collection;
+
+/**
+ * A contract between BufferingMiddleware and specific storage implementations.
+ *
+ * @author grisha
+ */
+public interface BufferStorage {
+    // Returns all of the records currently present in the buffer.
+    Collection<Record> all();
+
+    // Implementations are responsible to ensure that any incoming records with duplicate GUIDs replace
+    // what's already present in the storage layer.
+    // NB: For a database-backed storage, "replace" happens at a transaction level.
+    void addOrReplace(Record record);
+
+    // For database-backed implementations, commits any records that came in up to this point.
+    void flush();
+
+    void clear();
+
+    // For buffers that are filled up oldest-first this is a high water mark, which enables resuming
+    // a sync.
+    long latestModifiedTimestamp();
+
+    boolean isPersistent();
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
@@ -0,0 +1,70 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.middleware.storage;
+
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A trivial, memory-backed, transient implementation of a BufferStorage.
+ * Its intended use is to buffer syncing of small collections.
+ * Thread-safe.
+ *
+ * @author grisha
+ */
+public class MemoryBufferStorage implements BufferStorage {
+    private final Map<String, Record> recordBuffer = Collections.synchronizedMap(new HashMap<String, Record>());
+
+    @Override
+    public boolean isPersistent() {
+        return false;
+    }
+
+    @Override
+    public Collection<Record> all() {
+        synchronized (recordBuffer) {
+            return new ArrayList<>(recordBuffer.values());
+        }
+    }
+
+    @Override
+    public void addOrReplace(Record record) {
+        recordBuffer.put(record.guid, record);
+    }
+
+    @Override
+    public void flush() {
+        // This is a no-op; flush intended for database-backed stores.
+    }
+
+    @Override
+    public void clear() {
+        recordBuffer.clear();
+    }
+
+    @Override
+    public long latestModifiedTimestamp() {
+        long lastModified = 0;
+
+        synchronized (recordBuffer) {
+            if (recordBuffer.size() == 0) {
+                return lastModified;
+            }
+
+            for (Record record : recordBuffer.values()) {
+                if (record.lastModified > lastModified) {
+                    lastModified = record.lastModified;
+                }
+            }
+        }
+
+        return lastModified;
+    }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
@@ -71,16 +71,21 @@ public abstract class RepositorySession 
 
   // The time that the last sync on this collection completed, in milliseconds since epoch.
   private long lastSyncTimestamp = 0;
 
   public long getLastSyncTimestamp() {
     return lastSyncTimestamp;
   }
 
+  // Override this in the buffering wrappers.
+  public long getHighWaterMarkTimestamp() {
+    return 0;
+  }
+
   public static long now() {
     return System.currentTimeMillis();
   }
 
   public RepositorySession(Repository repository) {
     this.repository = repository;
   }
 
@@ -141,16 +146,37 @@ public abstract class RepositorySession 
       @Override
       public void run() {
         delegate.onStoreCompleted(end);
       }
     };
     storeWorkQueue.execute(command);
   }
 
+  /**
+   * Indicates that a number of records have been stored, more are still to come but after some time,
+   * and now would be a good time to flush records and perform any other similar operations.
+   */
+  public void storeFlush() {
+  }
+
+  /**
+   * During flow of records, indicates that source failed.
+   *
+   * @param e indicates reason of failure.
+     */
+  public void sourceFailed(Exception e) {
+  }
+
+  /**
+   * Indicates that a flow of records have been completed.
+   */
+  public void performCleanup() {
+  }
+
   public abstract void wipe(RepositorySessionWipeDelegate delegate);
 
   /**
    * Synchronously perform the shared work of beginning. Throws on failure.
    * @throws InvalidSessionTransitionException
    *
    */
   protected void sharedBegin() throws InvalidSessionTransitionException {
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -243,16 +243,19 @@ public class RecordsChannel implements
       this.sink.storeDone(); // Now we'll be waiting for onStoreCompleted.
     }
   }
 
   @Override
   public void onStoreCompleted(long storeEnd) {
     Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted. " +
                           "Fetch end is " + fetchEnd + ", store end is " + storeEnd);
+    // Source might have used caches used to facilitate flow of records, so now is a good
+    // time to clean up. Particularly pertinent for buffered sources.
+    this.source.performCleanup();
     // TODO: synchronize on consumer callback?
     delegate.onFlowCompleted(this, fetchEnd, storeEnd);
   }
 
   @Override
   public void onBeginFailed(Exception ex) {
     delegate.onFlowBeginFailed(this, ex);
   }
new file mode 100644
--- /dev/null
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
@@ -0,0 +1,234 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+package org.mozilla.gecko.sync.middleware;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mozilla.gecko.background.testhelpers.MockRecord;
+import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
+import org.mozilla.gecko.sync.repositories.Repository;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class BufferingMiddlewareRepositorySessionTest {
+    private RepositorySession innerRepositorySession;
+    private BufferingMiddlewareRepositorySession bufferingSession;
+    private BufferingMiddlewareRepositorySession bufferingSessionMocked;
+    private BufferStorage bufferStorage;
+    private BufferStorage bufferStorageMocked;
+
+    @Before
+    public void setUp() throws Exception {
+        BufferingMiddlewareRepository bufferingRepository;
+        Repository innerRepositoy;
+
+        innerRepositoy = mock(Repository.class);
+        innerRepositorySession = mock(RepositorySession.class);
+        bufferingRepository = new BufferingMiddlewareRepository(
+                0L,
+                new MemoryBufferStorage(),
+                innerRepositoy
+        );
+
+        bufferStorage = new MemoryBufferStorage();
+        bufferStorageMocked = mock(MemoryBufferStorage.class);
+
+        bufferingSession = new BufferingMiddlewareRepositorySession(
+                innerRepositorySession, bufferingRepository, 0L,
+                bufferStorage);
+
+        bufferingSessionMocked = new BufferingMiddlewareRepositorySession(
+                innerRepositorySession, bufferingRepository, 0L,
+                bufferStorageMocked);
+    }
+
+    @Test
+    public void store() throws Exception {
+        assertEquals(0, bufferStorage.all().size());
+
+        MockRecord record = new MockRecord("guid1", null, 1, false);
+        bufferingSession.store(record);
+        assertEquals(1, bufferStorage.all().size());
+
+        MockRecord record1 = new MockRecord("guid2", null, 1, false);
+        bufferingSession.store(record1);
+        assertEquals(2, bufferStorage.all().size());
+        assertEquals(1, bufferStorage.latestModifiedTimestamp());
+
+        // record2 must replace record.
+        MockRecord record2 = new MockRecord("guid1", null, 2, false);
+        bufferingSession.store(record2);
+        assertEquals(2, bufferStorage.all().size());
+        assertEquals(2, bufferStorage.latestModifiedTimestamp());
+
+        // Ensure inner session doesn't see incoming records.
+        verify(innerRepositorySession, never()).store(record);
+        verify(innerRepositorySession, never()).store(record1);
+        verify(innerRepositorySession, never()).store(record2);
+    }
+
+    @Test
+    public void storeDone() throws Exception {
+        // Verify that storage's flush is called.
+        verify(bufferStorageMocked, times(0)).flush();
+        bufferingSessionMocked.doStoreDonePrepare();
+        verify(bufferStorageMocked, times(1)).flush();
+
+        // Trivial case, no records to merge.
+        bufferingSession.doStoreDone(123L);
+        verify(innerRepositorySession, times(1)).storeDone(123L);
+        verify(innerRepositorySession, never()).store(any(Record.class));
+
+        // Reset call counters.
+        reset(innerRepositorySession);
+
+        // Now store some records.
+        MockRecord record = new MockRecord("guid1", null, 1, false);
+        bufferingSession.store(record);
+
+        MockRecord record2 = new MockRecord("guid2", null, 13, false);
+        bufferingSession.store(record2);
+
+        MockRecord record3 = new MockRecord("guid3", null, 5, false);
+        bufferingSession.store(record3);
+
+        // NB: same guid as above.
+        MockRecord record4 = new MockRecord("guid3", null, -1, false);
+        bufferingSession.store(record4);
+
+        // Done storing.
+        bufferingSession.doStoreDone(123L);
+
+        // Ensure all records where stored in the wrapped session.
+        verify(innerRepositorySession, times(1)).store(record);
+        verify(innerRepositorySession, times(1)).store(record2);
+        verify(innerRepositorySession, times(1)).store(record4);
+
+        // Ensure storeDone was called on the wrapped session.
+        verify(innerRepositorySession, times(1)).storeDone(123L);
+
+        // Ensure buffer wasn't cleared on the wrapped session.
+        assertEquals(3, bufferStorage.all().size());
+    }
+
+    @Test
+    public void storeFlush() throws Exception {
+        verify(bufferStorageMocked, times(0)).flush();
+        bufferingSessionMocked.storeFlush();
+        verify(bufferStorageMocked, times(1)).flush();
+    }
+
+    @Test
+    public void performCleanup() throws Exception {
+        // Baseline.
+        assertEquals(0, bufferStorage.all().size());
+
+        // Test that we can call cleanup with an empty buffer storage.
+        bufferingSession.performCleanup();
+        assertEquals(0, bufferStorage.all().size());
+
+        // Store a couple of records.
+        MockRecord record = new MockRecord("guid1", null, 1, false);
+        bufferingSession.store(record);
+
+        MockRecord record2 = new MockRecord("guid2", null, 13, false);
+        bufferingSession.store(record2);
+
+        // Confirm it worked.
+        assertEquals(2, bufferStorage.all().size());
+
+        // Test that buffer storage is cleaned up.
+        bufferingSession.performCleanup();
+        assertEquals(0, bufferStorage.all().size());
+    }
+
+    @Test
+    public void sourceFailed() throws Exception {
+        // Source failes before any records have been stored.
+        bufferingSession.sourceFailed(new Exception());
+        assertEquals(0, bufferStorage.all().size());
+
+        // Store some records now.
+        MockRecord record = new MockRecord("guid1", null, 1, false);
+        bufferingSession.store(record);
+
+        MockRecord record2 = new MockRecord("guid2", null, 13, false);
+        bufferingSession.store(record2);
+
+        MockRecord record3 = new MockRecord("guid3", null, 5, false);
+        bufferingSession.store(record3);
+
+        // Verify that buffer is intact after source fails.
+        bufferingSession.sourceFailed(new Exception());
+        assertEquals(3, bufferStorage.all().size());
+
+        // Verify that buffer is flushed after source fails.
+        verify(bufferStorageMocked, times(0)).flush();
+        bufferingSessionMocked.sourceFailed(new Exception());
+        verify(bufferStorageMocked, times(1)).flush();
+    }
+
+    @Test
+    public void abort() throws Exception {
+        MockRecord record = new MockRecord("guid1", null, 1, false);
+        bufferingSession.store(record);
+
+        MockRecord record2 = new MockRecord("guid2", null, 13, false);
+        bufferingSession.store(record2);
+
+        MockRecord record3 = new MockRecord("guid3", null, 5, false);
+        bufferingSession.store(record3);
+
+        // NB: same guid as above.
+        MockRecord record4 = new MockRecord("guid3", null, -1, false);
+        bufferingSession.store(record4);
+
+        bufferingSession.abort();
+
+        // Verify number of records didn't change.
+        // Abort shouldn't clear the buffer.
+        assertEquals(3, bufferStorage.all().size());
+    }
+
+    @Test
+    public void setStoreDelegate() throws Exception {
+        RepositorySessionStoreDelegate delegate = mock(RepositorySessionStoreDelegate.class);
+        bufferingSession.setStoreDelegate(delegate);
+        verify(innerRepositorySession).setStoreDelegate(delegate);
+    }
+
+    @Test
+    public void getHighWaterMarkTimestamp() throws Exception {
+        // Trivial case, empty buffer.
+        assertEquals(0, bufferingSession.getHighWaterMarkTimestamp());
+
+        MockRecord record = new MockRecord("guid1", null, 1, false);
+        bufferingSession.store(record);
+        assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
+
+        MockRecord record3 = new MockRecord("guid3", null, 5, false);
+        bufferingSession.store(record3);
+        assertEquals(5, bufferingSession.getHighWaterMarkTimestamp());
+
+        // NB: same guid as above.
+        MockRecord record4 = new MockRecord("guid3", null, -1, false);
+        bufferingSession.store(record4);
+        assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
+
+        MockRecord record2 = new MockRecord("guid2", null, 13, false);
+        bufferingSession.store(record2);
+        assertEquals(13, bufferingSession.getHighWaterMarkTimestamp());
+    }
+}
\ No newline at end of file