Bug 1253051 - Implement atomic uploads for sync collection data r?rnewman draft
author    Thom Chiovoloni <tchiovoloni@mozilla.com>
date      Wed, 24 Aug 2016 16:02:14 -0400
changeset 405119 e4d795abd1ff0e75237457c048ef4f8f2fdc8ecf
parent    404258 1150d238edc731d37bdbbe1601141214ce0415bc
child     529357 5ddc88cadad876d16ef4f7e723148ff7be697d70
push id   27395
push user bmo:tchiovoloni@mozilla.com
push date Wed, 24 Aug 2016 20:02:38 +0000
reviewers rnewman
bugs      1253051
milestone 51.0a1
Bug 1253051 - Implement atomic uploads for sync collection data r?rnewman MozReview-Commit-ID: Hm9NbnC2Vk8
services/sync/modules/constants.js
services/sync/modules/engines.js
services/sync/modules/engines/forms.js
services/sync/modules/engines/history.js
services/sync/modules/record.js
services/sync/modules/service.js
services/sync/tests/unit/head_http_server.js
services/sync/tests/unit/test_postqueue.js
services/sync/tests/unit/test_syncengine_sync.js
services/sync/tests/unit/xpcshell.ini
--- a/services/sync/modules/constants.js
+++ b/services/sync/modules/constants.js
@@ -143,16 +143,18 @@ RESPONSE_OVER_QUOTA:                   "
 
 // engine failure status codes
 ENGINE_UPLOAD_FAIL:                    "error.engine.reason.record_upload_fail",
 ENGINE_DOWNLOAD_FAIL:                  "error.engine.reason.record_download_fail",
 ENGINE_UNKNOWN_FAIL:                   "error.engine.reason.unknown_fail",
 ENGINE_APPLY_FAIL:                     "error.engine.reason.apply_fail",
 ENGINE_METARECORD_DOWNLOAD_FAIL:       "error.engine.reason.metarecord_download_fail",
 ENGINE_METARECORD_UPLOAD_FAIL:         "error.engine.reason.metarecord_upload_fail",
+// an upload failure where the batch was interrupted by a 412 (concurrent write)
+ENGINE_BATCH_INTERRUPTED:              "error.engine.reason.batch_interrupted",
 
 JPAKE_ERROR_CHANNEL:                   "jpake.error.channel",
 JPAKE_ERROR_NETWORK:                   "jpake.error.network",
 JPAKE_ERROR_SERVER:                    "jpake.error.server",
 JPAKE_ERROR_TIMEOUT:                   "jpake.error.timeout",
 JPAKE_ERROR_INTERNAL:                  "jpake.error.internal",
 JPAKE_ERROR_INVALID:                   "jpake.error.invalid",
 JPAKE_ERROR_NODATA:                    "jpake.error.nodata",
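
The new status code is consumed by the upload path in engines.js below; condensed, the mapping is (a sketch mirroring handleResponse, not additional patch code):

    // A 412 mid-batch means another client wrote to the collection while our
    // batch was open, so the server discarded it; anything else is a plain
    // upload failure.
    resp.failureCode = resp.status == 412 ? ENGINE_BATCH_INTERRUPTED
                                          : ENGINE_UPLOAD_FAIL;
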
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -628,16 +628,22 @@ Engine.prototype = {
  // _storeObj and _trackerObj should be overridden in subclasses
   _storeObj: Store,
   _trackerObj: Tracker,
 
   // Local 'constant'.
   // Signal to the engine that processing further records is pointless.
   eEngineAbortApplyIncoming: "error.engine.abort.applyincoming",
 
+  // Should we keep syncing if we find a record that cannot ever be uploaded?
+  // If this is false we'll throw; otherwise we'll ignore the record and
+  // continue. Currently this can only happen when the record is larger than
+  // the server's upload limit.
+  allowSkippedRecord: false,
+
   get prefName() {
     return this.name;
   },
 
   get enabled() {
     return Svc.Prefs.get("engine." + this.prefName, false);
   },
 
@@ -1455,45 +1461,62 @@ SyncEngine.prototype = {
     if (modifiedIDs.length) {
       this._log.trace("Preparing " + modifiedIDs.length +
                       " outgoing records");
 
       let counts = { sent: modifiedIDs.length, failed: 0 };
 
       // collection we'll upload
       let up = new Collection(this.engineURL, null, this.service);
-      let handleResponse = resp => {
+
+      let failed = [];
+      let successful = [];
+      let handleResponse = (resp, batchOngoing = false) => {
+        // Note: we don't want to update this.lastSync or this._modified until
+        // the batch is complete; however, we do want to remember the
+        // success/failure indicators for when that happens.
         if (!resp.success) {
           this._log.debug("Uploading records failed: " + resp);
-          resp.failureCode = ENGINE_UPLOAD_FAIL;
+          resp.failureCode = resp.status == 412 ? ENGINE_BATCH_INTERRUPTED : ENGINE_UPLOAD_FAIL;
           throw resp;
         }
 
-        // Update server timestamp from the upload.
-        let modified = resp.headers["x-weave-timestamp"];
-        if (modified > this.lastSync)
-          this.lastSync = modified;
+        // Accumulate success/failure indicators from this response.
+        failed = failed.concat(Object.keys(resp.obj.failed));
+        successful = successful.concat(resp.obj.success);
 
-        let failed_ids = Object.keys(resp.obj.failed);
-        counts.failed += failed_ids.length;
-        if (failed_ids.length)
+        if (batchOngoing) {
+          // Nothing to do yet
+          return;
+        }
+        // Advance lastSync since we've finished the batch.
+        let modified = resp.headers["x-weave-timestamp"];
+        if (modified > this.lastSync) {
+          this.lastSync = modified;
+        }
+        if (failed.length && this._log.level <= Log.Level.Debug) {
           this._log.debug("Records that will be uploaded again because "
                           + "the server couldn't store them: "
-                          + failed_ids.join(", "));
+                          + failed.join(", "));
+        }
 
-        // Clear successfully uploaded objects.
-        const succeeded_ids = Object.values(resp.obj.success);
-        for (let id of succeeded_ids) {
+        counts.failed += failed.length;
+
+        for (let id of successful) {
           delete this._modified[id];
         }
 
-        this._onRecordsWritten(succeeded_ids, failed_ids);
-      }
+        this._onRecordsWritten(successful, failed);
 
-      let postQueue = up.newPostQueue(this._log, handleResponse);
+        // clear for next batch
+        failed.length = 0;
+        successful.length = 0;
+      };
+
+      let postQueue = up.newPostQueue(this._log, this.lastSync, handleResponse);
 
       for (let id of modifiedIDs) {
         let out;
         let ok = false;
         try {
           out = this._createRecord(id);
           if (this._log.level <= Log.Level.Trace)
             this._log.trace("Outgoing: " + out);
@@ -1502,21 +1525,27 @@ SyncEngine.prototype = {
           ok = true;
         } catch (ex) {
           if (Async.isShutdownException(ex)) {
             throw ex;
           }
           this._log.warn("Error creating record", ex);
         }
         if (ok) {
-          postQueue.enqueue(out);
+          let { enqueued, error } = postQueue.enqueue(out);
+          if (!enqueued) {
+            ++counts.failed;
+            if (!this.allowSkippedRecord) {
+              throw error;
+            }
+          }
         }
         this._store._sleep(0);
       }
-      postQueue.flush();
+      postQueue.flush(true);
       Observers.notify("weave:engine:sync:uploaded", counts, this.name);
     }
   },
 
   _onRecordsWritten(succeeded, failed) {
     // Implement this method to take specific actions against successfully
     // uploaded records and failed records.
   },
--- a/services/sync/modules/engines/forms.js
+++ b/services/sync/modules/engines/forms.js
@@ -101,16 +101,17 @@ this.FormEngine = function FormEngine(se
   SyncEngine.call(this, "Forms", service);
 }
 FormEngine.prototype = {
   __proto__: SyncEngine.prototype,
   _storeObj: FormStore,
   _trackerObj: FormTracker,
   _recordObj: FormRec,
   applyIncomingBatchSize: FORMS_STORE_BATCH_SIZE,
+  allowSkippedRecord: true,
 
   syncPriority: 6,
 
   get prefName() {
     return "history";
   },
 
   _findDupe: function _findDupe(item) {
--- a/services/sync/modules/engines/history.js
+++ b/services/sync/modules/engines/history.js
@@ -37,16 +37,17 @@ this.HistoryEngine = function HistoryEng
 }
 HistoryEngine.prototype = {
   __proto__: SyncEngine.prototype,
   _recordObj: HistoryRec,
   _storeObj: HistoryStore,
   _trackerObj: HistoryTracker,
   downloadLimit: MAX_HISTORY_DOWNLOAD,
   applyIncomingBatchSize: HISTORY_STORE_BATCH_SIZE,
+  allowSkippedRecord: true,
 
   syncPriority: 7,
 
   _processIncoming: function (newitems) {
     // We want to notify history observers that a batch operation is underway
     // so they don't do lots of work for each incoming record.
     let observers = PlacesUtils.history.getObservers();
     function notifyHistoryObservers(notification) {
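
Forms and history opt in to allowSkippedRecord because dropping one oversized entry is preferable to wedging the whole engine; engines that keep the default of false abort the sync instead, as _uploadOutgoing above does by throwing. A sketch of the resulting branch, under that assumption:

    let { enqueued, error } = postQueue.enqueue(out);
    if (!enqueued) {
      counts.failed++;                  // reported via weave:engine:sync:uploaded
      if (!this.allowSkippedRecord) {
        throw error;                    // default: the record must not be lost
      }
      // forms/history: skip this record and keep syncing the rest
    }
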
--- a/services/sync/modules/record.js
+++ b/services/sync/modules/record.js
@@ -526,16 +526,19 @@ this.Collection = function Collection(ur
   this._service = service;
 
   this._full = false;
   this._ids = null;
   this._limit = 0;
   this._older = 0;
   this._newer = 0;
   this._data = [];
+  // optional members used by batch operations.
+  this._batch = null;
+  this._commit = false;
 }
 Collection.prototype = {
   __proto__: Resource.prototype,
   _logName: "Sync.Collection",
 
   _rebuildURL: function Coll__rebuildURL() {
     // XXX should consider what happens if it's not a URL...
     this.uri.QueryInterface(Ci.nsIURL);
@@ -549,16 +552,20 @@ Collection.prototype = {
     if (this.full)
       args.push('full=1');
     if (this.sort)
       args.push('sort=' + this.sort);
     if (this.ids != null)
       args.push("ids=" + this.ids);
     if (this.limit > 0 && this.limit != Infinity)
       args.push("limit=" + this.limit);
+    if (this._batch)
+      args.push("batch=" + this._batch);
+    if (this._commit)
+      args.push("commit=true");
 
     this.uri.query = (args.length > 0)? '?' + args.join('&') : '';
   },
 
   // get full items
   get full() { return this._full; },
   set full(value) {
     this._full = value;
@@ -598,16 +605,29 @@ Collection.prototype = {
   // newest (newest first)
   // index
   get sort() { return this._sort; },
   set sort(value) {
     this._sort = value;
     this._rebuildURL();
   },
 
+  // Set information about the batch for this request.
+  get batch() { return this._batch; },
+  set batch(value) {
+    this._batch = value;
+    this._rebuildURL();
+  },
+
+  get commit() { return this._commit; },
+  set commit(value) {
+    this._commit = !!value;
+    this._rebuildURL();
+  },
+
   set recordHandler(onRecord) {
     // Save this because onProgress is called with this as the ChannelListener
     let coll = this;
 
     // Switch to newline separated records for incremental parsing
     coll.setHeader("Accept", "application/newlines");
 
     this._onProgress = function() {
@@ -625,81 +645,242 @@ Collection.prototype = {
     };
   },
 
   // This object only supports posting via the postQueue object.
   post() {
     throw new Error("Don't directly post to a collection - use newPostQueue instead");
   },
 
-  newPostQueue(log, postCallback) {
-    let poster = data => {
+  newPostQueue(log, timestamp, postCallback) {
+    let poster = (data, headers, batch, commit) => {
+      this.batch = batch;
+      this.commit = commit;
+      for (let [header, value] of headers) {
+        this.setHeader(header, value);
+      }
       return Resource.prototype.post.call(this, data);
     }
-    return new PostQueue(poster, log, postCallback);
+    let getConfig = (name, defaultVal) => {
+      if (this._service.serverConfiguration && this._service.serverConfiguration.hasOwnProperty(name)) {
+        return this._service.serverConfiguration[name];
+      }
+      return defaultVal;
+    }
+
+    let config = {
+      max_post_bytes: getConfig("max_post_bytes", MAX_UPLOAD_BYTES),
+      max_post_records: getConfig("max_post_records", MAX_UPLOAD_RECORDS),
+
+      max_batch_bytes: getConfig("max_total_bytes", Infinity),
+      max_batch_records: getConfig("max_total_records", Infinity),
+    }
+
+    // Handle config edge cases
+    if (config.max_post_records <= 0) { config.max_post_records = MAX_UPLOAD_RECORDS; }
+    if (config.max_batch_records <= 0) { config.max_batch_records = Infinity; }
+    if (config.max_post_bytes <= 0) { config.max_post_bytes = MAX_UPLOAD_BYTES; }
+    if (config.max_batch_bytes <= 0) { config.max_batch_bytes = Infinity; }
+
+    // Max size of BSO payload is 256k. This assumes at most 4k of overhead,
+    // which sounds like plenty. If the server says it can't handle this, we
+    // might have valid records we can't sync, so we give up on syncing.
+    let requiredMax = 260 * 1024;
+    if (config.max_post_bytes < requiredMax) {
+      this._log.error("Server configuration max_post_bytes is too low", config);
+      throw new Error("Server configuration max_post_bytes is too low");
+    }
+
+    return new PostQueue(poster, timestamp, config, log, postCallback);
   },
 };
 
 /* A helper to manage the posting of records while respecting the various
    size limits.
+
+   This supports the concept of a server-side "batch". The general idea is:
+   * We queue as many records as allowed in memory, then make a single POST.
+   * This first POST (optionally) gives us a batch ID, which we use for
+     all subsequent posts, until...
+   * At some point we hit a batch-maximum, and jump through a few hoops to
+     commit the current batch (ie, all previous POSTs) and start a new one.
+   * Eventually commit the final batch.
+
+   In most cases we expect there to be exactly one batch, consisting of
+   possibly multiple POSTs.
 */
-function PostQueue(poster, log, postCallback) {
+function PostQueue(poster, timestamp, config, log, postCallback) {
   // The "post" function we should use when it comes time to do the post.
   this.poster = poster;
   this.log = log;
 
+  // The config we use. We expect it to have fields "max_post_records",
+  // "max_batch_records", "max_post_bytes", and "max_batch_bytes"
+  this.config = config;
+
   // The callback we make with the response when we do get around to making the
   // post (which could be during any of the enqueue() calls or the final flush())
   // This callback may be called multiple times and must not add new items to
   // the queue.
+  // The second argument passed to this callback is a boolean value that is true
+  // if we're in the middle of a batch, and false if either the batch is
+  // complete, or it's a post to a server that does not understand batching.
   this.postCallback = postCallback;
 
   // The string where we are capturing the stringified version of the records
   // queued so far. It will always be invalid JSON as it is always missing the
-  // close bracket.
+  // closing bracket.
   this.queued = "";
 
-  // The number of records we've queued so far.
+  // The number of records we've queued so far but are yet to POST.
   this.numQueued = 0;
+
+  // The number of records/bytes we've processed in previous POSTs for our
+  // current batch. Does *not* include records currently queued for the next POST.
+  this.numAlreadyBatched = 0;
+  this.bytesAlreadyBatched = 0;
+
+  // The ID of our current batch. Can be undefined (meaning we are yet to make
+  // the first post of a batch, so don't know if we have a batch), null (meaning
+  // we've made the first post but the server response indicated no batching
+  // semantics), otherwise we have made the first post and it holds the batch ID
+  // returned from the server.
+  this.batchID = undefined;
+
+  // Time used for X-If-Unmodified-Since -- should be the timestamp from the last GET.
+  this.lastModified = timestamp;
 }
 
 PostQueue.prototype = {
   enqueue(record) {
     // We want to ensure the record has a .toJSON() method defined - even
     // though JSON.stringify() would implicitly call it, the stringify might
     // still work even if it isn't defined, which isn't what we want.
     let jsonRepr = record.toJSON();
     if (!jsonRepr) {
       throw new Error("You must only call this with objects that explicitly support JSON");
     }
     let bytes = JSON.stringify(jsonRepr);
-    // Note that we purposely don't check if a single record would exceed our
-    // limit - we still attempt the post and if it sees a 413 like we think it
-    // will, we just let that do whatever it does (which is probably cause
-    // ongoing sync failures for that engine - bug 1241356 exists to fix this)
-    // (Note that counter-intuitively, the post of the oversized record will
-    // not happen here but on the next .enqueue/.flush.)
+
+    // Do a flush if we can't add this record without exceeding our single-request
+    // limits, or without exceeding the total limit for a single batch.
+    let newLength = this.queued.length + bytes.length + 2; // extras for leading "[" / "," and trailing "]"
+
+    let maxAllowedBytes = Math.min(256 * 1024, this.config.max_post_bytes);
+
+    let postSizeExceeded = this.numQueued >= this.config.max_post_records ||
+                           newLength >= maxAllowedBytes;
+
+    let batchSizeExceeded = (this.numQueued + this.numAlreadyBatched) >= this.config.max_batch_records ||
+                            (newLength + this.bytesAlreadyBatched) >= this.config.max_batch_bytes;
+
+    let singleRecordTooBig = bytes.length + 2 > maxAllowedBytes;
 
-    // Do a flush if we can't add this record without exceeding our limits.
-    let newLength = this.queued.length + bytes.length + 1; // extra 1 for trailing "]"
-    if (this.numQueued >= MAX_UPLOAD_RECORDS || newLength >= MAX_UPLOAD_BYTES) {
-      this.log.trace("PostQueue flushing"); // flush logs more info...
-      // We need to write the queue out before handling this one.
-      this.flush();
+    if (postSizeExceeded || batchSizeExceeded) {
+      this.log.trace(`PostQueue flushing due to postSizeExceeded=${postSizeExceeded}, batchSizeExceeded=${batchSizeExceeded}` +
+                     `, max_batch_bytes: ${this.config.max_batch_bytes}, max_post_bytes: ${this.config.max_post_bytes}`);
+
+      if (singleRecordTooBig) {
+        return { enqueued: false, error: new Error("Single record too large to submit to server") };
+      }
+
+      // We need to write the queue out before handling this one, but we only
+      // commit the batch (and thus start a new one) if the batch is full.
+      // Note that the queue may be empty (the batch limit can be reached by
+      // earlier posts alone), in which case there is nothing to flush yet.
+      if (this.numQueued) {
+        this.flush(batchSizeExceeded || singleRecordTooBig);
+      }
     }
     // Either a ',' or a '[' depending on whether this is the first record.
     this.queued += this.numQueued ? "," : "[";
     this.queued += bytes;
     this.numQueued++;
+    return { enqueued: true };
   },
 
-  flush() {
+  flush(finalBatchPost) {
     if (!this.queued) {
-      // nothing queued.
+      // nothing queued - we can't be in a batch, and something has gone very
+      // bad if we think we are.
+      if (this.batchID) {
+        throw new Error(`Flush called when no queued records but we are in a batch ${this.batchID}`);
+      }
       return;
     }
-    this.log.info(`Posting ${this.numQueued} records of ${this.queued.length+1} bytes`);
+    // the batch query-param and headers we'll send.
+    let batch;
+    let headers = [];
+    if (this.batchID === undefined) {
+      // First post of a (possible) batch.
+      batch = "true";
+    } else if (this.batchID) {
+      // We have an existing batch.
+      batch = this.batchID;
+    } else {
+      // Not the first post and we know we have no batch semantics.
+      batch = null;
+    }
+
+    headers.push(["x-if-unmodified-since", this.lastModified]);
+
+    this.log.info(`Posting ${this.numQueued} records of ${this.queued.length+1} bytes with batch=${batch}`);
     let queued = this.queued + "]";
+    if (finalBatchPost) {
+      this.bytesAlreadyBatched = 0;
+      this.numAlreadyBatched = 0;
+    } else {
+      this.bytesAlreadyBatched += queued.length;
+      this.numAlreadyBatched += this.numQueued;
+    }
     this.queued = "";
     this.numQueued = 0;
-    this.postCallback(this.poster(queued));
+    let response = this.poster(queued, headers, batch, !!(finalBatchPost && this.batchID !== null));
+
+    if (!response.success) {
+      this.log.trace("Server error response during a batch", response);
+      // not clear what we should do here - we expect the consumer of this to
+      // abort by throwing in the postCallback below.
+      return this.postCallback(response, !finalBatchPost);
+    }
+
+    if (finalBatchPost) {
+      this.log.trace("Committed batch", this.batchID);
+      this.batchID = undefined; // we are now in "first post for the batch" state.
+      this.lastModified = response.headers["x-last-modified"];
+      return this.postCallback(response, false);
+    }
+
+    if (response.status != 202) {
+      if (this.batchID) {
+        throw new Error("Server responded non-202 success code while a batch was in progress");
+      }
+      this.batchID = null; // no batch semantics are in place.
+      this.lastModified = response.headers["x-last-modified"];
+      return this.postCallback(response, false);
+    }
+
+    // this response is saying the server has batch semantics - we should
+    // always have a batch ID in the response.
+    let responseBatchID = response.obj.batch;
+    this.log.trace("Server responded 202 with batch", responseBatchID);
+    if (!responseBatchID) {
+      this.log.error("Invalid server response: 202 without a batch ID", response);
+      throw new Error("Invalid server response: 202 without a batch ID");
+    }
+
+    if (this.batchID === undefined) {
+      this.batchID = responseBatchID;
+      if (!this.lastModified) {
+        this.lastModified = response.headers["x-last-modified"];
+        if (!this.lastModified) {
+          throw new Error("Batch response without x-last-modified");
+        }
+      }
+    }
+
+    if (this.batchID != responseBatchID) {
+      throw new Error(`Invalid client/server batch state - client has ${this.batchID}, server has ${responseBatchID}`);
+    }
+
+    this.postCallback(response, true);
   },
 }
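
End to end, a two-POST batch under this scheme looks like the following exchange (a sketch; header and parameter names as above, timestamps illustrative):

    // POST /storage/coll?batch=true             X-If-Unmodified-Since: 1100
    //   <- 202 { batch: "MTIzNA" }              X-Last-Modified: 1100
    // POST /storage/coll?batch=MTIzNA&commit=true
    //                                           X-If-Unmodified-Since: 1100
    //   <- 200 { success: [...], failed: {} }   X-Last-Modified: 1200
    // PostQueue.lastModified is now 1200. A 412 at any step means another
    // client wrote to the collection first and the batch was discarded.
    // A server without batch semantics answers the first POST with 200 and no
    // batch ID, and later POSTs go out with no batch parameters at all.
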
--- a/services/sync/modules/service.js
+++ b/services/sync/modules/service.js
@@ -1059,21 +1059,56 @@ Sync11Service.prototype = {
     try {
       cb.wait();
       return null;
     } catch (ex) {
       return this.errorHandler.errorStr(ex.body);
     }
   },
 
+  // Note: returns false if we failed for a reason other than the server not yet
+  // supporting the API.
+  _fetchServerConfiguration() {
+    // This is similar to _fetchInfo, but with different error handling.
+
+    let infoURL = this.userBaseURL + "info/configuration";
+    this._log.debug("Fetching server configuration", infoURL);
+    let configResponse;
+    try {
+      configResponse = this.resource(infoURL).get();
+    } catch (ex) {
+      // This is probably a network or similar error.
+      this._log.warn("Failed to fetch info/configuration", ex);
+      this.errorHandler.checkServerError(ex);
+      return false;
+    }
+
+    if (configResponse.status == 404) {
+      // This server doesn't support the URL yet - that's OK.
+      this._log.debug("info/configuration returned 404 - using default upload semantics");
+    } else if (configResponse.status != 200) {
+      this._log.warn(`info/configuration returned ${configResponse.status} - using default configuration`);
+      this.errorHandler.checkServerError(configResponse);
+      return false;
+    } else {
+      this.serverConfiguration = configResponse.obj;
+    }
+    this._log.trace("info/configuration for this server", this.serverConfiguration);
+    return true;
+  },
+
   // Stuff we need to do after login, before we can really do
   // anything (e.g. key setup).
   _remoteSetup: function _remoteSetup(infoResponse) {
     let reset = false;
 
+    if (!this._fetchServerConfiguration()) {
+      return false;
+    }
+
     this._log.debug("Fetching global metadata record");
     let meta = this.recordManager.get(this.metaURL);
 
     // Checking modified time of the meta record.
     if (infoResponse &&
         (infoResponse.obj.meta != this.metaModified) &&
         (!meta || !meta.isNew)) {
 
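
For reference, the shape of an info/configuration response this consumes (field names as read by newPostQueue above; the values here are illustrative only):

    {
      "max_post_bytes": 2097152,
      "max_post_records": 100,
      "max_total_bytes": 104857600,    // becomes max_batch_bytes
      "max_total_records": 10000       // becomes max_batch_records
    }
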
--- a/services/sync/tests/unit/head_http_server.js
+++ b/services/sync/tests/unit/head_http_server.js
@@ -389,36 +389,36 @@ ServerCollection.prototype = {
         options.newer = parseFloat(options.newer);
       }
       if (options.limit) {
         options.limit = parseInt(options.limit, 10);
       }
 
       switch(request.method) {
         case "GET":
-          body = self.get(options);
+          body = self.get(options, request);
           // "If supported by the db, this header will return the number of
           // records total in the request body of any multiple-record GET
           // request."
           let records = options.recordCount;
           self._log.info("Records: " + records);
           if (records != null) {
             response.setHeader("X-Weave-Records", "" + records);
           }
           break;
 
         case "POST":
-          let res = self.post(readBytesFromInputStream(request.bodyInputStream));
+          let res = self.post(readBytesFromInputStream(request.bodyInputStream), request);
           body = JSON.stringify(res);
           response.newModified = res.modified;
           break;
 
         case "DELETE":
           self._log.debug("Invoking ServerCollection.DELETE.");
-          let deleted = self.delete(options);
+          let deleted = self.delete(options, request);
           let ts = new_timestamp();
           body = JSON.stringify(ts);
           response.newModified = ts;
           response.deleted = deleted;
           break;
       }
       response.setHeader("X-Weave-Timestamp",
                          "" + new_timestamp(),
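
Threading the request object through lets per-test overrides assert on the batch query parameters. The pattern, as used by test_syncengine_sync.js below:

    collection.post = (function(orig) {
      return function(data, request) {
        // The first POST of an upload opens a (potential) batch.
        do_check_eq(request.queryString, "batch=true");
        return orig.call(this, data, request);
      };
    })(collection.post);
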
new file mode 100644
--- /dev/null
+++ b/services/sync/tests/unit/test_postqueue.js
@@ -0,0 +1,455 @@
+/* Any copyright is dedicated to the Public Domain.
+ * http://creativecommons.org/publicdomain/zero/1.0/ */
+
+let { PostQueue } = Cu.import("resource://services-sync/record.js", {});
+
+initTestLogging("Trace");
+
+function makeRecord(nbytes) {
+  // make the string 2 bytes short - the quotes added by JSON.stringify make up the difference.
+  return {
+    toJSON: () => "x".repeat(nbytes-2),
+  }
+}
+
+function makePostQueue(config, lastModTime, responseGenerator) {
+  let stats = {
+    posts: [],
+  }
+  let poster = (data, headers, batch, commit) => {
+    let thisPost = { nbytes: data.length, batch, commit };
+    if (headers.length) {
+      thisPost.headers = headers;
+    }
+    stats.posts.push(thisPost);
+    return responseGenerator.next().value;
+  }
+
+  let done = () => {}
+  let pq = new PostQueue(poster, lastModTime, config, getTestLogger(), done);
+  return { pq, stats };
+}
+
+add_test(function test_simple() {
+  let config = {
+    max_post_bytes: 1000,
+    max_post_records: 100,
+    max_batch_bytes: Infinity,
+    max_batch_records: Infinity,
+  }
+
+  const time = 11111111;
+
+  function* responseGenerator() {
+    yield { success: true, status: 200, headers: { 'x-weave-timestamp': time + 100, 'x-last-modified': time + 100 } };
+  }
+
+  let { pq, stats } = makePostQueue(config, time, responseGenerator());
+  pq.enqueue(makeRecord(10));
+  pq.flush(true);
+
+  deepEqual(stats.posts, [{
+    nbytes: 12, // expect our 10 byte record plus "[]" to wrap it.
+    commit: true, // we don't know if we have batch semantics, so committed.
+    headers: [["x-if-unmodified-since", time]],
+    batch: "true"}]);
+
+  run_next_test();
+});
+
+// Test we do the right thing when we need to make multiple posts and there
+// are no batch semantics.
+add_test(function test_max_post_bytes_no_batch() {
+  let config = {
+    max_post_bytes: 50,
+    max_post_records: 4,
+    max_batch_bytes: Infinity,
+    max_batch_records: Infinity,
+  }
+
+  const time = 11111111;
+  function* responseGenerator() {
+    yield { success: true, status: 200, headers: { 'x-weave-timestamp': time + 100, 'x-last-modified': time + 100 } };
+    yield { success: true, status: 200, headers: { 'x-weave-timestamp': time + 200, 'x-last-modified': time + 200 } };
+  }
+
+  let { pq, stats } = makePostQueue(config, time, responseGenerator());
+  pq.enqueue(makeRecord(20)); // total size now 22 bytes - "[" + record + "]"
+  pq.enqueue(makeRecord(20)); // total size now 43 bytes - "[" + record + "," + record + "]"
+  pq.enqueue(makeRecord(20)); // this will exceed our byte limit, so will be in the 2nd POST.
+  pq.flush(true);
+
+  deepEqual(stats.posts, [
+    {
+      nbytes: 43, // 43 for the first post
+      commit: false,
+      headers: [["x-if-unmodified-since", time]],
+      batch: "true",
+    },{
+      nbytes: 22,
+      commit: false, // we know we aren't in a batch, so never commit.
+      headers: [["x-if-unmodified-since", time + 100]],
+      batch: null,
+    }
+  ]);
+  equal(pq.lastModified, time + 200);
+
+  run_next_test();
+});
+
+// Similar to the above, but we've hit max_records instead of max_bytes.
+add_test(function test_max_post_records_no_batch() {
+  let config = {
+    max_post_bytes: 100,
+    max_post_records: 2,
+    max_batch_bytes: Infinity,
+    max_batch_records: Infinity,
+  }
+
+  const time = 11111111;
+
+  function* responseGenerator() {
+    yield { success: true, status: 200, headers: { 'x-weave-timestamp': time + 100, 'x-last-modified': time + 100 } };
+    yield { success: true, status: 200, headers: { 'x-weave-timestamp': time + 200, 'x-last-modified': time + 200 } };
+  }
+
+  let { pq, stats } = makePostQueue(config, time, responseGenerator());
+  pq.enqueue(makeRecord(20)); // total size now 22 bytes - "[" + record + "]"
+  pq.enqueue(makeRecord(20)); // total size now 43 bytes - "[" + record + "," + record + "]"
+  pq.enqueue(makeRecord(20)); // this will exceed our records limit, so will be in the 2nd POST.
+  pq.flush(true);
+
+  deepEqual(stats.posts, [
+    {
+      nbytes: 43, // 43 for the first post
+      commit: false,
+      batch: "true",
+      headers: [["x-if-unmodified-since", time]],
+    },{
+      nbytes: 22,
+      commit: false, // we know we aren't in a batch, so never commit.
+      batch: null,
+      headers: [["x-if-unmodified-since", time + 100]],
+    }
+  ]);
+  equal(pq.lastModified, time + 200);
+
+  run_next_test();
+});
+
+// Batch tests.
+
+// Test making a single post when batch semantics are in place.
+add_test(function test_single_batch() {
+  let config = {
+    max_post_bytes: 1000,
+    max_post_records: 100,
+    max_batch_bytes: 2000,
+    max_batch_records: 200,
+  }
+  const time = 11111111;
+  function* responseGenerator() {
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': time, 'x-weave-timestamp': time + 100 },
+    };
+  }
+
+  let { pq, stats } = makePostQueue(config, time, responseGenerator());
+  ok(pq.enqueue(makeRecord(10)).enqueued);
+  pq.flush(true);
+
+  deepEqual(stats.posts, [
+    {
+      nbytes: 12, // expect our 10 byte record plus "[]" to wrap it.
+      commit: true, // we don't know if we have batch semantics, so committed.
+      batch: "true",
+      headers: [["x-if-unmodified-since", time]],
+    }
+  ]);
+
+  run_next_test();
+});
+
+// Test we do the right thing when we need to make multiple posts while
+// batch semantics are in place.
+add_test(function test_max_post_bytes_batch() {
+  let config = {
+    max_post_bytes: 50,
+    max_post_records: 4,
+    max_batch_bytes: 5000,
+    max_batch_records: 100,
+  }
+
+  const time = 11111111;
+  function* responseGenerator() {
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': time, 'x-weave-timestamp': time + 100 },
+    };
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': time + 200, 'x-weave-timestamp': time + 200 },
+   };
+  }
+
+  let { pq, stats } = makePostQueue(config, time, responseGenerator());
+  ok(pq.enqueue(makeRecord(20)).enqueued); // total size now 22 bytes - "[" + record + "]"
+  ok(pq.enqueue(makeRecord(20)).enqueued); // total size now 43 bytes - "[" + record + "," + record + "]"
+  ok(pq.enqueue(makeRecord(20)).enqueued); // this will exceed our byte limit, so will be in the 2nd POST.
+  pq.flush(true);
+
+  deepEqual(stats.posts, [
+    {
+      nbytes: 43, // 43 for the first post
+      commit: false,
+      batch: "true",
+      headers: [['x-if-unmodified-since', time]],
+    },{
+      nbytes: 22,
+      commit: true,
+      batch: 1234,
+      headers: [['x-if-unmodified-since', time]],
+    }
+  ]);
+
+  equal(pq.lastModified, time + 200);
+
+  run_next_test();
+});
+
+// Test we do the right thing when the batch bytes limit is exceeded.
+add_test(function test_max_batch_bytes_batch() {
+  let config = {
+    max_post_bytes: 50,
+    max_post_records: 20,
+    max_batch_bytes: 70,
+    max_batch_records: 100,
+  }
+
+  const time0 = 11111111;
+  const time1 = 22222222;
+  function* responseGenerator() {
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': time0, 'x-weave-timestamp': time0 + 100 },
+    };
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': time1, 'x-weave-timestamp': time1 },
+    };
+    yield { success: true, status: 202, obj: { batch: 5678 },
+            headers: { 'x-last-modified': time1, 'x-weave-timestamp': time1 + 100 },
+    };
+    yield { success: true, status: 202, obj: { batch: 5678 },
+            headers: { 'x-last-modified': time1 + 200, 'x-weave-timestamp': time1 + 200 },
+    };
+  }
+
+  let { pq, stats } = makePostQueue(config, time0, responseGenerator());
+  ok(pq.enqueue(makeRecord(20)).enqueued); // total size now 22 bytes - "[" + record + "]"
+  ok(pq.enqueue(makeRecord(20)).enqueued); // total size now 43 bytes - "[" + record + "," + record + "]"
+  // this will exceed our POST byte limit, so will be in the 2nd POST - but still in the first batch.
+  ok(pq.enqueue(makeRecord(20)).enqueued); // 22 bytes for 2nd post, 55 bytes in the batch.
+  // this will exceed our batch byte limit, so will be in a new batch.
+  ok(pq.enqueue(makeRecord(20)).enqueued); // 22 bytes in 3rd post/2nd batch
+  ok(pq.enqueue(makeRecord(20)).enqueued); // 43 bytes in 3rd post/2nd batch
+  // This will exceed POST byte limit, so will be in the 4th post, part of the 2nd batch.
+  ok(pq.enqueue(makeRecord(20)).enqueued); // 22 bytes for 4th post/2nd batch
+  pq.flush(true);
+
+  deepEqual(stats.posts, [
+    {
+      nbytes: 43, // 43 for the first post
+      commit: false,
+      batch: "true",
+      headers: [['x-if-unmodified-since', time0]],
+    },{
+      // second post of 22 bytes in the first batch, committing it.
+      nbytes: 22,
+      commit: true,
+      batch: 1234,
+      headers: [['x-if-unmodified-since', time0]],
+    }, {
+      // 3rd post of 43 bytes in a new batch, not yet committing it.
+      nbytes: 43,
+      commit: false,
+      batch: "true",
+      headers: [['x-if-unmodified-since', time1]],
+    },{
+      // 4th post of 22 bytes in second batch, committing it.
+      nbytes: 22,
+      commit: true,
+      batch: 5678,
+      headers: [['x-if-unmodified-since', time1]],
+    },
+  ]);
+
+  equal(pq.lastModified, time1 + 200);
+
+  run_next_test();
+});
+
+// Test we split up the posts when we exceed the record limit while batch
+// semantics are in place.
+add_test(function test_max_post_records_batch() {
+  let config = {
+    max_post_bytes: 1000,
+    max_post_records: 2,
+    max_batch_bytes: 5000,
+    max_batch_records: 100,
+  }
+
+  const time = 11111111;
+  function* responseGenerator() {
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': time, 'x-weave-timestamp': time + 100 },
+    };
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': time + 200, 'x-weave-timestamp': time + 200 },
+   };
+  }
+
+  let { pq, stats } = makePostQueue(config, time, responseGenerator());
+  ok(pq.enqueue(makeRecord(20)).enqueued); // total size now 22 bytes - "[" + record + "]"
+  ok(pq.enqueue(makeRecord(20)).enqueued); // total size now 43 bytes - "[" + record + "," + record + "]"
+  ok(pq.enqueue(makeRecord(20)).enqueued); // will exceed record limit, so will be in 2nd post.
+  pq.flush(true);
+
+  deepEqual(stats.posts, [
+    {
+      nbytes: 43, // 43 for the first post
+      commit: false,
+      batch: "true",
+      headers: [['x-if-unmodified-since', time]],
+    },{
+      nbytes: 22,
+      commit: true,
+      batch: 1234,
+      headers: [['x-if-unmodified-since', time]],
+    }
+  ]);
+
+  equal(pq.lastModified, time + 200);
+
+  run_next_test();
+});
+
+// Test that a single huge record fails to enqueue
+add_test(function test_huge_record() {
+  let config = {
+    max_post_bytes: 50,
+    max_post_records: 100,
+    max_batch_bytes: 5000,
+    max_batch_records: 100,
+  }
+
+  const time = 11111111;
+  function* responseGenerator() {
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': time, 'x-weave-timestamp': time + 100 },
+    };
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': time + 200, 'x-weave-timestamp': time + 200 },
+   };
+  }
+
+  let { pq, stats } = makePostQueue(config, time, responseGenerator());
+  ok(pq.enqueue(makeRecord(20)).enqueued);
+
+  let { enqueued, error } = pq.enqueue(makeRecord(1000));
+  ok(!enqueued);
+  notEqual(error, undefined);
+
+  // make sure that we keep working, skipping the bad record entirely
+  // (handling the error the queue reported is left up to caller)
+  ok(pq.enqueue(makeRecord(20)).enqueued);
+  ok(pq.enqueue(makeRecord(20)).enqueued);
+
+  pq.flush(true);
+
+  deepEqual(stats.posts, [
+    {
+      nbytes: 43, // 43 for the first post
+      commit: false,
+      batch: "true",
+      headers: [['x-if-unmodified-since', time]],
+    },{
+      nbytes: 22,
+      commit: true,
+      batch: 1234,
+      headers: [['x-if-unmodified-since', time]],
+    }
+  ]);
+
+  equal(pq.lastModified, time + 200);
+
+  run_next_test();
+});
+
+// Test we do the right thing when the batch record limit is exceeded.
+add_test(function test_max_records_batch() {
+  let config = {
+    max_post_bytes: 1000,
+    max_post_records: 3,
+    max_batch_bytes: 10000,
+    max_batch_records: 5,
+  }
+
+  const time0 = 11111111;
+  const time1 = 22222222;
+  function* responseGenerator() {
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': time0, 'x-weave-timestamp': time0 + 100 },
+    };
+    yield { success: true, status: 202, obj: { batch: 1234 },
+            headers: { 'x-last-modified': time1, 'x-weave-timestamp': time1 },
+    };
+    yield { success: true, status: 202, obj: { batch: 5678 },
+            headers: { 'x-last-modified': time1, 'x-weave-timestamp': time1 + 100 },
+    };
+    yield { success: true, status: 202, obj: { batch: 5678 },
+            headers: { 'x-last-modified': time1 + 200, 'x-weave-timestamp': time1 + 200 },
+    };
+  }
+
+  let { pq, stats } = makePostQueue(config, time0, responseGenerator());
+
+  ok(pq.enqueue(makeRecord(20)).enqueued);
+  ok(pq.enqueue(makeRecord(20)).enqueued);
+  ok(pq.enqueue(makeRecord(20)).enqueued);
+
+  ok(pq.enqueue(makeRecord(20)).enqueued);
+  ok(pq.enqueue(makeRecord(20)).enqueued);
+
+  ok(pq.enqueue(makeRecord(20)).enqueued);
+  ok(pq.enqueue(makeRecord(20)).enqueued);
+  ok(pq.enqueue(makeRecord(20)).enqueued);
+
+  ok(pq.enqueue(makeRecord(20)).enqueued);
+
+  pq.flush(true);
+
+  deepEqual(stats.posts, [
+    { // 3 records
+      nbytes: 64,
+      commit: false,
+      batch: "true",
+      headers: [['x-if-unmodified-since', time0]],
+    },{ // 2 records -- end batch1
+      nbytes: 43,
+      commit: true,
+      batch: 1234,
+      headers: [['x-if-unmodified-since', time0]],
+    }, { // 3 records
+      nbytes: 64,
+      commit: false,
+      batch: "true",
+      headers: [['x-if-unmodified-since', time1]],
+    },{ // 1 record -- end batch2
+      nbytes: 22,
+      commit: true,
+      batch: 5678,
+      headers: [['x-if-unmodified-since', time1]],
+    },
+  ]);
+
+  equal(pq.lastModified, time1 + 200);
+
+  run_next_test();
+});
\ No newline at end of file
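
All of the byte counts asserted above follow from makeRecord's serialization:

    // makeRecord(20) serializes to exactly 20 bytes ("x".repeat(18) plus quotes).
    // One record:    "[" + 20 + "]"                        = 22 bytes
    // Two records:   "[" + 20 + "," + 20 + "]"             = 43 bytes
    // Three records: "[" + 20 + "," + 20 + "," + 20 + "]"  = 64 bytes
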
--- a/services/sync/tests/unit/test_syncengine_sync.js
+++ b/services/sync/tests/unit/test_syncengine_sync.js
@@ -1426,29 +1426,41 @@ add_task(function *test_uploadOutgoing_f
     do_check_eq(engine._tracker.changedIDs['scotsman'], SCOTSMAN_CHANGED);
     do_check_eq(engine._tracker.changedIDs['peppercorn'], PEPPERCORN_CHANGED);
 
   } finally {
     yield promiseClean(server);
   }
 });
 
-
+/* A couple of "functional" tests to ensure we split records into appropriate
+   POST requests. More comprehensive unit-tests for this "batching" are in
+   test_postqueue.js.
+*/
 add_test(function test_uploadOutgoing_MAX_UPLOAD_RECORDS() {
   _("SyncEngine._uploadOutgoing uploads in batches of MAX_UPLOAD_RECORDS");
 
   Service.identity.username = "foo";
   let collection = new ServerCollection();
 
   // Let's count how many times the client posts to the server
   var noOfUploads = 0;
   collection.post = (function(orig) {
-    return function() {
+    return function(data, request) {
+      // This test doesn't arrange for batch semantics - so we expect the
+      // first request to come in with batch=true and the others to carry no
+      // batch-related query parameters at all (as the first response did
+      // not provide a batch ID).
+      if (noOfUploads == 0) {
+        do_check_eq(request.queryString, "batch=true");
+      } else {
+        do_check_eq(request.queryString, "");
+      }
       noOfUploads++;
-      return orig.apply(this, arguments);
+      return orig.call(this, data, request);
     };
   }(collection.post));
 
   // Create a bunch of records (and server side handlers)
   let engine = makeRotaryEngine();
   for (var i = 0; i < 234; i++) {
     let id = 'record-no-' + i;
     engine._store.items[id] = "Record No. " + i;
@@ -1483,104 +1495,49 @@ add_test(function test_uploadOutgoing_MA
     // Ensure that the uploads were performed in batches of MAX_UPLOAD_RECORDS.
     do_check_eq(noOfUploads, Math.ceil(234/MAX_UPLOAD_RECORDS));
 
   } finally {
     cleanAndGo(server);
   }
 });
 
-add_test(function test_uploadOutgoing_MAX_UPLOAD_BYTES() {
-  _("SyncEngine._uploadOutgoing uploads in batches of MAX_UPLOAD_BYTES");
+add_test(function test_uploadOutgoing_largeRecords() {
+  _("SyncEngine._uploadOutgoing throws on records larger than MAX_UPLOAD_BYTES");
 
   Service.identity.username = "foo";
   let collection = new ServerCollection();
 
-  // Let's count how many times the client posts to the server
-  let uploadCounts = [];
-  collection.post = (function(orig) {
-    return function(data) {
-      uploadCounts.push(JSON.parse(data).length);
-      return orig.call(this, data);
-    };
-  }(collection.post));
-
   let engine = makeRotaryEngine();
 
-  // A helper function that calculates the overhead of a record as uploaded
-  // to the server - it returns the size of a record with an empty string.
-  // This is so we can calculate exactly how many records we can fit into a
-  // batch (ie, we expect the record size that's actually uploaded to be the
-  // result of this function + the length of the data)
-  let calculateRecordOverhead = function() {
-    engine._store.items["string-no-x"] = "";
-    let x = engine._createRecord("string-no-x");
-    x.encrypt(Service.collectionKeys.keyForCollection(engine.name));
-    delete engine._store.items["string-no-x"];
-    return JSON.stringify(x).length;
-  }
-
-  let allIds = [];
-  // Create a bunch of records (and server side handlers) - make 20 that will
-  // fit inside our byte limit.
-  let fullItemSize = (MAX_UPLOAD_BYTES - 2) / 20;
-  // fullItemSize includes the "," between records and quote characters (as we
-  // will use strings)
-  let itemSize = fullItemSize - calculateRecordOverhead() - (3 * 20);
-  // Add 21 of this size - the first 20 should fit in the first batch.
-  for (let i = 0; i < 21; i++) {
-    let id = 'string-no-' + i;
-    engine._store.items[id] = "X".repeat(itemSize);
-    engine._tracker.addChangedID(id, 0);
-    collection.insert(id);
-    allIds.push(id);
-  }
-  // Now a single large item that's greater than MAX_UPLOAD_BYTES. This should
-  // cause the 1 item that didn't fit in the previous batch to be uploaded
-  // by itself, then this large one by itself.
   engine._store.items["large-item"] = "Y".repeat(MAX_UPLOAD_BYTES*2);
   engine._tracker.addChangedID("large-item", 0);
   collection.insert("large-item");
-    allIds.push("large-item");
-  // And a few more small items - these should all be uploaded together.
-  for (let i = 0; i < 20; i++) {
-    let id = 'small-no-' + i;
-    engine._store.items[id] = "ZZZZ";
-    engine._tracker.addChangedID(id, 0);
-    collection.insert(id);
-    allIds.push(id);
-  }
+
 
   let meta_global = Service.recordManager.set(engine.metaURL,
                                               new WBORecord(engine.metaURL));
   meta_global.payload.engines = {rotary: {version: engine.version,
                                          syncID: engine.syncID}};
 
   let server = sync_httpd_setup({
       "/1.1/foo/storage/rotary": collection.handler()
   });
 
   let syncTesting = new SyncTestingInfrastructure(server);
 
   try {
-
-    // Confirm initial environment.
-    do_check_eq(uploadCounts.length, 0);
-
     engine._syncStartup();
-    engine._uploadOutgoing();
-
-    // Ensure all records have been uploaded.
-    for (let checkId of allIds) {
-      do_check_true(!!collection.payload(checkId));
+    let error = null;
+    try {
+      engine._uploadOutgoing();
+    } catch (e) {
+      error = e;
     }
-
-    // Ensure that the uploads were performed in the batch sizes we expect.
-    Assert.deepEqual(uploadCounts, [20, 1, 1, 20]);
-
+    ok(!!error);
   } finally {
     cleanAndGo(server);
   }
 });
 
 
 add_test(function test_syncFinish_noDelete() {
   _("SyncEngine._syncFinish resets tracker's score");
--- a/services/sync/tests/unit/xpcshell.ini
+++ b/services/sync/tests/unit/xpcshell.ini
@@ -176,16 +176,17 @@ skip-if = debug
 [test_prefs_store.js]
 support-files = prefs_test_prefs_store.js
 [test_prefs_tracker.js]
 [test_tab_engine.js]
 [test_tab_store.js]
 [test_tab_tracker.js]
 
 [test_warn_on_truncated_response.js]
+[test_postqueue.js]
 
 # FxA migration
 [test_fxa_migration.js]
 
 # Synced tabs.
 [test_syncedtabs.js]
 
 [test_telemetry.js]