Bug 1291821 - Buffering repository middleware r=rnewman
authorGrisha Kruglov <gkruglov@mozilla.com>
Thu, 19 Jan 2017 13:11:41 -0800
changeset 344859 86bc523c8c6024f4f8737813498e994c53625a87
parent 344858 ff7ff02d0edfcf053d1dd0b9a7a2c983aec311ef
child 344860 e41e4bbf72ab1283f7ab13d0d8e1b20b7dadcbe5
push id37970
push usergkruglov@mozilla.com
push dateSat, 25 Feb 2017 01:09:28 +0000
treeherderautoland@bd232d46a396 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersrnewman
bugs1291821
milestone54.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1291821 - Buffering repository middleware r=rnewman MozReview-Commit-ID: GS3M7k670Po
mobile/android/base/android-services.mozbuild
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepository.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
--- a/mobile/android/base/android-services.mozbuild
+++ b/mobile/android/base/android-services.mozbuild
@@ -896,20 +896,24 @@ sync_java_files = [TOPSRCDIR + '/mobile/
     'sync/InfoConfiguration.java',
     'sync/InfoCounts.java',
     'sync/JSONRecordFetcher.java',
     'sync/KeyBundleProvider.java',
     'sync/MetaGlobal.java',
     'sync/MetaGlobalException.java',
     'sync/MetaGlobalMissingEnginesException.java',
     'sync/MetaGlobalNotSetException.java',
+    'sync/middleware/BufferingMiddlewareRepository.java',
+    'sync/middleware/BufferingMiddlewareRepositorySession.java',
     'sync/middleware/Crypto5MiddlewareRepository.java',
     'sync/middleware/Crypto5MiddlewareRepositorySession.java',
     'sync/middleware/MiddlewareRepository.java',
     'sync/middleware/MiddlewareRepositorySession.java',
+    'sync/middleware/storage/BufferStorage.java',
+    'sync/middleware/storage/MemoryBufferStorage.java',
     'sync/net/AbstractBearerTokenAuthHeaderProvider.java',
     'sync/net/AuthHeaderProvider.java',
     'sync/net/BaseResource.java',
     'sync/net/BaseResourceDelegate.java',
     'sync/net/BasicAuthHeaderProvider.java',
     'sync/net/BearerAuthHeaderProvider.java',
     'sync/net/BrowserIDAuthHeaderProvider.java',
     'sync/net/ConnectionMonitorThread.java',
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepository.java
@@ -0,0 +1,60 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.middleware;
+
+import android.content.Context;
+
+import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
+import org.mozilla.gecko.sync.repositories.Repository;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
+
+/**
+ * A buffering-enabled middleware which is intended to wrap local repositories. Configurable with
+ * a sync deadline, buffer storage implementation and a consistency checker implementation.
+ *
+ * @author grisha
+ */
+public class BufferingMiddlewareRepository extends MiddlewareRepository {
+    private final long syncDeadline;
+    private final Repository inner;
+    private final BufferStorage bufferStorage;
+
+    private class BufferingMiddlewareRepositorySessionCreationDelegate extends MiddlewareRepository.SessionCreationDelegate {
+        private final BufferingMiddlewareRepository repository;
+        private final RepositorySessionCreationDelegate outerDelegate;
+
+        private BufferingMiddlewareRepositorySessionCreationDelegate(BufferingMiddlewareRepository repository, RepositorySessionCreationDelegate outerDelegate) {
+            this.repository = repository;
+            this.outerDelegate = outerDelegate;
+        }
+
+        @Override
+        public void onSessionCreateFailed(Exception ex) {
+            this.outerDelegate.onSessionCreateFailed(ex);
+        }
+
+        @Override
+        public void onSessionCreated(RepositorySession session) {
+            outerDelegate.onSessionCreated(new BufferingMiddlewareRepositorySession(
+                    session, this.repository, syncDeadline, bufferStorage
+            ));
+        }
+    }
+
+    public BufferingMiddlewareRepository(long syncDeadline, BufferStorage bufferStore, Repository wrappedRepository) {
+        this.syncDeadline = syncDeadline;
+        this.inner = wrappedRepository;
+        this.bufferStorage = bufferStore;
+    }
+
+    @Override
+    public void createSession(RepositorySessionCreationDelegate delegate, Context context) {
+        this.inner.createSession(
+                new BufferingMiddlewareRepositorySessionCreationDelegate(this, delegate),
+                context
+        );
+    }
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java
@@ -0,0 +1,193 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.middleware;
+
+import android.os.SystemClock;
+import android.support.annotation.VisibleForTesting;
+
+import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
+import org.mozilla.gecko.sync.repositories.InactiveSessionException;
+import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Buffering middleware which is intended to wrap local RepositorySessions.
+ *
+ * Configure it:
+ *  - with an appropriate BufferStore (in-memory, record-type-aware database-backed, etc).
+ *
+ *  Fetch is pass-through, store is buffered.
+ *
+ * @author grisha
+ */
+/* package-local */ class BufferingMiddlewareRepositorySession extends MiddlewareRepositorySession {
+    private final BufferStorage bufferStorage;
+    private final long syncDeadlineMillis;
+
+    private ExecutorService storeDelegateExecutor = Executors.newSingleThreadExecutor();
+
+    private volatile boolean storeMarkedIncomplete = false;
+
+    /* package-local */ BufferingMiddlewareRepositorySession(
+            RepositorySession repositorySession, MiddlewareRepository repository,
+            long syncDeadlineMillis, BufferStorage bufferStorage) {
+        super(repositorySession, repository);
+        this.syncDeadlineMillis = syncDeadlineMillis;
+        this.bufferStorage = bufferStorage;
+    }
+
+    @Override
+    public void fetchSince(long timestamp, RepositorySessionFetchRecordsDelegate delegate) {
+        this.inner.fetchSince(timestamp, delegate);
+    }
+
+    @Override
+    public void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException {
+        this.inner.fetch(guids, delegate);
+    }
+
+    @Override
+    public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) {
+        this.inner.fetchAll(delegate);
+    }
+
+    /**
+     * Will be called when this repository is acting as a `source`, and a flow of records into `sink`
+     * was completed. That is, we've uploaded merged records to the server, so now is a good time
+     * to clean up our buffer for this repository.
+     */
+    @Override
+    public void performCleanup() {
+        bufferStorage.clear();
+    }
+
+    @Override
+    public void store(Record record) throws NoStoreDelegateException {
+        bufferStorage.addOrReplace(record);
+    }
+
+    @Override
+    public void storeIncomplete() {
+        storeMarkedIncomplete = true;
+    }
+
+    @Override
+    public void storeDone() {
+        storeDone(System.currentTimeMillis());
+    }
+
+    @Override
+    public void storeFlush() {
+        bufferStorage.flush();
+    }
+
+    @Override
+    public void storeDone(final long end) {
+        doStoreDonePrepare();
+
+        // Determine if we have enough time to perform consistency checks on the buffered data and
+        // then store it. If we don't have enough time now, we keep our buffer and try again later.
+        // We don't store results of a buffer consistency check anywhere, so we can't treat it
+        // separately from storage.
+        if (storeMarkedIncomplete || !mayProceedToMergeBuffer()) {
+            super.abort();
+            storeDelegate.deferredStoreDelegate(storeDelegateExecutor).onStoreCompleted(end);
+            return;
+        }
+
+        // Separate actual merge, so that it may be tested without involving system clock.
+        doStoreDone(end);
+    }
+
+    @VisibleForTesting
+    public void doStoreDonePrepare() {
+        // Now that records stopped flowing, persist them.
+        bufferStorage.flush();
+    }
+
+    @VisibleForTesting
+    public void doStoreDone(final long end) {
+        final Collection<Record> buffer = bufferStorage.all();
+
+        // Trivial case of an empty buffer.
+        if (buffer.isEmpty()) {
+            super.storeDone(end);
+            return;
+        }
+
+        // Flush our buffer to the wrapped local repository. Data goes live!
+        try {
+            for (Record record : buffer) {
+                this.inner.store(record);
+            }
+        } catch (NoStoreDelegateException e) {
+            // At this point we should have a delegate, so this won't happen.
+        }
+
+        // And, we're done!
+        super.storeDone(end);
+    }
+
+    /**
+     * When source fails to provide more records, we need to decide what to do with the buffer.
+     * We might fail because of a network partition, or because of a concurrent modification of a
+     * collection. Either way we do not clear the buffer in a general case. If a collection has been
+     * modified, affected records' last-modified timestamps will be bumped, and we will receive those
+     * records during the next sync. If we already have them in our buffer, we replace our now-old
+     * copy. Otherwise, they are new records and we just append them.
+     *
+     * We depend on GUIDs to be a primary key for incoming records.
+     *
+     * @param e indicates reason of failure.
+     */
+    @Override
+    public void sourceFailed(Exception e) {
+        bufferStorage.flush();
+        super.sourceFailed(e);
+    }
+
+    /**
+     * Session abnormally aborted. This doesn't mean our so-far buffered data is invalid.
+     * Clean up after ourselves, if there's anything to clean up.
+     */
+    @Override
+    public void abort() {
+        bufferStorage.flush();
+        super.abort();
+    }
+
+    @Override
+    public void setStoreDelegate(RepositorySessionStoreDelegate delegate) {
+        inner.setStoreDelegate(delegate);
+        this.storeDelegate = delegate;
+    }
+
+    @Override
+    public long getHighWaterMarkTimestamp() {
+        return bufferStorage.latestModifiedTimestamp();
+    }
+
+    private boolean mayProceedToMergeBuffer() {
+        // If our buffer storage is not persistent, disallowing merging after buffer has been filled
+        // means throwing away records only to re-download them later.
+        // In this case allow merge to proceed even if we're past the deadline.
+        if (!bufferStorage.isPersistent()) {
+            return true;
+        }
+
+        // While actual runtime of a merge operation is a function of record type, buffer size, etc.,
+        // let's do a simple thing for now and say that we may proceed if we have couple of minutes
+        // of runtime left. That surely is enough, right?
+        final long timeLeftMillis = syncDeadlineMillis - SystemClock.elapsedRealtime();
+        return timeLeftMillis > 1000 * 60 * 2;
+    }
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java
@@ -0,0 +1,35 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.middleware.storage;
+
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+import java.util.Collection;
+
+/**
+ * A contract between BufferingMiddleware and specific storage implementations.
+ *
+ * @author grisha
+ */
+public interface BufferStorage {
+    // Returns all of the records currently present in the buffer.
+    Collection<Record> all();
+
+    // Implementations are responsible to ensure that any incoming records with duplicate GUIDs replace
+    // what's already present in the storage layer.
+    // NB: For a database-backed storage, "replace" happens at a transaction level.
+    void addOrReplace(Record record);
+
+    // For database-backed implementations, commits any records that came in up to this point.
+    void flush();
+
+    void clear();
+
+    // For buffers that are filled up oldest-first this is a high water mark, which enables resuming
+    // a sync.
+    long latestModifiedTimestamp();
+
+    boolean isPersistent();
+}
new file mode 100644
--- /dev/null
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java
@@ -0,0 +1,70 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+package org.mozilla.gecko.sync.middleware.storage;
+
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A trivial, memory-backed, transient implementation of a BufferStorage.
+ * Its intended use is to buffer syncing of small collections.
+ * Thread-safe.
+ *
+ * @author grisha
+ */
+public class MemoryBufferStorage implements BufferStorage {
+    private final Map<String, Record> recordBuffer = Collections.synchronizedMap(new HashMap<String, Record>());
+
+    @Override
+    public boolean isPersistent() {
+        return false;
+    }
+
+    @Override
+    public Collection<Record> all() {
+        synchronized (recordBuffer) {
+            return new ArrayList<>(recordBuffer.values());
+        }
+    }
+
+    @Override
+    public void addOrReplace(Record record) {
+        recordBuffer.put(record.guid, record);
+    }
+
+    @Override
+    public void flush() {
+        // This is a no-op; flush intended for database-backed stores.
+    }
+
+    @Override
+    public void clear() {
+        recordBuffer.clear();
+    }
+
+    @Override
+    public long latestModifiedTimestamp() {
+        long lastModified = 0;
+
+        synchronized (recordBuffer) {
+            if (recordBuffer.size() == 0) {
+                return lastModified;
+            }
+
+            for (Record record : recordBuffer.values()) {
+                if (record.lastModified > lastModified) {
+                    lastModified = record.lastModified;
+                }
+            }
+        }
+
+        return lastModified;
+    }
+}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java
@@ -71,16 +71,21 @@ public abstract class RepositorySession 
 
   // The time that the last sync on this collection completed, in milliseconds since epoch.
   private long lastSyncTimestamp = 0;
 
   public long getLastSyncTimestamp() {
     return lastSyncTimestamp;
   }
 
+  // Override this in the buffering wrappers.
+  public long getHighWaterMarkTimestamp() {
+    return 0;
+  }
+
   public static long now() {
     return System.currentTimeMillis();
   }
 
   public RepositorySession(Repository repository) {
     this.repository = repository;
   }
 
@@ -141,16 +146,37 @@ public abstract class RepositorySession 
       @Override
       public void run() {
         delegate.onStoreCompleted(end);
       }
     };
     storeWorkQueue.execute(command);
   }
 
+  /**
+   * Indicates that a number of records have been stored, more are still to come but after some time,
+   * and now would be a good time to flush records and perform any other similar operations.
+   */
+  public void storeFlush() {
+  }
+
+  /**
+   * During flow of records, indicates that source failed.
+   *
+   * @param e indicates reason of failure.
+     */
+  public void sourceFailed(Exception e) {
+  }
+
+  /**
+   * Indicates that a flow of records have been completed.
+   */
+  public void performCleanup() {
+  }
+
   public abstract void wipe(RepositorySessionWipeDelegate delegate);
 
   /**
    * Synchronously perform the shared work of beginning. Throws on failure.
    * @throws InvalidSessionTransitionException
    *
    */
   protected void sharedBegin() throws InvalidSessionTransitionException {
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -243,16 +243,19 @@ public class RecordsChannel implements
       this.sink.storeDone(); // Now we'll be waiting for onStoreCompleted.
     }
   }
 
   @Override
   public void onStoreCompleted(long storeEnd) {
     Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted. " +
                           "Fetch end is " + fetchEnd + ", store end is " + storeEnd);
+    // Source might have used caches used to facilitate flow of records, so now is a good
+    // time to clean up. Particularly pertinent for buffered sources.
+    this.source.performCleanup();
     // TODO: synchronize on consumer callback?
     delegate.onFlowCompleted(this, fetchEnd, storeEnd);
   }
 
   @Override
   public void onBeginFailed(Exception ex) {
     delegate.onFlowBeginFailed(this, ex);
   }
new file mode 100644
--- /dev/null
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java
@@ -0,0 +1,234 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+package org.mozilla.gecko.sync.middleware;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mozilla.gecko.background.testhelpers.MockRecord;
+import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
+import org.mozilla.gecko.sync.repositories.Repository;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
+import org.mozilla.gecko.sync.repositories.domain.Record;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class BufferingMiddlewareRepositorySessionTest {
+    private RepositorySession innerRepositorySession;
+    private BufferingMiddlewareRepositorySession bufferingSession;
+    private BufferingMiddlewareRepositorySession bufferingSessionMocked;
+    private BufferStorage bufferStorage;
+    private BufferStorage bufferStorageMocked;
+
+    @Before
+    public void setUp() throws Exception {
+        BufferingMiddlewareRepository bufferingRepository;
+        Repository innerRepositoy;
+
+        innerRepositoy = mock(Repository.class);
+        innerRepositorySession = mock(RepositorySession.class);
+        bufferingRepository = new BufferingMiddlewareRepository(
+                0L,
+                new MemoryBufferStorage(),
+                innerRepositoy
+        );
+
+        bufferStorage = new MemoryBufferStorage();
+        bufferStorageMocked = mock(MemoryBufferStorage.class);
+
+        bufferingSession = new BufferingMiddlewareRepositorySession(
+                innerRepositorySession, bufferingRepository, 0L,
+                bufferStorage);
+
+        bufferingSessionMocked = new BufferingMiddlewareRepositorySession(
+                innerRepositorySession, bufferingRepository, 0L,
+                bufferStorageMocked);
+    }
+
+    @Test
+    public void store() throws Exception {
+        assertEquals(0, bufferStorage.all().size());
+
+        MockRecord record = new MockRecord("guid1", null, 1, false);
+        bufferingSession.store(record);
+        assertEquals(1, bufferStorage.all().size());
+
+        MockRecord record1 = new MockRecord("guid2", null, 1, false);
+        bufferingSession.store(record1);
+        assertEquals(2, bufferStorage.all().size());
+        assertEquals(1, bufferStorage.latestModifiedTimestamp());
+
+        // record2 must replace record.
+        MockRecord record2 = new MockRecord("guid1", null, 2, false);
+        bufferingSession.store(record2);
+        assertEquals(2, bufferStorage.all().size());
+        assertEquals(2, bufferStorage.latestModifiedTimestamp());
+
+        // Ensure inner session doesn't see incoming records.
+        verify(innerRepositorySession, never()).store(record);
+        verify(innerRepositorySession, never()).store(record1);
+        verify(innerRepositorySession, never()).store(record2);
+    }
+
+    @Test
+    public void storeDone() throws Exception {
+        // Verify that storage's flush is called.
+        verify(bufferStorageMocked, times(0)).flush();
+        bufferingSessionMocked.doStoreDonePrepare();
+        verify(bufferStorageMocked, times(1)).flush();
+
+        // Trivial case, no records to merge.
+        bufferingSession.doStoreDone(123L);
+        verify(innerRepositorySession, times(1)).storeDone(123L);
+        verify(innerRepositorySession, never()).store(any(Record.class));
+
+        // Reset call counters.
+        reset(innerRepositorySession);
+
+        // Now store some records.
+        MockRecord record = new MockRecord("guid1", null, 1, false);
+        bufferingSession.store(record);
+
+        MockRecord record2 = new MockRecord("guid2", null, 13, false);
+        bufferingSession.store(record2);
+
+        MockRecord record3 = new MockRecord("guid3", null, 5, false);
+        bufferingSession.store(record3);
+
+        // NB: same guid as above.
+        MockRecord record4 = new MockRecord("guid3", null, -1, false);
+        bufferingSession.store(record4);
+
+        // Done storing.
+        bufferingSession.doStoreDone(123L);
+
+        // Ensure all records where stored in the wrapped session.
+        verify(innerRepositorySession, times(1)).store(record);
+        verify(innerRepositorySession, times(1)).store(record2);
+        verify(innerRepositorySession, times(1)).store(record4);
+
+        // Ensure storeDone was called on the wrapped session.
+        verify(innerRepositorySession, times(1)).storeDone(123L);
+
+        // Ensure buffer wasn't cleared on the wrapped session.
+        assertEquals(3, bufferStorage.all().size());
+    }
+
+    @Test
+    public void storeFlush() throws Exception {
+        verify(bufferStorageMocked, times(0)).flush();
+        bufferingSessionMocked.storeFlush();
+        verify(bufferStorageMocked, times(1)).flush();
+    }
+
+    @Test
+    public void performCleanup() throws Exception {
+        // Baseline.
+        assertEquals(0, bufferStorage.all().size());
+
+        // Test that we can call cleanup with an empty buffer storage.
+        bufferingSession.performCleanup();
+        assertEquals(0, bufferStorage.all().size());
+
+        // Store a couple of records.
+        MockRecord record = new MockRecord("guid1", null, 1, false);
+        bufferingSession.store(record);
+
+        MockRecord record2 = new MockRecord("guid2", null, 13, false);
+        bufferingSession.store(record2);
+
+        // Confirm it worked.
+        assertEquals(2, bufferStorage.all().size());
+
+        // Test that buffer storage is cleaned up.
+        bufferingSession.performCleanup();
+        assertEquals(0, bufferStorage.all().size());
+    }
+
+    @Test
+    public void sourceFailed() throws Exception {
+        // Source failes before any records have been stored.
+        bufferingSession.sourceFailed(new Exception());
+        assertEquals(0, bufferStorage.all().size());
+
+        // Store some records now.
+        MockRecord record = new MockRecord("guid1", null, 1, false);
+        bufferingSession.store(record);
+
+        MockRecord record2 = new MockRecord("guid2", null, 13, false);
+        bufferingSession.store(record2);
+
+        MockRecord record3 = new MockRecord("guid3", null, 5, false);
+        bufferingSession.store(record3);
+
+        // Verify that buffer is intact after source fails.
+        bufferingSession.sourceFailed(new Exception());
+        assertEquals(3, bufferStorage.all().size());
+
+        // Verify that buffer is flushed after source fails.
+        verify(bufferStorageMocked, times(0)).flush();
+        bufferingSessionMocked.sourceFailed(new Exception());
+        verify(bufferStorageMocked, times(1)).flush();
+    }
+
+    @Test
+    public void abort() throws Exception {
+        MockRecord record = new MockRecord("guid1", null, 1, false);
+        bufferingSession.store(record);
+
+        MockRecord record2 = new MockRecord("guid2", null, 13, false);
+        bufferingSession.store(record2);
+
+        MockRecord record3 = new MockRecord("guid3", null, 5, false);
+        bufferingSession.store(record3);
+
+        // NB: same guid as above.
+        MockRecord record4 = new MockRecord("guid3", null, -1, false);
+        bufferingSession.store(record4);
+
+        bufferingSession.abort();
+
+        // Verify number of records didn't change.
+        // Abort shouldn't clear the buffer.
+        assertEquals(3, bufferStorage.all().size());
+    }
+
+    @Test
+    public void setStoreDelegate() throws Exception {
+        RepositorySessionStoreDelegate delegate = mock(RepositorySessionStoreDelegate.class);
+        bufferingSession.setStoreDelegate(delegate);
+        verify(innerRepositorySession).setStoreDelegate(delegate);
+    }
+
+    @Test
+    public void getHighWaterMarkTimestamp() throws Exception {
+        // Trivial case, empty buffer.
+        assertEquals(0, bufferingSession.getHighWaterMarkTimestamp());
+
+        MockRecord record = new MockRecord("guid1", null, 1, false);
+        bufferingSession.store(record);
+        assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
+
+        MockRecord record3 = new MockRecord("guid3", null, 5, false);
+        bufferingSession.store(record3);
+        assertEquals(5, bufferingSession.getHighWaterMarkTimestamp());
+
+        // NB: same guid as above.
+        MockRecord record4 = new MockRecord("guid3", null, -1, false);
+        bufferingSession.store(record4);
+        assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
+
+        MockRecord record2 = new MockRecord("guid2", null, 13, false);
+        bufferingSession.store(record2);
+        assertEquals(13, bufferingSession.getHighWaterMarkTimestamp());
+    }
+}
\ No newline at end of file