Bug 1362206 - Have Android abort bookmark syncs if a record is too large to upload to the server r?Grisha draft
author Thom Chiovoloni <tchiovoloni@mozilla.com>
Wed, 06 Sep 2017 17:48:18 -0400
changeset 663393 048fae0a0ed2c75aceff893b65216353379bdbea
parent 663392 9e7cdda04cbfc5d4dae19e071498d3351923196e
child 731193 aac720be425ec690220f9141c810585987325ecd
push id 79424
push user bmo:tchiovoloni@mozilla.com
push date Tue, 12 Sep 2017 23:17:54 +0000
reviewers Grisha
bugs 1362206
milestone 57.0a1
Bug 1362206 - Have Android abort bookmark syncs if a record is too large to upload to the server r?Grisha MozReview-Commit-ID: JBggAu9Ajbu
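
Previously a record that was too large to upload was reported through onRecordStoreFailed and the sync carried on without it. Bookmark records reference their parents and children by GUID, so silently dropping one would leave the server-side tree inconsistent for every other client. Store failures for a BookmarkRecord are now routed through the new PayloadDispatcher.batchFailed, which marks the whole store as failed and notifies the delegate via onStoreFailed, aborting the sync instead of uploading a partial tree.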
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
@@ -10,16 +10,17 @@ import android.support.annotation.Visibl
 import org.json.simple.JSONObject;
 import org.mozilla.gecko.background.common.log.Logger;
 import org.mozilla.gecko.sync.CryptoRecord;
 import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.Server15PreviousPostFailedException;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
+import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord;
 import org.mozilla.gecko.sync.repositories.domain.Record;
 
 import java.util.ArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Uploader which implements batching introduced in Sync 1.5.
@@ -130,58 +131,48 @@ public class BatchingUploader {
         // If store failed entirely, just bail out. We've already told our delegate that we failed.
         if (payloadDispatcher.storeFailed) {
             return;
         }
 
         // If a record or a payload failed, we won't let subsequent requests proceed.
         // This means that we may bail much earlier.
         if (payloadDispatcher.recordUploadFailed) {
-            sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed(
-                    new Server15PreviousPostFailedException(), guid
-            );
+            failRecordStore(new Server15PreviousPostFailedException(), record);
             return;
         }
 
         final JSONObject recordJSON = record.toJSONObject();
 
         final String payloadField = (String) recordJSON.get(CryptoRecord.KEY_PAYLOAD);
         if (payloadField == null) {
-            sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed(
-                    new IllegalRecordException(), guid
-            );
+            failRecordStore(new IllegalRecordException(), record);
             return;
         }
 
         // We can't upload individual records whose payload fields exceed our payload field byte limit.
         // UTF-8 uses 1 byte per character for the ASCII range. Contents of the payloadField are
         // base64 and hex encoded, so character count is sufficient.
         if (payloadField.length() > this.maxPayloadFieldBytes) {
-            sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed(
-                    new PayloadTooLargeToUpload(), guid
-            );
+            failRecordStore(new PayloadTooLargeToUpload(), record);
             return;
         }
 
         final byte[] recordBytes = Record.stringToJSONBytes(recordJSON.toJSONString());
         if (recordBytes == null) {
-            sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed(
-                    new IllegalRecordException(), guid
-            );
+            failRecordStore(new IllegalRecordException(), record);
             return;
         }
 
         final long recordDeltaByteCount = recordBytes.length + PER_RECORD_OVERHEAD_BYTE_COUNT;
         Logger.debug(LOG_TAG, "Processing a record with guid: " + guid);
 
         // We can't upload individual records which exceed our payload total byte limit.
         if ((recordDeltaByteCount + PER_PAYLOAD_OVERHEAD_BYTE_COUNT) > payload.maxBytes) {
-            sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed(
-                    new RecordTooLargeToUpload(), guid
-            );
+            failRecordStore(new RecordTooLargeToUpload(), record);
             return;
         }
 
         synchronized (payloadLock) {
             final boolean canFitRecordIntoBatch = uploaderMeta.canFit(recordDeltaByteCount);
             final boolean canFitRecordIntoPayload = payload.canFit(recordDeltaByteCount);
 
             // Record fits!
@@ -241,24 +232,43 @@ public class BatchingUploader {
         payloadDispatcher.finalizeQueue(uploaderMeta.needToCommit(), new Runnable() {
             @Override
             public void run() {
                 flush(true, true);
             }
         });
     }
 
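+    // Bookmark records reference their parents and children by GUID; uploading
+    // only part of the tree would leave the server structurally inconsistent.
+    // For such records, a single failed store aborts the whole batch.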
+    @VisibleForTesting
+    /* package-local */ boolean shouldFailBatchOnFailure(Record record) {
+        return record instanceof BookmarkRecord;
+    }
+
     /* package-local */ void setLastStoreTimestamp(AtomicLong lastModifiedTimestamp) {
         repositorySession.setLastStoreTimestamp(lastModifiedTimestamp.get());
     }
 
     /* package-local */ void finished() {
         sessionStoreDelegate.deferredStoreDelegate(executor).onStoreCompleted();
     }
 
+    // Common handling for a failed record store: either abort the whole batch,
+    // if losing this record would invalidate it, or report the failure for just
+    // this record through the delegate and let the rest of the batch proceed.
+    private void failRecordStore(Exception e, Record record) {
+        if (shouldFailBatchOnFailure(record)) {
+            payloadDispatcher.batchFailed(e);
+        } else {
+            sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed(e, record.guid);
+        }
+    }
+
     // Will be called from a thread dispatched by PayloadDispatcher.
     // NB: Access to `uploaderMeta.isUnlimited` is guarded by the payloadLock.
     /* package-local */ void setUnlimitedMode(boolean isUnlimited) {
         // If we know for sure that we're not in a batching mode,
         // consider our batch to be of unlimited size.
         this.uploaderMeta.setIsUnlimited(isUnlimited);
     }
 
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
@@ -135,16 +135,25 @@ class PayloadDispatcher {
     }
 
     void concurrentModificationDetected() {
         recordUploadFailed = true;
         storeFailed = true;
         uploader.sessionStoreDelegate.onStoreFailed(new CollectionConcurrentModificationException());
     }
 
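+    // Called when the failure to upload a single record must abort the whole
+    // batch. Setting storeFailed makes BatchingUploader.process() bail out on
+    // every subsequent record, and onStoreFailed notifies the session delegate.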
+    void batchFailed(final Exception e) {
+        Logger.debug(LOG_TAG, "Batch failed with exception: " + e.toString());
+        storeFailed = true;
+        uploader.sessionStoreDelegate.onStoreFailed(e);
+    }
+
     void prepareForNextBatch() {
         batchWhiteboard = batchWhiteboard.nextBatchMeta();
     }
 
     private static void bumpTimestampTo(final AtomicLong current, long newValue) {
         while (true) {
             long existing = current.get();
             if (existing > newValue) {
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
@@ -20,16 +20,18 @@ import org.mozilla.gecko.sync.InfoCollec
 import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.Utils;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
+import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord;
+import org.mozilla.gecko.sync.repositories.domain.Record;
 
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -446,19 +448,43 @@
         assertTrue(((MockStoreDelegate) storeDelegate).lastRecordStoreFailedException instanceof BatchingUploader.PayloadTooLargeToUpload);
 
         uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, 1000));
         assertEquals(2, ((MockStoreDelegate) storeDelegate).storeFailed);
         assertTrue(((MockStoreDelegate) storeDelegate).lastRecordStoreFailedException instanceof BatchingUploader.PayloadTooLargeToUpload);
 
         uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, 9));
         assertEquals(2, ((MockStoreDelegate) storeDelegate).storeFailed);
     }
 
     @Test
+    public void testFailAbortsBatch() {
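+        // Payload field limit is 10 bytes, and this uploader aborts the whole
+        // batch when any record fails to store.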
+        BatchingUploader uploader = makeConstrainedUploader(2, 4, 10, false, true);
+        uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, 10));
+        uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, 10));
+        assertEquals(0, ((MockStoreDelegate) storeDelegate).storeFailed);
+        assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(null, ((MockStoreDelegate) storeDelegate).lastStoreFailedException);
+
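+        // An 11-byte payload exceeds the limit; because this uploader fails the
+        // batch on record failure, the entire store fails rather than one record.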
+        uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, 11));
+        assertTrue(((MockStoreDelegate) storeDelegate).lastStoreFailedException instanceof BatchingUploader.PayloadTooLargeToUpload);
+
+        // Ensure that further calls to process do nothing.
+        for (int i = 0; i < 5; ++i) {
+            uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, 1));
+        }
+
+        assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
+    }
+
+    @Test
     public void testNoMoreRecordsAfterPayloadPost() {
         BatchingUploader uploader = makeConstrainedUploader(2, 4);
 
         // Process two records (payload limit is also two, batch is four),
         // and ensure that 'no more records' commits.
         MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
         uploader.process(record);
         uploader.process(record);
@@ -544,61 +570,77 @@
         return makeConstrainedUploader(maxPostRecords, maxTotalRecords, false);
     }
 
     private BatchingUploader makeConstrainedUploader(long maxPostRecords, long maxTotalRecords, boolean firstSync) {
         return makeConstrainedUploader(maxPostRecords, maxTotalRecords, 1000L, firstSync);
     }
 
     private BatchingUploader makeConstrainedUploader(long maxPostRecords, long maxTotalRecords, long maxPayloadBytes, boolean firstSync) {
+        return makeConstrainedUploader(maxPostRecords, maxTotalRecords, maxPayloadBytes, firstSync, false);
+    }
+
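+    // abortOnRecordFail flips MockUploader.shouldFailBatchOnFailure below,
+    // simulating record types (such as bookmarks) that abort the batch on failure.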
+    private BatchingUploader makeConstrainedUploader(long maxPostRecords, long maxTotalRecords, long maxPayloadBytes, boolean firstSync, boolean abortOnRecordFail) {
         ExtendedJSONObject infoConfigurationJSON = new ExtendedJSONObject();
         infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_BYTES, 4096L);
         infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_RECORDS, maxTotalRecords);
         infoConfigurationJSON.put(InfoConfiguration.MAX_POST_RECORDS, maxPostRecords);
         infoConfigurationJSON.put(InfoConfiguration.MAX_POST_BYTES, 1024L);
         infoConfigurationJSON.put(InfoConfiguration.MAX_REQUEST_BYTES, 1024L);
         infoConfigurationJSON.put(InfoConfiguration.MAX_PAYLOAD_BYTES, maxPayloadBytes);
 
         Server15RepositorySession server15RepositorySession = new Server15RepositorySession(
                 makeConstrainedRepository(firstSync)
         );
         server15RepositorySession.setStoreDelegate(storeDelegate);
         return new MockUploader(
                 server15RepositorySession, workQueue, storeDelegate, Uri.EMPTY, 0L,
-                new InfoConfiguration(infoConfigurationJSON), null);
+                new InfoConfiguration(infoConfigurationJSON), null, abortOnRecordFail);
     }
 
     class MockPayloadDispatcher extends PayloadDispatcher {
         MockPayloadDispatcher(final Executor workQueue, final BatchingUploader uploader, Long lastModified) {
             super(workQueue, uploader, lastModified);
         }
 
         @Override
         Runnable createRecordUploadRunnable(ArrayList<byte[]> outgoing, ArrayList<String> outgoingGuids, long byteCount, boolean isCommit, boolean isLastPayload) {
             // No-op runnable. We don't want this to actually do any work for these tests.
             return new Runnable() {
                 @Override
                 public void run() {}
             };
         }
     }
 
     class MockUploader extends BatchingUploader {
+        final boolean abortOnRecordFail;
+
         MockUploader(final RepositorySession repositorySession, final ExecutorService workQueue,
                      final RepositorySessionStoreDelegate sessionStoreDelegate, final Uri baseCollectionUri,
                      final Long localCollectionLastModified, final InfoConfiguration infoConfiguration,
-                     final AuthHeaderProvider authHeaderProvider) {
+                     final AuthHeaderProvider authHeaderProvider, final boolean abortOnRecordFail) {
             super(repositorySession, workQueue, sessionStoreDelegate, baseCollectionUri,
                     localCollectionLastModified, infoConfiguration, authHeaderProvider);
+            this.abortOnRecordFail = abortOnRecordFail;
         }
 
         @Override
         PayloadDispatcher createPayloadDispatcher(ExecutorService workQueue, Long localCollectionLastModified) {
             return new MockPayloadDispatcher(workQueue, this, localCollectionLastModified);
         }
+
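+        // Overridden so tests can exercise the batch-abort path without having
+        // to construct real BookmarkRecords.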
+        @Override
+        boolean shouldFailBatchOnFailure(Record r) {
+            return abortOnRecordFail;
+        }
     }
 
     private Server15Repository makeConstrainedRepository(boolean firstSync) {
         InfoCollections infoCollections;
         if (firstSync) {
             infoCollections = new InfoCollections() {
                 @Override
                 public Long getTimestamp(String collection) {