Bug 1291821 - Wrap local repositories in buffering middleware r=rnewman
authorGrisha Kruglov <gkruglov@mozilla.com>
Thu, 20 Oct 2016 16:31:31 -0700
changeset 344868 d61efb5192df344972d4c5c0f21d4d1c04459441
parent 344867 3a43ecf3c62573024cd9b9d45d2edfc9c9e8d319
child 344869 7e35c924287af2aa4408252d7c83e2e1e431b12f
push id37970
push usergkruglov@mozilla.com
push dateSat, 25 Feb 2017 01:09:28 +0000
treeherderautoland@bd232d46a396 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersrnewman
bugs1291821
milestone54.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1291821 - Wrap local repositories in buffering middleware r=rnewman History stage does not wrap history respository in a buffer, because we'd like to use a high-water-mark and offset resuming later on, and using a persistent buffer for this stage does not make sense. MozReview-Commit-ID: FS1swml2bIC
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserHistoryRepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserHistoryRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserHistoryRepositorySession.java
@@ -186,23 +186,28 @@ public class AndroidBrowserHistoryReposi
       }
       trackRecord(succeeded);
       storeDelegate.onRecordStoreSucceeded(succeeded.guid); // At this point, we are really inserted.
     }
   }
 
   @Override
   public void storeDone() {
+    storeDone(System.currentTimeMillis());
+  }
+
+  @Override
+  public void storeDone(final long end) {
     storeWorkQueue.execute(new Runnable() {
       @Override
       public void run() {
         synchronized (recordsBufferMonitor) {
           try {
             flushNewRecords();
           } catch (Exception e) {
             Logger.warn(LOG_TAG, "Error flushing records to database.", e);
           }
         }
-        storeDone(System.currentTimeMillis());
+        AndroidBrowserHistoryRepositorySession.super.storeDone(end);
       }
     });
   }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
@@ -2,16 +2,18 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.MetaGlobalException;
+import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.ConstrainedServer11Repository;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.AndroidBrowserBookmarksRepository;
 import org.mozilla.gecko.sync.repositories.domain.BookmarkRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class AndroidBrowserBookmarksServerSyncStage extends ServerSyncStage {
@@ -48,17 +50,21 @@ public class AndroidBrowserBookmarksServ
             session.config.infoConfiguration,
             BOOKMARKS_BATCH_LIMIT,
             BOOKMARKS_SORT,
             true /* allow multiple batches */);
   }
 
   @Override
   protected Repository getLocalRepository() {
-    return new AndroidBrowserBookmarksRepository();
+    return new BufferingMiddlewareRepository(
+            session.getSyncDeadline(),
+            new MemoryBufferStorage(),
+            new AndroidBrowserBookmarksRepository()
+    );
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
     return new BookmarkRecordFactory();
   }
 
   @Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
@@ -1,14 +1,16 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
+import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.FennecTabsRepository;
 import org.mozilla.gecko.sync.repositories.domain.TabsRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class FennecTabsServerSyncStage extends ServerSyncStage {
   private static final String COLLECTION = "tabs";
@@ -25,16 +27,20 @@ public class FennecTabsServerSyncStage e
 
   @Override
   public Integer getStorageVersion() {
     return VersionConstants.TABS_ENGINE_VERSION;
   }
 
   @Override
   protected Repository getLocalRepository() {
-    return new FennecTabsRepository(session.getClientsDelegate());
+    return new BufferingMiddlewareRepository(
+            session.getSyncDeadline(),
+            new MemoryBufferStorage(),
+            new FennecTabsRepository(session.getClientsDelegate())
+    );
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
     return new TabsRecordFactory();
   }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
@@ -2,16 +2,18 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.CryptoRecord;
+import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.ConstrainedServer11Repository;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.FormHistoryRepositorySession;
 import org.mozilla.gecko.sync.repositories.domain.FormHistoryRecord;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
@@ -49,17 +51,21 @@ public class FormHistoryServerSyncStage 
             session.config.infoConfiguration,
             FORM_HISTORY_BATCH_LIMIT,
             FORM_HISTORY_SORT,
             true /* allow multiple batches */);
   }
 
   @Override
   protected Repository getLocalRepository() {
-    return new FormHistoryRepositorySession.FormHistoryRepository();
+    return new BufferingMiddlewareRepository(
+            session.getSyncDeadline(),
+            new MemoryBufferStorage(),
+            new FormHistoryRepositorySession.FormHistoryRepository()
+    );
   }
 
   public class FormHistoryRecordFactory extends RecordFactory {
 
     @Override
     public Record createRecord(Record record) {
       FormHistoryRecord r = new FormHistoryRecord();
       r.initFromEnvelope((CryptoRecord) record);
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
@@ -1,14 +1,16 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
+import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.PasswordsRepositorySession;
 import org.mozilla.gecko.sync.repositories.domain.PasswordRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class PasswordsServerSyncStage extends ServerSyncStage {
   @Override
@@ -23,16 +25,20 @@ public class PasswordsServerSyncStage ex
 
   @Override
   public Integer getStorageVersion() {
     return VersionConstants.PASSWORDS_ENGINE_VERSION;
   }
 
   @Override
   protected Repository getLocalRepository() {
-    return new PasswordsRepositorySession.PasswordsRepository();
+    return new BufferingMiddlewareRepository(
+            session.getSyncDeadline(),
+            new MemoryBufferStorage(),
+            new PasswordsRepositorySession.PasswordsRepository()
+    );
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
     return new PasswordRecordFactory();
   }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -165,17 +165,26 @@ public class RecordsChannel implements
     numFetched.set(0);
     numFetchFailed.set(0);
     numStored.set(0);
     numStoreFailed.set(0);
     // Start a consumer thread.
     this.consumer = new ConcurrentRecordConsumer(this);
     ThreadPool.run(this.consumer);
     waitingForQueueDone = true;
-    source.fetchSince(source.getLastSyncTimestamp(), this);
+
+    // Fetch all records that were modified since our previous flow. If our previous flow succeeded,
+    // we will use source's last-sync timestamp. If our previous flow didn't complete, resume it,
+    // starting from sink's high water mark timestamp.
+    // If there was no previous flow (first sync, or data was cleared...), fetch everything.
+    // Resuming a flow is supported for buffered RepositorySessions. We degrade gracefully otherwise.
+    final long highWaterMark = sink.getHighWaterMarkTimestamp();
+    final long lastSync = source.getLastSyncTimestamp();
+    final long sinceTimestamp = Math.max(highWaterMark, lastSync);
+    source.fetchSince(sinceTimestamp, this);
   }
 
   /**
    * Begin both sessions, invoking flow() when done.
    * @throws InvalidSessionTransitionException
    */
   public void beginAndFlow() throws InvalidSessionTransitionException {
     Logger.trace(LOG_TAG, "Beginning source.");