Bug 591126 - Handle upload interruption gracefully [r=mconnor]
author      Philipp von Weitershausen <philipp@weitershausen.de>
date        Tue, 23 Nov 2010 21:21:31 -0800
changeset   58408 6672cd9ddd0bd06cba7a94b0f8347a1d0897e521
parent      58407 780ccd59ff460df8de3d6404cca2f75c36b50943
child       58409 9d7a96749d81820085f3a2598df723409285d18b
push id     1
push user   shaver@mozilla.com
push date   Tue, 04 Jan 2011 17:58:04 +0000
reviewers   mconnor
bugs        591126
services/sync/modules/engines.js
services/sync/modules/service.js
services/sync/tests/unit/test_syncengine_sync.js
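
In short, this patch drops the catch-and-rollback path in SyncEngine._sync in favor of a finally-based cleanup: _syncStartup snapshots the outgoing IDs into this._modified, _uploadOutgoing removes each ID the server acknowledges, and _syncCleanup hands whatever remains back to the tracker so it is retried on the next sync. A simplified sketch of that control flow (method bodies stubbed, observer notifications omitted), not the actual engine code:

// Simplified sketch of the reworked _sync flow; illustrative only.
SyncEngine.prototype._sync = function _sync() {
  try {
    this._syncStartup();     // snapshot outgoing IDs into this._modified
    this._processIncoming();
    this._uploadOutgoing();  // delete acknowledged IDs from this._modified
    this._syncFinish();
  } finally {
    // Runs on success and on failure alike: everything still left in
    // this._modified is re-added to the tracker for the next sync.
    this._syncCleanup();
  }
};
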
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -431,33 +431,41 @@ SyncEngine.prototype = {
         resp.failureCode = ENGINE_METARECORD_UPLOAD_FAIL;
         throw resp;
       }
 
       // Cache the crypto meta that we just put on the server
       CryptoMetas.set(meta.uri, meta);
     }
 
-    this._maybeLastSyncLocal = Date.now();
+    // Save objects that need to be uploaded in this._modified. We also save
+    // the timestamp of this fetch in this.lastSyncLocal. As we successfully
+    // upload objects we remove them from this._modified. If an error occurs
+    // or any objects fail to upload, they will remain in this._modified. At
+    // the end of a sync, or after an error, we add all objects remaining in
+    // this._modified to the tracker.
+    this.lastSyncLocal = Date.now();
     if (this.lastSync) {
       this._modified = this.getChangedIDs();
-      // Clear the tracker now but remember the changed IDs in case we
-      // need to roll back.
-      this._backupChangedIDs = this._tracker.changedIDs;
-      this._tracker.clearChangedIDs();
     } else {
       // Mark all items to be uploaded, but treat them as changed from long ago
       this._log.debug("First sync, uploading all items");
       this._modified = {};
       for (let id in this._store.getAllIDs())
         this._modified[id] = 0;
     }
-
-    let outnum = [i for (i in this._modified)].length;
-    this._log.info(outnum + " outgoing items pre-reconciliation");
+    // Clear the tracker now. If the sync fails we'll add the ones we failed
+    // to upload back.
+    this._tracker.clearChangedIDs();
+
+    // Array of just the IDs from this._modified. This is what we iterate over
+    // so we can modify this._modified during the iteration.
+    this._modifiedIDs = [id for (id in this._modified)];
+    this._log.info(this._modifiedIDs.length +
+                   " outgoing items pre-reconciliation");
 
     // Keep track of what to delete at the end of sync
     this._delete = {};
   },
 
   // Process incoming records
   _processIncoming: function SyncEngine__processIncoming() {
     this._log.trace("Downloading & applying server changes");
@@ -617,17 +625,17 @@ SyncEngine.prototype = {
   _reconcile: function SyncEngine__reconcile(item) {
     if (this._log.level <= Log4Moz.Level.Trace)
       this._log.trace("Incoming: " + item);
 
     this._log.trace("Reconcile step 1: Check for conflicts");
     if (item.id in this._modified) {
       // If the incoming and local changes are the same, skip
       if (this._isEqual(item)) {
-        this._tracker.removeChangedID(item.id);
+        delete this._modified[item.id];
         return false;
       }
 
       // Records differ so figure out which to take
       let recordAge = Resource.serverTime - item.modified;
       let localAge = Date.now() / 1000 - this._modified[item.id];
       this._log.trace("Record age vs local age: " + [recordAge, localAge]);
 
@@ -649,56 +657,55 @@ SyncEngine.prototype = {
       this._handleDupe(item, dupeId);
 
     // Apply the incoming item (now that the dupe is the right id)
     return true;
   },
 
   // Upload outgoing records
   _uploadOutgoing: function SyncEngine__uploadOutgoing() {
-    let failed = {};
-    let outnum = [i for (i in this._modified)].length;
-    if (outnum) {
-      this._log.trace("Preparing " + outnum + " outgoing records");
+    if (this._modifiedIDs.length) {
+      this._log.trace("Preparing " + this._modifiedIDs.length +
+                      " outgoing records");
 
       // collection we'll upload
       let up = new Collection(this.engineURL);
       let count = 0;
 
       // Upload what we've got so far in the collection
       let doUpload = Utils.bind2(this, function(desc) {
-        this._log.info("Uploading " + desc + " of " + outnum + " records");
+        this._log.info("Uploading " + desc + " of " +
+                       this._modifiedIDs.length + " records");
         let resp = up.post();
         if (!resp.success) {
           this._log.debug("Uploading records failed: " + resp);
           resp.failureCode = ENGINE_UPLOAD_FAIL;
           throw resp;
         }
 
         // Update server timestamp from the upload.
         let modified = resp.headers["x-weave-timestamp"];
         if (modified > this.lastSync)
           this.lastSync = modified;
 
-        // Remember changed IDs and timestamp of failed items so we
-        // can mark them changed again.
-        let failed_ids = [];
-        for (let id in resp.obj.failed) {
-          failed[id] = this._modified[id];
-          failed_ids.push(id);
-        }
+        let failed_ids = [id for (id in resp.obj.failed)];
         if (failed_ids.length)
           this._log.debug("Records that will be uploaded again because "
                           + "the server couldn't store them: "
                           + failed_ids.join(", "));
 
+        // Clear successfully uploaded objects.
+        for each (let id in resp.obj.success) {
+          delete this._modified[id];
+        }
+
         up.clearRecords();
       });
 
-      for (let id in this._modified) {
+      for each (let id in this._modifiedIDs) {
         try {
           let out = this._createRecord(id);
           if (this._log.level <= Log4Moz.Level.Trace)
             this._log.trace("Outgoing: " + out);
 
           out.encrypt(ID.get("WeaveCryptoID"));
           up.pushData(out);
         }
@@ -712,26 +719,16 @@ SyncEngine.prototype = {
 
         Sync.sleep(0);
       }
 
       // Final upload
       if (count % MAX_UPLOAD_RECORDS > 0)
         doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all");
     }
-
-    // Update local timestamp.
-    this.lastSyncLocal = this._maybeLastSyncLocal;
-    delete this._modified;
-    delete this._backupChangedIDs;
-
-    // Mark failed WBOs as changed again so they are reuploaded next time.
-    for (let id in failed) {
-      this._tracker.addChangedID(id, failed[id]);
-    }
   },
 
   // Any cleanup necessary.
   // Save the current snapshot so as to calculate changes at next sync
   _syncFinish: function SyncEngine__syncFinish() {
     this._log.trace("Finishing up sync");
     this._tracker.resetScore();
 
@@ -753,38 +750,38 @@ SyncEngine.prototype = {
         while (val.length > 0) {
           doDelete(key, val.slice(0, 100));
           val = val.slice(100);
         }
       }
     }
   },
 
-  _rollback: function _rollback() {
-    if (!this._backupChangedIDs)
+  _syncCleanup: function _syncCleanup() {
+    if (!this._modified)
       return;
 
-    for (let [id, when] in Iterator(this._backupChangedIDs)) {
+    // Mark failed WBOs as changed again so they are reuploaded next time.
+    for (let [id, when] in Iterator(this._modified)) {
       this._tracker.addChangedID(id, when);
     }
+    delete this._modified;
+    delete this._modifiedIDs;
   },
 
   _sync: function SyncEngine__sync() {
     try {
       this._syncStartup();
       Observers.notify("weave:engine:sync:status", "process-incoming");
       this._processIncoming();
       Observers.notify("weave:engine:sync:status", "upload-outgoing");
       this._uploadOutgoing();
       this._syncFinish();
-    }
-    catch (e) {
-      this._rollback();
-      this._log.warn("Sync failed");
-      throw e;
+    } finally {
+      this._syncCleanup();
     }
   },
 
   canDecrypt: function canDecrypt() {
     // Report failure even if there's nothing to decrypt
     let canDecrypt = false;
 
     // Fetch the most recently uploaded record and try to decrypt it
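
The bookkeeping introduced above can be summarized outside the engine: this._modified maps record IDs to their change timestamps, each successful upload deletes entries from it, and the cleanup step feeds the leftovers back into the tracker. The following standalone sketch models that lifecycle with a toy tracker; the names mirror the patch, the rest is illustrative:

// Illustrative model of the this._modified lifecycle; toy objects, not Sync code.
var tracker = {
  changedIDs: {"flying": 12345, "scotsman": 23456},
  clearChangedIDs: function () { this.changedIDs = {}; },
  addChangedID: function (id, when) { this.changedIDs[id] = when; }
};

// _syncStartup: snapshot the tracker's changed IDs, then clear it.
var modified = {};
for (var id in tracker.changedIDs)
  modified[id] = tracker.changedIDs[id];
tracker.clearChangedIDs();

// _uploadOutgoing: suppose the server acknowledged only "flying".
var success = ["flying"];
success.forEach(function (id) { delete modified[id]; });

// _syncCleanup: whatever is left goes back into the tracker for the next sync.
for (var id in modified)
  tracker.addChangedID(id, modified[id]);
// tracker.changedIDs is now {"scotsman": 23456}, so that record is retried.

Compared to the old _backupChangedIDs rollback, a partial upload is no longer undone wholesale: records the server already stored stay off the tracker, and only the leftovers are re-queued.
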
--- a/services/sync/modules/service.js
+++ b/services/sync/modules/service.js
@@ -1567,17 +1567,17 @@ WeaveSvc.prototype = {
       if (e.status == 401 && this._updateCluster())
         return this._syncEngine(engine);
 
       this._checkServerError(e);
 
       Status.engines = [engine.name, e.failureCode || ENGINE_UNKNOWN_FAIL];
 
       this._syncError = true;
-      this._log.debug(Utils.exceptionStr(e));
+      this._log.debug(engine.name + " failed: " + Utils.exceptionStr(e));
       return true;
     }
     finally {
       // If this engine has more to fetch, remember that globally
       if (engine.toFetch != null && engine.toFetch.length > 0)
         Status.partial = true;
     }
   },
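
The service.js hunk only adds the engine name to the failure log, but it sits inside the per-engine error handling visible in the surrounding context: a failing engine records its failure code in Status, marks the overall sync as errored, and lets the remaining engines proceed. A rough sketch of that catch/finally shape, reconstructed from the context above; the success path and anything not shown in the hunk are stand-ins, not the real service code:

// Rough sketch of per-engine failure handling; reconstructed, not verbatim.
WeaveSvc.prototype._syncEngine = function _syncEngine(engine) {
  try {
    engine.sync();
    return true;
  } catch (e) {
    // A 401 suggests a stale cluster URL: refresh it and retry this engine.
    if (e.status == 401 && this._updateCluster())
      return this._syncEngine(engine);

    this._checkServerError(e);
    Status.engines = [engine.name, e.failureCode || ENGINE_UNKNOWN_FAIL];
    this._syncError = true;
    this._log.debug(engine.name + " failed: " + Utils.exceptionStr(e));
    return true;          // record the failure, but keep syncing other engines
  } finally {
    // If this engine has more to fetch, remember that globally.
    if (engine.toFetch != null && engine.toFetch.length > 0)
      Status.partial = true;
  }
};
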
--- a/services/sync/tests/unit/test_syncengine_sync.js
+++ b/services/sync/tests/unit/test_syncengine_sync.js
@@ -186,17 +186,16 @@ function test_syncStartup_emptyOrOutdate
 
     // The meta/global WBO has been filled with data about the engine
     let engineData = metaGlobal.payload.engines["steam"];
     do_check_eq(engineData.version, engine.version);
     do_check_eq(engineData.syncID, engine.syncID);
 
     // Sync was reset and server data was wiped
     do_check_eq(engine.lastSync, 0);
-    do_check_eq(engine.lastSyncLocal, 0);
     do_check_eq(collection.wbos.flying.payload, undefined);
     do_check_eq(collection.wbos.scotsman.payload, undefined);
 
     // Bulk key was uploaded
     do_check_true(!!crypto_steam.payload);
     do_check_true(!!crypto_steam.data.keyring);
 
   } finally {
@@ -247,17 +246,16 @@ function test_syncStartup_metaGet404() {
     do_check_true(!!collection.wbos.scotsman.payload);
 
     engine.lastSync = Date.now() / 1000;
     engine.lastSyncLocal = Date.now();
     engine._syncStartup();
 
     _("Sync was reset and server data was wiped");
     do_check_eq(engine.lastSync, 0);
-    do_check_eq(engine.lastSyncLocal, 0);
     do_check_eq(collection.wbos.flying.payload, undefined);
     do_check_eq(collection.wbos.scotsman.payload, undefined);
 
     _("New bulk key was uploaded");
     let key = crypto_steam.data.keyring["../keys/pubkey"];
     do_check_eq(key.wrapped, "fake-symmetric-key-0");
     do_check_eq(key.hmac, "fake-symmetric-key-0                                            ");
 
@@ -367,17 +365,16 @@ function test_syncStartup_syncIDMismatch
     engine.lastSyncLocal = Date.now();
     engine._syncStartup();
 
     // The engine has assumed the server's syncID 
     do_check_eq(engine.syncID, 'foobar');
 
     // Sync was reset
     do_check_eq(engine.lastSync, 0);
-    do_check_eq(engine.lastSyncLocal, 0);
 
   } finally {
     server.stop(do_test_finished);
     Svc.Prefs.resetBranch("");
     Records.clearCache();
     CryptoMetas.clearCache();
     syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
   }
@@ -433,17 +430,16 @@ function test_syncStartup_badKeyWipesSer
     do_check_true(!!collection.wbos.scotsman.payload);
 
     engine.lastSync = Date.now() / 1000;
     engine.lastSyncLocal = Date.now();
     engine._syncStartup();
 
     // Sync was reset and server data was wiped
     do_check_eq(engine.lastSync, 0);
-    do_check_eq(engine.lastSyncLocal, 0);
     do_check_eq(collection.wbos.flying.payload, undefined);
     do_check_eq(collection.wbos.scotsman.payload, undefined);
 
     // New bulk key was uploaded
     key = crypto_steam.data.keyring["../keys/pubkey"];
     do_check_eq(key.wrapped, "fake-symmetric-key-1");
     do_check_eq(key.hmac, "fake-symmetric-key-1                                            ");
 
@@ -872,18 +868,18 @@ function test_uploadOutgoing_failed() {
 
     // Confirm initial environment
     do_check_eq(engine.lastSyncLocal, 0);
     do_check_eq(collection.wbos.flying.payload, undefined);
     do_check_eq(engine._tracker.changedIDs['flying'], FLYING_CHANGED);
     do_check_eq(engine._tracker.changedIDs['scotsman'], SCOTSMAN_CHANGED);
     do_check_eq(engine._tracker.changedIDs['peppercorn'], PEPPERCORN_CHANGED);
 
-    engine._syncStartup();
-    engine._uploadOutgoing();
+    engine.enabled = true;
+    engine.sync();
 
     // Local timestamp has been set.
     do_check_true(engine.lastSyncLocal > 0);
 
     // Ensure the 'flying' record has been uploaded and is no longer marked.
     do_check_true(!!collection.wbos.flying.payload);
     do_check_eq(engine._tracker.changedIDs['flying'], undefined);
 
@@ -1090,72 +1086,87 @@ function test_syncFinish_deleteLotsInBat
   } finally {
     server.stop(do_test_finished);
     Svc.Prefs.resetBranch("");
     Records.clearCache();
     syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
   }
 }
 
-function test_sync_rollback() {
-  _("SyncEngine.sync() rolls back tracker's changedIDs when syncing fails.");
+
+function test_sync_partialUpload() {
+  _("SyncEngine.sync() keeps changedIDs that couldn't be uploaded.");
+
   Svc.Prefs.set("clusterURL", "http://localhost:8080/");
   Svc.Prefs.set("username", "foo");
 
-  // Set up a server without a "steam" collection handler, so sync will fail.
   let crypto_steam = new ServerWBO('steam');
+  let collection = new ServerCollection();
   let server = sync_httpd_setup({
-      "/1.0/foo/storage/crypto/steam": crypto_steam.handler()
+      "/1.0/foo/storage/crypto/steam": crypto_steam.handler(),
+      "/1.0/foo/storage/steam": collection.handler()
   });
   do_test_pending();
   createAndUploadKeypair();
   createAndUploadSymKey("http://localhost:8080/1.0/foo/storage/crypto/steam");
 
   let engine = makeSteamEngine();
   engine.lastSync = 123; // needs to be non-zero so that tracker is queried
   engine.lastSyncLocal = 456;
-  engine._store.items = {flying: "LNER Class A3 4472",
-                         scotsman: "Flying Scotsman",
-                         peppercorn: "Peppercorn Class"};
 
-  // Mark these records as changed 
-  const FLYING_CHANGED = 12345;
-  const SCOTSMAN_CHANGED = 23456;
-  const PEPPERCORN_CHANGED = 34567;
-  engine._tracker.addChangedID('flying', FLYING_CHANGED);
-  engine._tracker.addChangedID('scotsman', SCOTSMAN_CHANGED);
-  engine._tracker.addChangedID('peppercorn', PEPPERCORN_CHANGED);
+  // Let the third upload fail completely
+  var noOfUploads = 0;
+  collection.post = (function(orig) {
+    return function() {
+      if (noOfUploads == 2)
+        throw "FAIL!";
+      noOfUploads++;
+      return orig.apply(this, arguments);
+    };
+  }(collection.post));
+
+  // Create a bunch of records (and server side handlers)
+  for (let i = 0; i < 234; i++) {
+    let id = 'record-no-' + i;
+    engine._store.items[id] = "Record No. " + i;
+    engine._tracker.addChangedID(id, i);
+    // Let two items in the first upload batch fail.
+    if ((i != 23) && (i != 42))
+      collection.wbos[id] = new ServerWBO(id);
+  }
 
   let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
   meta_global.payload.engines = {steam: {version: engine.version,
                                          syncID: engine.syncID}};
 
-
   try {
 
-    // Confirm initial environment
-    do_check_eq(engine.lastSyncLocal, 456);
-    do_check_eq(engine._tracker.changedIDs['flying'], FLYING_CHANGED);
-    do_check_eq(engine._tracker.changedIDs['scotsman'], SCOTSMAN_CHANGED);
-    do_check_eq(engine._tracker.changedIDs['peppercorn'], PEPPERCORN_CHANGED);
-
     engine.enabled = true;
     let error;
     try {
       engine.sync();
     } catch (ex) {
       error = ex;
     }
     do_check_true(!!error);
 
-    // Verify that the tracker state and local timestamp has been rolled back.
-    do_check_eq(engine.lastSyncLocal, 456);
-    do_check_eq(engine._tracker.changedIDs['flying'], FLYING_CHANGED);
-    do_check_eq(engine._tracker.changedIDs['scotsman'], SCOTSMAN_CHANGED);
-    do_check_eq(engine._tracker.changedIDs['peppercorn'], PEPPERCORN_CHANGED);
+    // The timestamp has been updated.
+    do_check_true(engine.lastSyncLocal > 456);
+
+    for (let i = 0; i < 234; i++) {
+      let id = 'record-no-' + i;
+      // Ensure failed records are back in the tracker:
+      // * records no. 23 and 42 were rejected by the server,
+      // * records no. 200 and higher couldn't be uploaded because we failed
+      //   hard on the 3rd upload.
+      if ((i == 23) || (i == 42) || (i >= 200))
+        do_check_eq(engine._tracker.changedIDs[id], i);
+      else
+        do_check_false(id in engine._tracker.changedIDs);
+    }
 
   } finally {
     server.stop(do_test_finished);
     Svc.Prefs.resetBranch("");
     Records.clearCache();
     CryptoMetas.clearCache();
     syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
   }
@@ -1238,12 +1249,12 @@ function run_test() {
   test_processIncoming_reconcile();
   test_processIncoming_mobile_batchSize();
   test_uploadOutgoing_toEmptyServer();
   test_uploadOutgoing_failed();
   test_uploadOutgoing_MAX_UPLOAD_RECORDS();
   test_syncFinish_noDelete();
   test_syncFinish_deleteByIds();
   test_syncFinish_deleteLotsInBatches();
-  test_sync_rollback();
+  test_sync_partialUpload();
   test_canDecrypt_noCryptoMeta();
   test_canDecrypt_true();
 }
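
The new test exercises the cleanup path by letting the server reject two records in the first batch and by making the third batch POST throw outright. The wrapper trick it uses, replacing collection.post with a function that fails on the Nth call and otherwise delegates to the original, is handy for this kind of partial-failure test in general. A generic version of that pattern (failOnNthCall is a hypothetical helper, not part of the test infrastructure):

// Hypothetical helper illustrating the wrapper pattern used by
// test_sync_partialUpload: fail hard on the nth call, delegate otherwise.
function failOnNthCall(obj, method, n) {
  let calls = 0;
  let orig = obj[method];
  obj[method] = function () {
    calls++;
    if (calls == n)
      throw "FAIL!";
    return orig.apply(this, arguments);
  };
}

// Usage mirroring the test above: let the third upload batch fail completely.
// failOnNthCall(collection, "post", 3);
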