Bug 656513: part 1: provide a way for record handlers to abort incoming sync. r=philiKON
authorRichard Newman <rnewman@mozilla.com>
Wed, 15 Jun 2011 00:03:32 -0700
changeset 71445 769eb1758cdafb41ec86950daf71003cd987254e
parent 71104 0a409e965d390996b0f58424c6d5fc3935857435
child 71446 bb9de4e7d77aa649a6f0734f7e9bbecc61110070
push idunknown
push userunknown
push dateunknown
reviewersphiliKON
bugs656513
milestone7.0a1
Bug 656513: part 1: provide a way for record handlers to abort incoming sync. r=philiKON
services/sync/modules/engines.js
services/sync/tests/unit/test_engine_abort.js
services/sync/tests/unit/xpcshell.ini
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -206,16 +206,21 @@ Store.prototype = {
     Async.waitForSyncCallback(cb);
   },
 
   applyIncomingBatch: function applyIncomingBatch(records) {
     let failed = [];
     for each (let record in records) {
       try {
         this.applyIncoming(record);
+      } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) {
+        // This kind of exception should have a 'cause' attribute, which is an
+        // originating exception.
+        // ex.cause will carry its stack with it when rethrown.
+        throw ex.cause;
       } catch (ex) {
         this._log.warn("Failed to apply incoming record " + record.id);
         this._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
         failed.push(record.id);
       }
     };
     return failed;
   },
@@ -357,16 +362,20 @@ function Engine(name) {
   this._tracker; // initialize tracker to load previously changed IDs
   this._log.debug("Engine initialized");
 }
 Engine.prototype = {
   // _storeObj, and _trackerObj should to be overridden in subclasses
   _storeObj: Store,
   _trackerObj: Tracker,
 
+  // Local 'constant'.
+  // Signal to the engine that processing further records is pointless.
+  eEngineAbortApplyIncoming: "error.engine.abort.applyincoming",
+
   get prefName() this.name,
   get enabled() Svc.Prefs.get("engine." + this.prefName, false),
   set enabled(val) Svc.Prefs.set("engine." + this.prefName, !!val),
 
   get score() this._tracker.score,
 
   get _store() {
     let store = new this._storeObj(this.Name);
@@ -654,19 +663,32 @@ SyncEngine.prototype = {
     let handled = [];
     let applyBatch = [];
     let failed = [];
     let failedInPreviousSync = this.previousFailed;
     let fetchBatch = Utils.arrayUnion(this.toFetch, failedInPreviousSync);
     // Reset previousFailed for each sync since previously failed items may not fail again.
     this.previousFailed = [];
 
+    // Used (via exceptions) to allow the record handler/reconciliation/etc.
+    // methods to signal that they would like processing of incoming records to
+    // cease.
+    let aborting = undefined;
+
     function doApplyBatch() {
       this._tracker.ignoreAll = true;
-      failed = failed.concat(this._store.applyIncomingBatch(applyBatch));
+      try {
+        failed = failed.concat(this._store.applyIncomingBatch(applyBatch));
+      } catch (ex) {
+        // Catch any error that escapes from applyIncomingBatch. At present
+        // those will all be abort events.
+        this._log.warn("Got exception " + Utils.exceptionStr(ex) +
+                       ", aborting processIncoming.");
+        aborting = ex;
+      }
       this._tracker.ignoreAll = false;
       applyBatch = [];
     }
 
     function doApplyBatchAndPersistFailed() {
       // Apply remaining batch.
       if (applyBatch.length) {
         doApplyBatch.call(this);
@@ -679,16 +701,20 @@ SyncEngine.prototype = {
         failed = [];
       }
     }
 
     // Not binding this method to 'this' for performance reasons. It gets
     // called for every incoming record.
     let self = this;
     newitems.recordHandler = function(item) {
+      if (aborting) {
+        return;
+      }
+
       // Grab a later last modified if possible
       if (self.lastModified == null || item.modified > self.lastModified)
         self.lastModified = item.modified;
 
       // Track the collection for the WBO.
       item.collection = self.name;
       
       // Remember which records were processed
@@ -732,16 +758,20 @@ SyncEngine.prototype = {
         self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex));
         failed.push(item.id);
         return;
       }
 
       let shouldApply;
       try {
         shouldApply = self._reconcile(item);
+      } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) {
+        self._log.warn("Reconciliation failed: aborting incoming processing.");
+        failed.push(item.id);
+        aborting = ex.cause;
       } catch (ex) {
         self._log.warn("Failed to reconcile incoming record " + item.id);
         self._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
         failed.push(item.id);
         return;
       }
 
       if (shouldApply) {
@@ -761,16 +791,20 @@ SyncEngine.prototype = {
     // Only bother getting data from the server if there's new things
     if (this.lastModified == null || this.lastModified > this.lastSync) {
       let resp = newitems.get();
       doApplyBatchAndPersistFailed.call(this);
       if (!resp.success) {
         resp.failureCode = ENGINE_DOWNLOAD_FAIL;
         throw resp;
       }
+
+      if (aborting) {
+        throw aborting;
+      }
     }
 
     // Mobile: check if we got the maximum that we requested; get the rest if so.
     if (handled.length == newitems.limit) {
       let guidColl = new Collection(this.engineURL);
       
       // Sort and limit so that on mobile we only get the last X records.
       guidColl.limit = this.downloadLimit;
@@ -799,17 +833,17 @@ SyncEngine.prototype = {
     }
 
     // Process any backlog of GUIDs.
     // At this point we impose an upper limit on the number of items to fetch
     // in a single request, even for desktop, to avoid hitting URI limits.
     batchSize = isMobile ? this.mobileGUIDFetchBatchSize :
                            this.guidFetchBatchSize;
 
-    while (fetchBatch.length) {
+    while (fetchBatch.length && !aborting) {
       // Reuse the original query, but get rid of the restricting params
       // and batch remaining records.
       newitems.limit = 0;
       newitems.newer = 0;
       newitems.ids = fetchBatch.slice(0, batchSize);
 
       // Reuse the existing record handler set earlier
       let resp = newitems.get();
@@ -823,16 +857,21 @@ SyncEngine.prototype = {
       fetchBatch = fetchBatch.slice(batchSize);
       this.toFetch = Utils.arraySub(this.toFetch, newitems.ids);
       this.previousFailed = Utils.arrayUnion(this.previousFailed, failed);
       if (failed.length) {
         count.failed += failed.length;
         this._log.debug("Records that failed to apply: " + failed);
       }
       failed = [];
+
+      if (aborting) {
+        throw aborting;
+      }
+
       if (this.lastSync < this.lastModified) {
         this.lastSync = this.lastModified;
       }
     }
 
     // Apply remaining items.
     doApplyBatchAndPersistFailed.call(this);
 
new file mode 100644
--- /dev/null
+++ b/services/sync/tests/unit/test_engine_abort.js
@@ -0,0 +1,64 @@
+Cu.import("resource://services-sync/engines.js");
+Cu.import("resource://services-sync/util.js");
+
+add_test(function test_processIncoming_abort() {
+  _("An abort exception, raised in applyIncoming, will abort _processIncoming.");
+  let syncTesting = new SyncTestingInfrastructure();
+  Svc.Prefs.set("clusterURL", "http://localhost:8080/");
+  Svc.Prefs.set("username", "foo");
+  generateNewKeys();
+
+  let engine = new RotaryEngine();
+
+  _("Create some server data.");
+  let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
+  meta_global.payload.engines = {rotary: {version: engine.version,
+                                          syncID: engine.syncID}};
+
+  let collection = new ServerCollection();
+  let id = Utils.makeGUID();
+  let payload = encryptPayload({id: id, denomination: "Record No. " + id});
+  collection.wbos[id] = new ServerWBO(id, payload);
+
+  let server = sync_httpd_setup({
+      "/1.1/foo/storage/rotary": collection.handler()
+  });
+
+  _("Fake applyIncoming to abort.");
+  engine._store.applyIncoming = function (record) {
+    let ex = {code: Engine.prototype.eEngineAbortApplyIncoming,
+              cause: "Nooo"};
+    _("Throwing: " + JSON.stringify(ex));
+    throw ex;
+  };
+  
+  _("Trying _processIncoming. It will throw after aborting.");
+  let err;
+  try {
+    engine._syncStartup();
+    engine._processIncoming();
+  } catch (ex) {
+    err = ex;
+  }
+
+  do_check_eq(err, "Nooo");
+  err = undefined;
+
+  _("Trying engine.sync(). It will abort without error.");
+  try {
+    // This will quietly fail.
+    engine.sync();
+  } catch (ex) {
+    err = ex;
+  }
+
+  do_check_eq(err, undefined);
+
+  server.stop(run_next_test);
+  Svc.Prefs.resetBranch("");
+  Records.clearCache();
+});
+
+function run_test() {
+  run_next_test();
+}
--- a/services/sync/tests/unit/xpcshell.ini
+++ b/services/sync/tests/unit/xpcshell.ini
@@ -18,16 +18,17 @@ tail =
 [test_bookmark_store.js]
 [test_bookmark_tracker.js]
 [test_clients_engine.js]
 [test_clients_escape.js]
 [test_collection_inc_get.js]
 [test_collections_recovery.js]
 [test_corrupt_keys.js]
 [test_engine.js]
+[test_engine_abort.js]
 [test_enginemanager.js]
 [test_forms_store.js]
 [test_forms_tracker.js]
 [test_history_engine.js]
 [test_history_store.js]
 [test_history_tracker.js]
 [test_jpakeclient.js]
 [test_keys.js]