Bug 1362206 - Have android abort bookmark syncs if a record is too large to upload to the server r=Grisha
author: Thom Chiovoloni <tchiovoloni@mozilla.com>
Wed, 06 Sep 2017 17:48:18 -0400
changeset 385952 2ad9ea63f3cc4413097e3fa2ab978b25836954f1
parent 385951 2b746fdf27e0d696aaa3da5339c8fa369e947339
child 385953 4730ce3175f8ce3f10c40af5e21c6831668187a8
push id: 32672
push user: archaeopteryx@coole-files.de
push date: Fri, 13 Oct 2017 09:00:05 +0000
treeherder: mozilla-central@3efcb26e5f37
reviewers: Grisha
bugs: 1362206
milestone: 58.0a1
Bug 1362206 - Have android abort bookmark syncs if a record is too large to upload to the server r=Grisha

MozReview-Commit-ID: JBggAu9Ajbu
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
@@ -97,16 +97,19 @@ public class BatchingUploader {
     // In context of the uploader, a "payload" is an entirety of what gets POST-ed to the server in
     // an individual request.
     // In context of Sync Storage, "payload" is a BSO (basic storage object) field defined as: "a
     // string containing the data of the record."
     // Sync Storage servers place a hard limit on how large a payload _field_ might be, and so we
     // maintain this limit for a single sanity check.
     private final long maxPayloadFieldBytes;
 
+    // Set if this uploader should ignore further calls to process.
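+    // Volatile because `process` is called concurrently from the record consumer thread pool.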
+    private volatile boolean aborted = false;
+
     public BatchingUploader(
             final RepositorySession repositorySession, final ExecutorService workQueue,
             final RepositorySessionStoreDelegate sessionStoreDelegate, final Uri baseCollectionUri,
             final Long localCollectionLastModified, final InfoConfiguration infoConfiguration,
             final AuthHeaderProvider authHeaderProvider) {
         this.repositorySession = repositorySession;
         this.sessionStoreDelegate = sessionStoreDelegate;
         this.collectionUri = baseCollectionUri;
@@ -124,79 +127,71 @@ public class BatchingUploader {
         this.executor = workQueue;
     }
 
     // Called concurrently from the threads running off of a record consumer thread pool.
     public void process(final Record record) {
         final String guid = record.guid;
 
         // If store failed entirely, just bail out. We've already told our delegate that we failed.
-        if (payloadDispatcher.storeFailed.get()) {
+        if (payloadDispatcher.storeFailed.get() || aborted) {
             return;
         }
 
         final JSONObject recordJSON = record.toJSONObject();
 
         final String payloadField = (String) recordJSON.get(CryptoRecord.KEY_PAYLOAD);
         if (payloadField == null) {
-            sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed(
-                    new IllegalRecordException(), guid
-            );
+            failRecordStore(new IllegalRecordException(), record, false);
             return;
         }
 
         // We can't upload individual records whose payload fields exceed our payload field byte limit.
         // UTF-8 uses 1 byte per character for the ASCII range. Contents of the payloadField are
         // base64 and hex encoded, so character count is sufficient.
         if (payloadField.length() > this.maxPayloadFieldBytes) {
-            sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed(
-                    new PayloadTooLargeToUpload(), guid
-            );
+            failRecordStore(new PayloadTooLargeToUpload(), record, true);
             return;
         }
 
         final byte[] recordBytes = Record.stringToJSONBytes(recordJSON.toJSONString());
         if (recordBytes == null) {
-            sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed(
-                    new IllegalRecordException(), guid
-            );
+            failRecordStore(new IllegalRecordException(), record, false);
             return;
         }
 
         final long recordDeltaByteCount = recordBytes.length + PER_RECORD_OVERHEAD_BYTE_COUNT;
         Logger.debug(LOG_TAG, "Processing a record with guid: " + guid);
 
         // We can't upload individual records which exceed our payload total byte limit.
         if ((recordDeltaByteCount + PER_PAYLOAD_OVERHEAD_BYTE_COUNT) > payload.maxBytes) {
-            sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed(
-                    new RecordTooLargeToUpload(), guid
-            );
+            failRecordStore(new RecordTooLargeToUpload(), record, true);
             return;
         }
 
         synchronized (payloadLock) {
             final boolean canFitRecordIntoBatch = uploaderMeta.canFit(recordDeltaByteCount);
             final boolean canFitRecordIntoPayload = payload.canFit(recordDeltaByteCount);
 
             // Record fits!
             if (canFitRecordIntoBatch && canFitRecordIntoPayload) {
                 Logger.debug(LOG_TAG, "Record fits into the current batch and payload");
                 addAndFlushIfNecessary(recordDeltaByteCount, recordBytes, guid);
 
-            // Payload won't fit the record.
+                // Payload won't fit the record.
             } else if (canFitRecordIntoBatch) {
                 Logger.debug(LOG_TAG, "Current payload won't fit incoming record, uploading payload.");
                 flush(false, false);
 
                 Logger.debug(LOG_TAG, "Recording the incoming record into a new payload");
 
                 // Keep track of the overflow record.
                 addAndFlushIfNecessary(recordDeltaByteCount, recordBytes, guid);
 
-            // Batch won't fit the record.
+                // Batch won't fit the record.
             } else {
                 Logger.debug(LOG_TAG, "Current batch won't fit incoming record, committing batch.");
                 flush(true, false);
 
                 Logger.debug(LOG_TAG, "Recording the incoming record into a new batch");
 
                 // Keep track of the overflow record.
                 addAndFlushIfNecessary(recordDeltaByteCount, recordBytes, guid);
@@ -233,24 +228,60 @@ public class BatchingUploader {
         payloadDispatcher.finalizeQueue(uploaderMeta.needToCommit(), new Runnable() {
             @Override
             public void run() {
                 flush(true, true);
             }
         });
     }
 
+    // We fail the batch for bookmark records because uploading only a subset of bookmark records is
+    // very likely to cause corruption (e.g. uploading a parent without its children or vice versa).
+    @VisibleForTesting
+    /* package-local */ boolean shouldFailBatchOnFailure(Record record) {
+        return record instanceof BookmarkRecord;
+    }
+
     /* package-local */ void setLastStoreTimestamp(AtomicLong lastModifiedTimestamp) {
         repositorySession.setLastStoreTimestamp(lastModifiedTimestamp.get());
     }
 
     /* package-local */ void finished() {
         sessionStoreDelegate.deferredStoreDelegate(executor).onStoreCompleted();
     }
 
+    // Common handling for marking a record failure and calling our delegate's onRecordStoreFailed.
+    private void failRecordStore(final Exception e, final Record record, boolean sizeOverflow) {
+        // There are three cases we're handling here. See Bug 1362206 for the full rationale.
+        // 1. If `record` is not a bookmark and it failed sanity checks for reasons other than
+        //    "it's too large" (say, `record`'s JSON is 0 bytes),
+        //     - Then mark the record's store as 'failed' and continue uploading.
+        // 2. If `record` is not a bookmark and it failed sanity checks because it's too large,
+        //     - Continue uploading, and don't fail synchronization because of this one record.
+        // 3. If `record` *is* a bookmark, and it failed for any reason,
+        //     - Stop uploading.
+        if (shouldFailBatchOnFailure(record)) {
+            // case 3
+            Logger.debug(LOG_TAG, "Batch failed with exception: " + e.toString());
+            // Start ignoring records, and send off to our delegate that we failed.
+            aborted = true;
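+            // doStoreFailed flips storeFailed atomically, aborts the uploader, and reports
+            // onStoreFailed to the delegate at most once.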
+            executor.execute(new PayloadDispatcher.NonPayloadContextRunnable() {
+                @Override
+                public void run() {
+                    sessionStoreDelegate.onRecordStoreFailed(e, record.guid);
+                    payloadDispatcher.doStoreFailed(e);
+                }
+            });
+        } else if (!sizeOverflow) {
+            // case 1
+            sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed(e, record.guid);
+        }
+        // case 2 is an implicit empty else {} here.
+    }
+
     // Will be called from a thread dispatched by PayloadDispatcher.
     // NB: Access to `uploaderMeta.isUnlimited` is guarded by the payloadLock.
     /* package-local */ void setUnlimitedMode(boolean isUnlimited) {
         // If we know for sure that we're not in a batching mode,
         // consider our batch to be of unlimited size.
         this.uploaderMeta.setIsUnlimited(isUnlimited);
     }
 
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
@@ -135,17 +135,17 @@ class PayloadDispatcher {
     void concurrentModificationDetected() {
         doStoreFailed(new CollectionConcurrentModificationException());
     }
 
     void prepareForNextBatch() {
         batchWhiteboard = batchWhiteboard.nextBatchMeta();
     }
 
-    private void doStoreFailed(Exception reason) {
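+    // Also called from BatchingUploader.failRecordStore when a single record failure (e.g. an
+    // oversized bookmark) should fail the whole batch.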
+    /* package-local */ void doStoreFailed(Exception reason) {
         if (storeFailed.getAndSet(true)) {
             return;
         }
         uploader.abort();
         uploader.sessionStoreDelegate.onStoreFailed(reason);
     }
 
     private static void bumpTimestampTo(final AtomicLong current, long newValue) {
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
@@ -20,16 +20,18 @@ import org.mozilla.gecko.sync.InfoCollec
 import org.mozilla.gecko.sync.InfoConfiguration;
 import org.mozilla.gecko.sync.Utils;
 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
 import org.mozilla.gecko.sync.repositories.RepositorySession;
 import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
 import org.mozilla.gecko.sync.repositories.Server15Repository;
 import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
+import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord;
+import org.mozilla.gecko.sync.repositories.domain.Record;
 
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -385,16 +387,17 @@ public class BatchingUploaderTest {
         // At this point our byte count for the batch is at 3600+overhead;
         // since we have just 496 bytes left in the batch, it's unlikely we'll fit another record.
         // Expect a batch commit
         uploader.process(record);
         assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
         assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
     }
 
+
     @Test
     public void testRandomPayloadSizesBatching() {
         BatchingUploader uploader = makeConstrainedUploader(2, 4);
 
         final Random random = new Random();
         for (int i = 0; i < 15000; i++) {
             uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, random.nextInt(15000)));
         }
@@ -436,29 +439,50 @@ public class BatchingUploaderTest {
         assertEquals(0, ((MockStoreDelegate) storeDelegate).storeFailed);
 
         uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, 5));
         assertEquals(0, ((MockStoreDelegate) storeDelegate).storeFailed);
 
         uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, 10));
         assertEquals(0, ((MockStoreDelegate) storeDelegate).storeFailed);
 
+        // Check that we don't complain when we're configured not to abort
         uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, 11));
-        assertEquals(1, ((MockStoreDelegate) storeDelegate).storeFailed);
-        assertTrue(((MockStoreDelegate) storeDelegate).lastRecordStoreFailedException instanceof BatchingUploader.PayloadTooLargeToUpload);
+        assertEquals(0, ((MockStoreDelegate) storeDelegate).storeFailed);
 
         uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, 1000));
-        assertEquals(2, ((MockStoreDelegate) storeDelegate).storeFailed);
-        assertTrue(((MockStoreDelegate) storeDelegate).lastRecordStoreFailedException instanceof BatchingUploader.PayloadTooLargeToUpload);
+        assertEquals(0, ((MockStoreDelegate) storeDelegate).storeFailed);
 
         uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, 9));
-        assertEquals(2, ((MockStoreDelegate) storeDelegate).storeFailed);
+        assertEquals(0, ((MockStoreDelegate) storeDelegate).storeFailed);
     }
 
     @Test
+    public void testFailAbortsBatch() {
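+        // maxPayloadBytes is 10 and abortOnRecordFail is true, so the 11-byte record below
+        // should abort the entire batch.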
+        BatchingUploader uploader = makeConstrainedUploader(2, 4, 10, false, true);
+        uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, 10));
+        uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, 10));
+        assertEquals(0, ((MockStoreDelegate) storeDelegate).storeFailed);
+        assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
+        assertEquals(null, ((MockStoreDelegate) storeDelegate).lastStoreFailedException);
+
+        uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, 11));
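+        // The oversized record should have failed the whole store, not just this one record.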
+        assertTrue(((MockStoreDelegate) storeDelegate).lastStoreFailedException instanceof BatchingUploader.PayloadTooLargeToUpload);
+
+        // Ensure that further calls to process do nothing.
+        for (int i = 0; i < 5; ++i) {
+            uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, 1));
+        }
+
+        assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
+    }
+
+    @Test
     public void testNoMoreRecordsAfterPayloadPost() {
         BatchingUploader uploader = makeConstrainedUploader(2, 4);
 
         // Process two records (payload limit is also two, batch is four),
         // and ensure that 'no more records' commits.
         MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
         uploader.process(record);
         uploader.process(record);
@@ -544,61 +568,73 @@ public class BatchingUploaderTest {
         return makeConstrainedUploader(maxPostRecords, maxTotalRecords, false);
     }
 
     private BatchingUploader makeConstrainedUploader(long maxPostRecords, long maxTotalRecords, boolean firstSync) {
         return makeConstrainedUploader(maxPostRecords, maxTotalRecords, 1000L, firstSync);
     }
 
     private BatchingUploader makeConstrainedUploader(long maxPostRecords, long maxTotalRecords, long maxPayloadBytes, boolean firstSync) {
+        return makeConstrainedUploader(maxPostRecords, maxTotalRecords, maxPayloadBytes, firstSync, false);
+    }
+
+    private BatchingUploader makeConstrainedUploader(long maxPostRecords, long maxTotalRecords, long maxPayloadBytes, boolean firstSync, boolean abortOnRecordFail) {
         ExtendedJSONObject infoConfigurationJSON = new ExtendedJSONObject();
         infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_BYTES, 4096L);
         infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_RECORDS, maxTotalRecords);
         infoConfigurationJSON.put(InfoConfiguration.MAX_POST_RECORDS, maxPostRecords);
         infoConfigurationJSON.put(InfoConfiguration.MAX_POST_BYTES, 1024L);
         infoConfigurationJSON.put(InfoConfiguration.MAX_REQUEST_BYTES, 1024L);
         infoConfigurationJSON.put(InfoConfiguration.MAX_PAYLOAD_BYTES, maxPayloadBytes);
 
         Server15RepositorySession server15RepositorySession = new Server15RepositorySession(
                 makeConstrainedRepository(firstSync)
         );
         server15RepositorySession.setStoreDelegate(storeDelegate);
         return new MockUploader(
                 server15RepositorySession, workQueue, storeDelegate, Uri.EMPTY, 0L,
-                new InfoConfiguration(infoConfigurationJSON), null);
+                new InfoConfiguration(infoConfigurationJSON), null, abortOnRecordFail);
     }
 
     class MockPayloadDispatcher extends PayloadDispatcher {
         MockPayloadDispatcher(final Executor workQueue, final BatchingUploader uploader, Long lastModified) {
             super(workQueue, uploader, lastModified);
         }
 
         @Override
         Runnable createRecordUploadRunnable(ArrayList<byte[]> outgoing, ArrayList<String> outgoingGuids, long byteCount, boolean isCommit, boolean isLastPayload) {
             // No-op runnable. We don't want this to actually do any work for these tests.
             return new Runnable() {
                 @Override
                 public void run() {}
             };
         }
     }
 
     class MockUploader extends BatchingUploader {
+        boolean abortOnRecordFail;
         MockUploader(final RepositorySession repositorySession, final ExecutorService workQueue,
                      final RepositorySessionStoreDelegate sessionStoreDelegate, final Uri baseCollectionUri,
                      final Long localCollectionLastModified, final InfoConfiguration infoConfiguration,
-                     final AuthHeaderProvider authHeaderProvider) {
+                     final AuthHeaderProvider authHeaderProvider, final boolean abortOnRecordFail) {
             super(repositorySession, workQueue, sessionStoreDelegate, baseCollectionUri,
                     localCollectionLastModified, infoConfiguration, authHeaderProvider);
+            this.abortOnRecordFail = abortOnRecordFail;
         }
 
         @Override
         PayloadDispatcher createPayloadDispatcher(ExecutorService workQueue, Long localCollectionLastModified) {
             return new MockPayloadDispatcher(workQueue, this, localCollectionLastModified);
         }
+
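+        // Lets tests force the batch-abort path without constructing real BookmarkRecords.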
+        @Override
+        boolean shouldFailBatchOnFailure(Record r) {
+            return abortOnRecordFail;
+        }
     }
 
     private Server15Repository makeConstrainedRepository(boolean firstSync) {
         InfoCollections infoCollections;
         if (firstSync) {
             infoCollections = new InfoCollections() {
                 @Override
                 public Long getTimestamp(String collection) {