Bug 1291821 - Wrap local repositories in buffering middleware r=rnewman draft
authorGrisha Kruglov <gkruglov@mozilla.com>
Thu, 20 Oct 2016 16:31:31 -0700
changeset 445745 1862bd8b73b2e8fd4e170a9c086978a216801e67
parent 445744 4e3302271fb345d69650fd37225abd5397adf653
child 445746 a92e9a723ac2b859de488c35eba1bace0d77a7d6
push id37599
push usergkruglov@mozilla.com
push dateWed, 30 Nov 2016 06:33:43 +0000
reviewersrnewman
bugs1291821
milestone53.0a1
Bug 1291821 - Wrap local repositories in buffering middleware r=rnewman MozReview-Commit-ID: FS1swml2bIC
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserHistoryRepositorySession.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserHistoryRepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/android/AndroidBrowserHistoryRepositorySession.java
@@ -186,23 +186,28 @@ public class AndroidBrowserHistoryReposi
       }
       trackRecord(succeeded);
       storeDelegate.onRecordStoreSucceeded(succeeded.guid); // At this point, we are really inserted.
     }
   }
 
   @Override
   public void storeDone() {
+    storeDone(System.currentTimeMillis());
+  }
+
+  @Override
+  public void storeDone(final long end) {
     storeWorkQueue.execute(new Runnable() {
       @Override
       public void run() {
         synchronized (recordsBufferMonitor) {
           try {
             flushNewRecords();
           } catch (Exception e) {
             Logger.warn(LOG_TAG, "Error flushing records to database.", e);
           }
         }
-        storeDone(System.currentTimeMillis());
+        AndroidBrowserHistoryRepositorySession.super.storeDone(end);
       }
     });
   }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserBookmarksServerSyncStage.java
@@ -2,16 +2,18 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.MetaGlobalException;
+import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.ConstrainedServer11Repository;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.AndroidBrowserBookmarksRepository;
 import org.mozilla.gecko.sync.repositories.domain.BookmarkRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class AndroidBrowserBookmarksServerSyncStage extends ServerSyncStage {
@@ -48,17 +50,21 @@ public class AndroidBrowserBookmarksServ
             session.config.infoConfiguration,
             BOOKMARKS_BATCH_LIMIT,
             BOOKMARKS_SORT,
             true /* allow multiple batches */);
   }
 
   @Override
   protected Repository getLocalRepository() {
-    return new AndroidBrowserBookmarksRepository();
+    return new BufferingMiddlewareRepository(
+            session.getSyncDeadline(),
+            new MemoryBufferStorage(),
+            new AndroidBrowserBookmarksRepository()
+    );
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
     return new BookmarkRecordFactory();
   }
 
   @Override
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/AndroidBrowserHistoryServerSyncStage.java
@@ -2,16 +2,18 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.MetaGlobalException;
+import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.ConstrainedServer11Repository;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.AndroidBrowserHistoryRepository;
 import org.mozilla.gecko.sync.repositories.domain.HistoryRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class AndroidBrowserHistoryServerSyncStage extends ServerSyncStage {
@@ -34,17 +36,21 @@ public class AndroidBrowserHistoryServer
 
   @Override
   public Integer getStorageVersion() {
     return VersionConstants.HISTORY_ENGINE_VERSION;
   }
 
   @Override
   protected Repository getLocalRepository() {
-    return new AndroidBrowserHistoryRepository();
+    return new BufferingMiddlewareRepository(
+            session.getSyncDeadline(),
+            new MemoryBufferStorage(),
+            new AndroidBrowserHistoryRepository()
+    );
   }
 
   @Override
   protected Repository getRemoteRepository() throws URISyntaxException {
     return new ConstrainedServer11Repository(
             getCollection(),
             session.getSyncDeadline(),
             session.config.storageURL(),
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FennecTabsServerSyncStage.java
@@ -1,14 +1,16 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
+import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.FennecTabsRepository;
 import org.mozilla.gecko.sync.repositories.domain.TabsRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class FennecTabsServerSyncStage extends ServerSyncStage {
   private static final String COLLECTION = "tabs";
@@ -25,16 +27,20 @@ public class FennecTabsServerSyncStage e
 
   @Override
   public Integer getStorageVersion() {
     return VersionConstants.TABS_ENGINE_VERSION;
   }
 
   @Override
   protected Repository getLocalRepository() {
-    return new FennecTabsRepository(session.getClientsDelegate());
+    return new BufferingMiddlewareRepository(
+            session.getSyncDeadline(),
+            new MemoryBufferStorage(),
+            new FennecTabsRepository(session.getClientsDelegate())
+    );
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
     return new TabsRecordFactory();
   }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/FormHistoryServerSyncStage.java
@@ -2,16 +2,18 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
 import java.net.URISyntaxException;
 
 import org.mozilla.gecko.sync.CryptoRecord;
+import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.ConstrainedServer11Repository;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.FormHistoryRepositorySession;
 import org.mozilla.gecko.sync.repositories.domain.FormHistoryRecord;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
@@ -49,17 +51,21 @@ public class FormHistoryServerSyncStage 
             session.config.infoConfiguration,
             FORM_HISTORY_BATCH_LIMIT,
             FORM_HISTORY_SORT,
             true /* allow multiple batches */);
   }
 
   @Override
   protected Repository getLocalRepository() {
-    return new FormHistoryRepositorySession.FormHistoryRepository();
+    return new BufferingMiddlewareRepository(
+            session.getSyncDeadline(),
+            new MemoryBufferStorage(),
+            new FormHistoryRepositorySession.FormHistoryRepository()
+    );
   }
 
   public class FormHistoryRecordFactory extends RecordFactory {
 
     @Override
     public Record createRecord(Record record) {
       FormHistoryRecord r = new FormHistoryRecord();
       r.initFromEnvelope((CryptoRecord) record);
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/PasswordsServerSyncStage.java
@@ -1,14 +1,16 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 package org.mozilla.gecko.sync.stage;
 
+import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
+import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
 import org.mozilla.gecko.sync.repositories.RecordFactory;
 import org.mozilla.gecko.sync.repositories.Repository;
 import org.mozilla.gecko.sync.repositories.android.PasswordsRepositorySession;
 import org.mozilla.gecko.sync.repositories.domain.PasswordRecordFactory;
 import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
 
 public class PasswordsServerSyncStage extends ServerSyncStage {
   @Override
@@ -23,16 +25,20 @@ public class PasswordsServerSyncStage ex
 
   @Override
   public Integer getStorageVersion() {
     return VersionConstants.PASSWORDS_ENGINE_VERSION;
   }
 
   @Override
   protected Repository getLocalRepository() {
-    return new PasswordsRepositorySession.PasswordsRepository();
+    return new BufferingMiddlewareRepository(
+            session.getSyncDeadline(),
+            new MemoryBufferStorage(),
+            new PasswordsRepositorySession.PasswordsRepository()
+    );
   }
 
   @Override
   protected RecordFactory getRecordFactory() {
     return new PasswordRecordFactory();
   }
 }
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java
@@ -165,17 +165,26 @@ public class RecordsChannel implements
     numFetched.set(0);
     numFetchFailed.set(0);
     numStored.set(0);
     numStoreFailed.set(0);
     // Start a consumer thread.
     this.consumer = new ConcurrentRecordConsumer(this);
     ThreadPool.run(this.consumer);
     waitingForQueueDone = true;
-    source.fetchSince(source.getLastSyncTimestamp(), this);
+
+    // Fetch all records that were modified since our previous flow. If our previous flow succeeded,
+    // we will use source's last-sync timestamp. If our previous flow didn't complete, resume it,
+    // starting from sink's high water mark timestamp.
+    // If there was no previous flow (first sync, or data was cleared...), fetch everything.
+    // Resuming a flow is supported for buffered RepositorySessions. We degrade gracefully otherwise.
+    final long highWaterMark = sink.getHighWaterMarkTimestamp();
+    final long lastSync = source.getLastSyncTimestamp();
+    final long sinceTimestamp = Math.max(highWaterMark, lastSync);
+    source.fetchSince(sinceTimestamp, this);
   }
 
   /**
    * Begin both sessions, invoking flow() when done.
    * @throws InvalidSessionTransitionException
    */
   public void beginAndFlow() throws InvalidSessionTransitionException {
     Logger.trace(LOG_TAG, "Beginning source.");