Bug 1351673 - Use a single-threaded work queue to process batching downloader work items r=rnewman
☠☠ backed out by 8eba074485cc ☠ ☠
authorGrigory Kruglov <gkruglov@mozilla.com>
Wed, 20 Sep 2017 22:21:02 -0400
changeset 431644 9767e159a7018465824b7f6e4d504875cfa5cc6b
parent 431643 4a59e79b7e947915bae60d7c37bffebf99ff3ef8
child 431645 7c07cb7985302e288a56c29c3fac22cacc4096d2
push id7785
push userryanvm@gmail.com
push dateThu, 21 Sep 2017 13:39:55 +0000
treeherdermozilla-beta@06d4034a8a03 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersrnewman
bugs1351673
milestone57.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1351673 - Use a single-threaded work queue to process batching downloader work items r=rnewman Before we'd recurse instead while fetching multiple batches, overflowing the stack on older devices. MozReview-Commit-ID: 37BG6zGBdn0
mobile/android/base/android-services.mozbuild
mobile/android/services/src/main/java/org/mozilla/gecko/sync/DelayedWorkTracker.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
--- a/mobile/android/base/android-services.mozbuild
+++ b/mobile/android/base/android-services.mozbuild
@@ -878,17 +878,16 @@ sync_java_files = [TOPSRCDIR + '/mobile/
     'sync/crypto/HMACVerificationException.java',
     'sync/crypto/KeyBundle.java',
     'sync/crypto/MissingCryptoInputException.java',
     'sync/crypto/NoKeyBundleException.java',
     'sync/crypto/PBKDF2.java',
     'sync/crypto/PersistedCrypto5Keys.java',
     'sync/CryptoKeysChangedException.java',
     'sync/CryptoRecord.java',
-    'sync/DelayedWorkTracker.java',
     'sync/delegates/ClientsDataDelegate.java',
     'sync/delegates/FreshStartDelegate.java',
     'sync/delegates/GlobalSessionCallback.java',
     'sync/delegates/JSONRecordFetchDelegate.java',
     'sync/delegates/KeyUploadDelegate.java',
     'sync/delegates/MetaGlobalDelegate.java',
     'sync/delegates/WipeServerDelegate.java',
     'sync/EngineSettings.java',
deleted file mode 100644
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/DelayedWorkTracker.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-package org.mozilla.gecko.sync;
-
-import org.mozilla.gecko.background.common.log.Logger;
-
-/**
- * A little class to allow us to maintain a count of extant
- * things (in our case, callbacks that need to fire), and
- * some work that we want done when that count hits 0.
- *
- * @author rnewman
- *
- */
-public class DelayedWorkTracker {
-  private static final String LOG_TAG = "DelayedWorkTracker";
-  protected Runnable workItem = null;
-  protected int outstandingCount = 0;
-
-  public int incrementOutstanding() {
-    Logger.trace(LOG_TAG, "Incrementing outstanding.");
-    synchronized(this) {
-      return ++outstandingCount;
-    }
-  }
-  public int decrementOutstanding() {
-    Logger.trace(LOG_TAG, "Decrementing outstanding.");
-    Runnable job = null;
-    int count;
-    synchronized(this) {
-      if ((count = --outstandingCount) == 0 &&
-          workItem != null) {
-        job = workItem;
-        workItem = null;
-      } else {
-        return count;
-      }
-    }
-    job.run();
-    // In case it's changed.
-    return getOutstandingOperations();
-  }
-  public int getOutstandingOperations() {
-    synchronized(this) {
-      return outstandingCount;
-    }
-  }
-  public void delayWorkItem(Runnable item) {
-    Logger.trace(LOG_TAG, "delayWorkItem.");
-    boolean runnableNow = false;
-    synchronized(this) {
-      Logger.trace(LOG_TAG, "outstandingCount: " + outstandingCount);
-      if (outstandingCount == 0) {
-        runnableNow = true;
-      } else {
-        if (workItem != null) {
-          throw new IllegalStateException("Work item already set!");
-        }
-        workItem = item;
-      }
-    }
-    if (runnableNow) {
-      Logger.trace(LOG_TAG, "Running item now.");
-      item.run();
-    }
-  }
-}
\ No newline at end of file
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java
@@ -7,33 +7,34 @@ package org.mozilla.gecko.sync.repositor
 import android.net.Uri;
 import android.os.SystemClock;
 import android.support.annotation.Nullable;
 import android.support.annotation.VisibleForTesting;
 
 import org.mozilla.gecko.background.common.log.Logger;
 import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
 import org.mozilla.gecko.sync.CryptoRecord;
-import org.mozilla.gecko.sync.DelayedWorkTracker;
 import org.mozilla.gecko.sync.SyncDeadlineReachedException;
 import org.mozilla.gecko.sync.Utils;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.net.SyncResponse;
 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
 import org.mozilla.gecko.sync.net.SyncStorageResponse;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Batching Downloader implements batching protocol as supported by Sync 1.5.
  *
  * Downloader's batching behaviour is configured via two parameters, obtained from the repository:
  * - Per-batch limit, which specified how many records may be fetched in an individual GET request.
  * - allowMultipleBatches, which determines if downloader is allowed to perform more than one fetch.
@@ -53,31 +54,32 @@ import java.util.concurrent.TimeUnit;
  * header. Server will ensure that our collection did not change while we are batching, if it did it will
  * fail our fetch with a 412 error. Additionally, we perform the same checks locally.
  */
 public class BatchingDownloader {
     public static final String LOG_TAG = "BatchingDownloader";
     private static final String DEFAULT_SORT_ORDER = "index";
 
     private final RepositorySession repositorySession;
-    private final DelayedWorkTracker workTracker = new DelayedWorkTracker();
     private final Uri baseCollectionUri;
     private final long fetchDeadline;
     private final boolean allowMultipleBatches;
     private final boolean keepTrackOfHighWaterMark;
 
     private RepositoryStateProvider stateProvider;
 
     /* package-local */ final AuthHeaderProvider authHeaderProvider;
 
     // Used to track outstanding requests, so that we can abort them as needed.
     @VisibleForTesting
     protected final Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>());
     /* @GuardedBy("this") */ private String lastModified;
 
+    private final ExecutorService taskQueue = Executors.newSingleThreadExecutor();
+
     public BatchingDownloader(
             AuthHeaderProvider authHeaderProvider,
             Uri baseCollectionUri,
             long fetchDeadline,
             boolean allowMultipleBatches,
             boolean keepTrackOfHighWaterMark,
             RepositoryStateProvider stateProvider,
             RepositorySession repositorySession) {
@@ -86,17 +88,17 @@ public class BatchingDownloader {
         this.baseCollectionUri = baseCollectionUri;
         this.allowMultipleBatches = allowMultipleBatches;
         this.keepTrackOfHighWaterMark = keepTrackOfHighWaterMark;
         this.fetchDeadline = fetchDeadline;
         this.stateProvider = stateProvider;
     }
 
     @VisibleForTesting
-    protected static String flattenIDs(String[] guids) {
+    /* package-private */ static String flattenIDs(String[] guids) {
         // Consider using Utils.toDelimitedString if and when the signature changes
         // to Collection<String> guids.
         if (guids.length == 0) {
             return "";
         }
         if (guids.length == 1) {
             return guids[0];
         }
@@ -105,28 +107,33 @@ public class BatchingDownloader {
         for (String guid : guids) {
             b.append(guid);
             b.append(",");
         }
         return b.substring(0, b.length() - 1);
     }
 
     @VisibleForTesting
-    protected void fetchWithParameters(long newer,
-                                    long batchLimit,
-                                    boolean full,
-                                    String sort,
-                                    String ids,
-                                    SyncStorageCollectionRequest request,
-                                    RepositorySessionFetchRecordsDelegate fetchRecordsDelegate)
+    protected void fetchWithParameters(final long newer,
+                                    final long batchLimit,
+                                    final boolean full,
+                                    final String sort,
+                                    final String ids,
+                                    final SyncStorageCollectionRequest request,
+                                    final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate)
             throws URISyntaxException, UnsupportedEncodingException {
-        request.delegate = new BatchingDownloaderDelegate(this, fetchRecordsDelegate, request,
-                newer, batchLimit, full, sort, ids);
-        this.pending.add(request);
-        request.get();
+        taskQueue.execute(new Runnable() {
+            @Override
+            public void run() {
+                request.delegate = new BatchingDownloaderDelegate(BatchingDownloader.this, fetchRecordsDelegate, request,
+                        newer, batchLimit, full, sort, ids);
+                pending.add(request);
+                request.get();
+            }
+        });
     }
 
     @VisibleForTesting
     protected SyncStorageCollectionRequest makeSyncStorageCollectionRequest(long newer,
                                                   long batchLimit,
                                                   boolean full,
                                                   String sort,
                                                   String ids,
@@ -210,20 +217,20 @@ public class BatchingDownloader {
             // sync we'll erroneously try to resume downloading. If resume proceeds, we will fetch
             // from an older timestamp, but offset by the amount of records we've fetched prior.
             // Since we're diligent about setting a X-I-U-S header, any remote collection changes
             // will be caught and we'll receive a 412.
             if (!BatchingDownloaderController.resetResumeContextAndCommit(this.stateProvider)) {
                 Logger.warn(LOG_TAG, "Failed to reset resume context while completing a batch");
             }
 
-            this.workTracker.delayWorkItem(new Runnable() {
+            taskQueue.execute(new Runnable() {
                 @Override
                 public void run() {
-                    Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
+                    Logger.debug(LOG_TAG, "onFetchCompleted running.");
                     fetchRecordsDelegate.onFetchCompleted();
                 }
             });
             return;
         }
 
         // This is unfortunate, but largely just means that in case we need to resume later on, it
         // either won't be possible (and we'll fetch w/o resuming), or won't be as efficient (i.e.
@@ -235,19 +242,20 @@ public class BatchingDownloader {
         } else {
             if (!BatchingDownloaderController.setInitialResumeContextAndCommit(this.stateProvider, offset, newer, sort)) {
                 Logger.warn(LOG_TAG, "Failed to set initial resume context while processing a batch.");
             }
         }
 
         // We need to make another batching request!
         // Let the delegate know that a batch fetch just completed before we proceed.
-        // This operation needs to run after every call to onFetchedRecord for this batch has been
-        // processed, hence the delayWorkItem call.
-        this.workTracker.delayWorkItem(new Runnable() {
+        // Beware that while this operation will run after every call to onFetchedRecord returned,
+        // it's not guaranteed that the 'sink' session actually processed all of the fetched records.
+        // See Bug https://bugzilla.mozilla.org/show_bug.cgi?id=1351673#c28 for details.
+        taskQueue.execute(new Runnable() {
             @Override
             public void run() {
                 Logger.debug(LOG_TAG, "Running onBatchCompleted.");
                 fetchRecordsDelegate.onBatchCompleted();
             }
         });
 
         // Should we proceed, however? Do we have enough time?
@@ -260,20 +268,20 @@ public class BatchingDownloader {
         try {
             final SyncStorageCollectionRequest newRequest = makeSyncStorageCollectionRequest(newer,
                     limit, full, sort, ids, offset);
             this.fetchWithParameters(newer, limit, full, sort, ids, newRequest, fetchRecordsDelegate);
         } catch (final URISyntaxException | UnsupportedEncodingException e) {
             if (!this.stateProvider.commit()) {
                 Logger.warn(LOG_TAG, "Failed to commit repository state while handling request creation error");
             }
-            this.workTracker.delayWorkItem(new Runnable() {
+            taskQueue.execute(new Runnable() {
                 @Override
                 public void run() {
-                    Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
+                    Logger.debug(LOG_TAG, "onFetchCompleted running.");
                     fetchRecordsDelegate.onFetchFailed(e);
                 }
             });
         }
     }
 
     private void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
                                   final Exception ex) {
@@ -299,40 +307,36 @@ public class BatchingDownloader {
         } else {
             // Failing to commit the context here means that we didn't commit the latest high-water-mark,
             // and won't be as efficient once we re-sync. That is, we might download more records than necessary.
             if (!this.stateProvider.commit()) {
                 Logger.warn(LOG_TAG, "Failed to commit resume context while processing a deadline exception");
             }
         }
 
-        this.workTracker.delayWorkItem(new Runnable() {
+        taskQueue.execute(new Runnable() {
             @Override
             public void run() {
                 Logger.debug(LOG_TAG, "Running onFetchFailed.");
                 fetchRecordsDelegate.onFetchFailed(ex);
             }
         });
     }
 
     public void onFetchedRecord(CryptoRecord record,
                                 RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
-        this.workTracker.incrementOutstanding();
-
         try {
             fetchRecordsDelegate.onFetchedRecord(record);
             // NB: changes to stateProvider are committed in either onFetchCompleted or handleFetchFailed.
             if (this.keepTrackOfHighWaterMark) {
                 this.stateProvider.setLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK, record.lastModified);
             }
         } catch (Exception ex) {
             Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex);
             throw new RuntimeException(ex);
-        } finally {
-            this.workTracker.decrementOutstanding();
         }
     }
 
     private void removeRequestFromPending(SyncStorageCollectionRequest request) {
         if (request == null) {
             return;
         }
         this.pending.remove(request);
@@ -357,17 +361,17 @@ public class BatchingDownloader {
     private static boolean mayProceedWithBatching(long deadline) {
         // For simplicity, allow batching to proceed if there's at least a minute left for the sync.
         // This should be enough to fetch and process records in the batch.
         final long timeLeft = deadline - SystemClock.elapsedRealtime();
         return timeLeft > TimeUnit.MINUTES.toMillis(1);
     }
 
     @VisibleForTesting
-    public static URI buildCollectionURI(Uri baseCollectionUri, boolean full, long newer, long limit, String sort, String ids, String offset) throws URISyntaxException {
+    /* package-private */ static URI buildCollectionURI(Uri baseCollectionUri, boolean full, long newer, long limit, String sort, String ids, String offset) throws URISyntaxException {
         Uri.Builder uriBuilder = baseCollectionUri.buildUpon();
 
         if (full) {
             uriBuilder.appendQueryParameter("full", "1");
         }
 
         if (newer >= 0) {
             // Translate local millisecond timestamps into server decimal seconds.