Bug 1373254 - Ensure onStoreFailed won't be called twice r=rnewman
MozReview-Commit-ID: 5IE7t5qs6VU
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
@@ -123,17 +123,17 @@ public class BatchingUploader {
this.executor = workQueue;
}
// Called concurrently from the threads running off of a record consumer thread pool.
public void process(final Record record) {
final String guid = record.guid;
// If store failed entirely, just bail out. We've already told our delegate that we failed.
- if (payloadDispatcher.storeFailed) {
+ if (payloadDispatcher.storeFailed.get()) {
return;
}
// If a record or a payload failed, we won't let subsequent requests proceed.
// This means that we may bail much earlier.
if (payloadDispatcher.recordUploadFailed) {
sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed(
new Server15PreviousPostFailedException(), guid
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java
@@ -10,16 +10,17 @@ import android.support.annotation.Visibl
import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.Server15RecordPostFailedException;
import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
import java.util.ArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Dispatches payload runnables and handles their results.
*
* All of the methods, except for `queue` and `finalizeQueue`, will be called from the thread(s)
* running sequentially on the SingleThreadExecutor `executor`.
*/
@@ -33,17 +34,17 @@ class PayloadDispatcher {
private final Executor executor;
private final BatchingUploader uploader;
// For both of these flags:
// Written from sequentially running thread(s) on the SingleThreadExecutor `executor`.
// Read by many threads running concurrently on the records consumer thread pool.
volatile boolean recordUploadFailed = false;
- volatile boolean storeFailed = false;
+ final AtomicBoolean storeFailed = new AtomicBoolean(false);
PayloadDispatcher(Executor executor, BatchingUploader uploader, @Nullable Long initialLastModified) {
// Initially we don't know if we're in a batching mode.
this.batchWhiteboard = new BatchMeta(initialLastModified, null);
this.uploader = uploader;
this.executor = executor;
}
@@ -100,17 +101,17 @@ class PayloadDispatcher {
// If this was our very last commit, we're done storing records.
// Get Last-Modified timestamp from the response, and pass it upstream.
if (isLastPayload) {
uploader.finished();
}
}
void lastPayloadFailed(Exception e) {
- uploader.sessionStoreDelegate.onStoreFailed(e);
+ doStoreFailed(e);
}
void finalizeQueue(final boolean needToCommit, final Runnable finalRunnable) {
executor.execute(new NonPayloadContextRunnable() {
@Override
public void run() {
// Must be called after last payload upload finishes.
if (needToCommit && Boolean.TRUE.equals(batchWhiteboard.getInBatchingMode())) {
@@ -130,25 +131,31 @@ class PayloadDispatcher {
void recordFailed(final Exception e, final String recordGuid) {
Logger.debug(LOG_TAG, "Record store failed for guid " + recordGuid + " with exception: " + e.toString());
recordUploadFailed = true;
uploader.sessionStoreDelegate.onRecordStoreFailed(e, recordGuid);
}
void concurrentModificationDetected() {
- recordUploadFailed = true;
- storeFailed = true;
- uploader.sessionStoreDelegate.onStoreFailed(new CollectionConcurrentModificationException());
+ doStoreFailed(new CollectionConcurrentModificationException());
}
void prepareForNextBatch() {
batchWhiteboard = batchWhiteboard.nextBatchMeta();
}
+ private void doStoreFailed(Exception reason) {
+ if (storeFailed.getAndSet(true)) {
+ return;
+ }
+ recordUploadFailed = true;
+ uploader.sessionStoreDelegate.onStoreFailed(reason);
+ }
+
private static void bumpTimestampTo(final AtomicLong current, long newValue) {
while (true) {
long existing = current.get();
if (existing > newValue) {
return;
}
if (current.compareAndSet(existing, newValue)) {
return;
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
@@ -431,17 +431,17 @@ public class PayloadUploadDelegateTest {
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(authHeaderProvider, payloadDispatcher, postedGuids, false, false);
final HttpResponse response = new BasicHttpResponse(
new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 412, "Precondition failed"));
payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response));
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
assertTrue(payloadDispatcher.recordUploadFailed);
- assertTrue(payloadDispatcher.storeFailed);
+ assertTrue(payloadDispatcher.storeFailed.get());
assertTrue(((MockRepositorySessionStoreDelegate) sessionStoreDelegate).storeFailedException instanceof CollectionConcurrentModificationException);
}
@Test
public void testIfUnmodifiedSinceNoLM() {
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
authHeaderProvider, payloadDispatcher, new ArrayList<String>(), false, false);