Bug 1291821 - Decouple BatchingUploader from Server11Repository r=rnewman
MozReview-Commit-ID: 7mPy1cmr3vq
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11RepositorySession.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/Server11RepositorySession.java
@@ -1,14 +1,16 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.repositories;
+import android.net.Uri;
+
import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
import org.mozilla.gecko.sync.repositories.domain.Record;
import org.mozilla.gecko.sync.repositories.downloaders.BatchingDownloader;
import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader;
@@ -30,17 +32,20 @@ public class Server11RepositorySession e
return serverRepository;
}
@Override
public void setStoreDelegate(RepositorySessionStoreDelegate storeDelegate) {
super.setStoreDelegate(storeDelegate);
// Now that we have the delegate, we can initialize our uploader.
- this.uploader = new BatchingUploader(this, storeWorkQueue, delegate);
+ this.uploader = new BatchingUploader(
+ this, storeWorkQueue, storeDelegate, Uri.parse(serverRepository.collectionURI.toString()),
+ serverRepository.getCollectionLastModified(), serverRepository.getInfoConfiguration(),
+ serverRepository.authHeaderProvider);
}
@Override
public void guidsSince(long timestamp,
RepositorySessionGuidsSinceDelegate delegate) {
// TODO Auto-generated method stub
}
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java
@@ -5,19 +5,20 @@
package org.mozilla.gecko.sync.repositories.uploaders;
import android.net.Uri;
import android.support.annotation.VisibleForTesting;
import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.Server11RecordPostFailedException;
+import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
-import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import org.mozilla.gecko.sync.repositories.domain.Record;
import java.util.ArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -66,44 +67,48 @@ public class BatchingUploader {
// Accessed by synchronously running threads, OK to not synchronize and just make it volatile.
private volatile Boolean inBatchingMode;
// Used to ensure we have thread-safe access to the following:
// - byte and record counts in both Payload and BatchMeta objects
// - buffers in the Payload object
private final Object payloadLock = new Object();
- protected Executor workQueue;
- protected final RepositorySessionStoreDelegate sessionStoreDelegate;
- protected final Server11RepositorySession repositorySession;
+ private Executor workQueue;
+ private final RepositorySessionStoreDelegate sessionStoreDelegate;
+ private final RepositorySession repositorySession;
+ /* package-local */ final AuthHeaderProvider authHeaderProvider;
- protected AtomicLong uploadTimestamp = new AtomicLong(0);
+ private AtomicLong uploadTimestamp = new AtomicLong(0);
- protected static final int PER_RECORD_OVERHEAD_BYTE_COUNT = RecordUploadRunnable.RECORD_SEPARATOR.length;
- protected static final int PER_PAYLOAD_OVERHEAD_BYTE_COUNT = RecordUploadRunnable.RECORDS_END.length;
+ private static final int PER_RECORD_OVERHEAD_BYTE_COUNT = RecordUploadRunnable.RECORD_SEPARATOR.length;
+ /* package-local */ static final int PER_PAYLOAD_OVERHEAD_BYTE_COUNT = RecordUploadRunnable.RECORDS_END.length;
// Sanity check. RECORD_SEPARATOR and RECORD_START are assumed to be of the same length.
static {
if (RecordUploadRunnable.RECORD_SEPARATOR.length != RecordUploadRunnable.RECORDS_START.length) {
throw new IllegalStateException("Separator and start tokens must be of the same length");
}
}
- public BatchingUploader(final Server11RepositorySession repositorySession, final Executor workQueue, final RepositorySessionStoreDelegate sessionStoreDelegate) {
+ public BatchingUploader(
+ final RepositorySession repositorySession, final Executor workQueue,
+ final RepositorySessionStoreDelegate sessionStoreDelegate, final Uri baseCollectionUri,
+ final Long localCollectionLastModified, final InfoConfiguration infoConfiguration,
+ final AuthHeaderProvider authHeaderProvider) {
this.repositorySession = repositorySession;
this.workQueue = workQueue;
this.sessionStoreDelegate = sessionStoreDelegate;
- this.collectionUri = Uri.parse(repositorySession.getServerRepository().collectionURI().toString());
+ this.collectionUri = baseCollectionUri;
+ this.authHeaderProvider = authHeaderProvider;
- InfoConfiguration config = repositorySession.getServerRepository().getInfoConfiguration();
this.batchMeta = new BatchMeta(
- payloadLock, config.maxTotalBytes, config.maxTotalRecords,
- repositorySession.getServerRepository().getCollectionLastModified()
- );
- this.payload = new Payload(payloadLock, config.maxPostBytes, config.maxPostRecords);
+ payloadLock, infoConfiguration.maxTotalBytes, infoConfiguration.maxTotalRecords, localCollectionLastModified);
+ this.payload = new Payload(
+ payloadLock, infoConfiguration.maxPostBytes, infoConfiguration.maxPostRecords);
}
public void process(final Record record) {
final String guid = record.guid;
final byte[] recordBytes = record.toJSONBytes();
final long recordDeltaByteCount = recordBytes.length + PER_RECORD_OVERHEAD_BYTE_COUNT;
Logger.debug(LOG_TAG, "Processing a record with guid: " + guid);
@@ -230,29 +235,29 @@ public class BatchingUploader {
bumpTimestampTo(uploadTimestamp, lastModifiedTimestamp);
finished(uploadTimestamp);
}
private void finished(AtomicLong lastModifiedTimestamp) {
repositorySession.storeDone(lastModifiedTimestamp.get());
}
- public BatchMeta getCurrentBatch() {
+ /* package-local */ BatchMeta getCurrentBatch() {
return batchMeta;
}
- public void setInBatchingMode(boolean inBatchingMode) {
+ /* package-local */ void setInBatchingMode(boolean inBatchingMode) {
this.inBatchingMode = inBatchingMode;
// If we know for sure that we're not in a batching mode,
// consider our batch to be of unlimited size.
this.batchMeta.setIsUnlimited(!inBatchingMode);
}
- public Boolean getInBatchingMode() {
+ /* package-local */ Boolean getInBatchingMode() {
return inBatchingMode;
}
public void setLastModified(final Long lastModified, final boolean isCommit) throws BatchingUploaderException {
// Sanity check.
if (inBatchingMode == null) {
throw new IllegalStateException("Can't process Last-Modified before we know we're in a batching mode.");
}
@@ -273,20 +278,16 @@ public class BatchingUploader {
}
public void recordFailed(final Exception e, final String recordGuid) {
Logger.debug(LOG_TAG, "Record store failed for guid " + recordGuid + " with exception: " + e.toString());
recordUploadFailed = true;
sessionStoreDelegate.onRecordStoreFailed(e, recordGuid);
}
- public Server11RepositorySession getRepositorySession() {
- return repositorySession;
- }
-
private static void bumpTimestampTo(final AtomicLong current, long newValue) {
while (true) {
long existing = current.get();
if (existing > newValue) {
return;
}
if (current.compareAndSet(existing, newValue)) {
return;
--- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java
+++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java
@@ -13,36 +13,36 @@ import org.mozilla.gecko.sync.NonObjectJ
import org.mozilla.gecko.sync.Utils;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
import java.util.ArrayList;
-public class PayloadUploadDelegate implements SyncStorageRequestDelegate {
+/* package-local */ class PayloadUploadDelegate implements SyncStorageRequestDelegate {
private static final String LOG_TAG = "PayloadUploadDelegate";
private static final String KEY_BATCH = "batch";
private final BatchingUploader uploader;
private ArrayList<String> postedRecordGuids;
private final boolean isCommit;
private final boolean isLastPayload;
- public PayloadUploadDelegate(BatchingUploader uploader, ArrayList<String> postedRecordGuids, boolean isCommit, boolean isLastPayload) {
+ /* package-local */ PayloadUploadDelegate(BatchingUploader uploader, ArrayList<String> postedRecordGuids, boolean isCommit, boolean isLastPayload) {
this.uploader = uploader;
this.postedRecordGuids = postedRecordGuids;
this.isCommit = isCommit;
this.isLastPayload = isLastPayload;
}
@Override
public AuthHeaderProvider getAuthHeaderProvider() {
- return uploader.getRepositorySession().getServerRepository().getAuthHeaderProvider();
+ return uploader.authHeaderProvider;
}
@Override
public String ifUnmodifiedSince() {
final Long lastModified = uploader.getCurrentBatch().getLastModified();
if (lastModified == null) {
return null;
}
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java
@@ -1,13 +1,14 @@
/* Any copyright is dedicated to the Public Domain.
http://creativecommons.org/publicdomain/zero/1.0/ */
package org.mozilla.gecko.sync.repositories.uploaders;
+import android.net.Uri;
import android.support.annotation.NonNull;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mozilla.gecko.background.testhelpers.MockRecord;
@@ -399,43 +400,39 @@ public class BatchingUploaderTest {
uploader.process(record);
uploader.commitIfNecessaryAfterLastPayload();
assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
}
private BatchingUploader makeConstrainedUploader(long maxPostRecords, long maxTotalRecords) {
+ ExtendedJSONObject infoConfigurationJSON = new ExtendedJSONObject();
+ infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_BYTES, 4096L);
+ infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_RECORDS, maxTotalRecords);
+ infoConfigurationJSON.put(InfoConfiguration.MAX_POST_RECORDS, maxPostRecords);
+ infoConfigurationJSON.put(InfoConfiguration.MAX_POST_BYTES, 1024L);
+ infoConfigurationJSON.put(InfoConfiguration.MAX_REQUEST_BYTES, 1024L);
+
Server11RepositorySession server11RepositorySession = new Server11RepositorySession(
- makeCountConstrainedRepository(maxPostRecords, maxTotalRecords)
+ makeConstrainedRepository(infoConfigurationJSON)
);
server11RepositorySession.setStoreDelegate(storeDelegate);
- return new BatchingUploader(server11RepositorySession, workQueue, storeDelegate);
- }
-
- private Server11Repository makeCountConstrainedRepository(long maxPostRecords, long maxTotalRecords) {
- return makeConstrainedRepository(1024, 1024, maxPostRecords, 4096, maxTotalRecords);
+ return new BatchingUploader(
+ server11RepositorySession, workQueue, storeDelegate, Uri.EMPTY, null,
+ new InfoConfiguration(infoConfigurationJSON), null);
}
- private Server11Repository makeConstrainedRepository(long maxRequestBytes, long maxPostBytes, long maxPostRecords, long maxTotalBytes, long maxTotalRecords) {
- ExtendedJSONObject infoConfigurationJSON = new ExtendedJSONObject();
- infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_BYTES, maxTotalBytes);
- infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_RECORDS, maxTotalRecords);
- infoConfigurationJSON.put(InfoConfiguration.MAX_POST_RECORDS, maxPostRecords);
- infoConfigurationJSON.put(InfoConfiguration.MAX_POST_BYTES, maxPostBytes);
- infoConfigurationJSON.put(InfoConfiguration.MAX_REQUEST_BYTES, maxRequestBytes);
-
- InfoConfiguration infoConfiguration = new InfoConfiguration(infoConfigurationJSON);
-
+ private Server11Repository makeConstrainedRepository(ExtendedJSONObject infoConfigurationJSON) {
try {
return new Server11Repository(
"dummyCollection",
"http://dummy.url/",
null,
new InfoCollections(),
- infoConfiguration
+ new InfoConfiguration(infoConfigurationJSON)
);
} catch (URISyntaxException e) {
// Won't throw, and this won't happen.
return null;
}
}
}
\ No newline at end of file
--- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
+++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java
@@ -1,25 +1,28 @@
/* Any copyright is dedicated to the Public Domain.
http://creativecommons.org/publicdomain/zero/1.0/ */
package org.mozilla.gecko.sync.repositories.uploaders;
+import android.net.Uri;
+
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mozilla.gecko.background.testhelpers.TestRunner;
import org.mozilla.gecko.sync.HTTPFailureException;
import org.mozilla.gecko.sync.InfoCollections;
import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.NonObjectJSONException;
import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
+import org.mozilla.gecko.sync.repositories.RepositorySession;
+import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
import org.mozilla.gecko.sync.repositories.Server11Repository;
-import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.Executor;
import ch.boye.httpclientandroidlib.HttpResponse;
@@ -38,18 +41,19 @@ public class PayloadUploadDelegateTest {
public final ArrayList<String> successRecords = new ArrayList<>();
public final HashMap<String, Exception> failedRecords = new HashMap<>();
public boolean didLastPayloadFail = false;
public ArrayList<SyncStorageResponse> successResponses = new ArrayList<>();
public int commitPayloadsSucceeded = 0;
public int lastPayloadsSucceeded = 0;
- public MockUploader(final Server11RepositorySession repositorySession, final Executor workQueue, final RepositorySessionStoreDelegate sessionStoreDelegate) {
- super(repositorySession, workQueue, sessionStoreDelegate);
+ public MockUploader(final RepositorySession repositorySession, final Executor workQueue,
+ final RepositorySessionStoreDelegate sessionStoreDelegate) {
+ super(repositorySession, workQueue, sessionStoreDelegate, Uri.EMPTY, null, new InfoConfiguration(), null);
}
@Override
public void payloadSucceeded(final SyncStorageResponse response, final boolean isCommit, final boolean isLastPayload) {
successResponses.add(response);
if (isCommit) {
++commitPayloadsSucceeded;
}