Bug 615284 - Download chunking needs to be more resilient against app shutdowns. r=mconnor
authorPhilipp von Weitershausen <philipp@weitershausen.de>
Wed, 26 Jan 2011 21:34:31 -0800
changeset 61481 eff3225f51e95d6f610dd9a306de44cc3e9cd31d
parent 61480 1c9bdb06d07af6863735848255907f0f223ae4fb
child 61482 13c45cd92b477fb51d08c5b67c282af87f26bf8b
push id1
push userroot
push dateTue, 10 Dec 2013 15:46:25 +0000
reviewersmconnor
bugs615284
Bug 615284 - Download chunking needs to be more resilient against app shutdowns. r=mconnor
services/sync/modules/engines.js
services/sync/tests/unit/test_syncengine.js
services/sync/tests/unit/test_syncengine_sync.js
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -298,17 +298,16 @@ EngineManagerSvc.prototype = {
     }
     catch(ex) {
       let mesg = ex.message ? ex.message : ex;
       let name = engineObject || "";
       name = name.prototype || "";
       name = name.name || "";
 
       let out = "Could not initialize engine '" + name + "': " + mesg;
-      dump(out);
       this._log.error(out);
 
       return engineObject;
     }
   },
   unregister: function EngMgr_unregister(val) {
     let name = val;
     if (val instanceof Engine)
@@ -463,16 +462,17 @@ Engine.prototype = {
 
   wipeClient: function Engine_wipeClient() {
     this._notify("wipe-client", this.name, this._wipeClient)();
   }
 };
 
 function SyncEngine(name) {
   Engine.call(this, name || "SyncEngine");
+  this.loadToFetch();
 }
 SyncEngine.prototype = {
   __proto__: Engine.prototype,
   _recordObj: CryptoWrapper,
   version: 1,
 
   get storageURL() Svc.Prefs.get("clusterURL") + Svc.Prefs.get("storageAPI") +
     "/" + ID.get("WeaveID").username + "/storage/",
@@ -506,16 +506,32 @@ SyncEngine.prototype = {
   },
   resetLastSync: function SyncEngine_resetLastSync() {
     this._log.debug("Resetting " + this.name + " last sync time");
     Svc.Prefs.reset(this.name + ".lastSync");
     Svc.Prefs.set(this.name + ".lastSync", "0");
     this.lastSyncLocal = 0;
   },
 
+  get toFetch() this._toFetch,
+  set toFetch(val) {
+    this._toFetch = val;
+    Utils.delay(function () {
+      Utils.jsonSave("toFetch/" + this.name, this, val);
+    }, 0, this, "_toFetchDelay");
+  },
+
+  loadToFetch: function loadToFetch() {
+    // Initialize to empty if there's no file
+    this._toFetch = [];
+    Utils.jsonLoad("toFetch/" + this.name, this, function(toFetch) {
+      this._toFetch = toFetch;
+    });
+  },
+
   /*
    * lastSyncLocal is a timestamp in local time.
    */
   get lastSyncLocal() {
     return parseInt(Svc.Prefs.get(this.name + ".lastSyncLocal", "0"), 10);
   },
   set lastSyncLocal(value) {
     // Store as a string because pref can only store C longs as numbers.
@@ -678,59 +694,65 @@ SyncEngine.prototype = {
       let resp = newitems.get();
       if (!resp.success) {
         resp.failureCode = ENGINE_DOWNLOAD_FAIL;
         throw resp;
       }
     }
 
     // Mobile: check if we got the maximum that we requested; get the rest if so.
-    let toFetch = [];
     if (handled.length == newitems.limit) {
       let guidColl = new Collection(this.engineURL);
       
       // Sort and limit so that on mobile we only get the last X records.
       guidColl.limit = this.downloadLimit;
       guidColl.newer = this.lastSync;
-      
+
       // index: Orders by the sortindex descending (highest weight first).
       guidColl.sort  = "index";
 
       let guids = guidColl.get();
       if (!guids.success)
         throw guids;
 
       // Figure out which guids weren't just fetched then remove any guids that
       // were already waiting and prepend the new ones
       let extra = Utils.arraySub(guids.obj, handled);
       if (extra.length > 0)
-        toFetch = extra.concat(Utils.arraySub(toFetch, extra));
+        this.toFetch = extra.concat(Utils.arraySub(this.toFetch, extra));
+    }
+
+    // Fast-foward the lastSync timestamp since we have stored the
+    // remaining items in toFetch.
+    if (this.lastSync < this.lastModified) {
+      this.lastSync = this.lastModified;
     }
 
     // Mobile: process any backlog of GUIDs
-    while (toFetch.length) {
+    while (this.toFetch.length) {
       // Reuse the original query, but get rid of the restricting params
+      // and batch remaining records.
       newitems.limit = 0;
       newitems.newer = 0;
-
-      // Get the first bunch of records and save the rest for later
-      newitems.ids = toFetch.slice(0, batchSize);
-      toFetch = toFetch.slice(batchSize);
+      newitems.ids = this.toFetch.slice(0, batchSize);
 
       // Reuse the existing record handler set earlier
       let resp = newitems.get();
       if (!resp.success) {
         resp.failureCode = ENGINE_DOWNLOAD_FAIL;
         throw resp;
       }
+
+      // This batch was successfully applied.
+      this.toFetch = this.toFetch.slice(batchSize);
+      if (this.lastSync < this.lastModified) {
+        this.lastSync = this.lastModified;
+      }
     }
 
-    if (this.lastSync < this.lastModified)
-      this.lastSync = this.lastModified;
-
     this._log.info(["Records:", count.applied, "applied,", count.reconciled,
       "reconciled."].join(" "));
   },
 
   /**
    * Find a GUID of an item that is a duplicate of the incoming item but happens
    * to have a different GUID
    *
@@ -964,16 +986,17 @@ SyncEngine.prototype = {
       this._log.debug("Failed test decrypt: " + Utils.exceptionStr(ex));
     }
 
     return canDecrypt;
   },
 
   _resetClient: function SyncEngine__resetClient() {
     this.resetLastSync();
+    this.toFetch = [];
   },
 
   wipeServer: function wipeServer() {
     new Resource(this.engineURL).delete();
     this._resetClient();
   },
   
   handleHMACMismatch: function handleHMACMismatch() {
--- a/services/sync/tests/unit/test_syncengine.js
+++ b/services/sync/tests/unit/test_syncengine.js
@@ -1,10 +1,11 @@
 Cu.import("resource://services-sync/engines.js");
 Cu.import("resource://services-sync/util.js");
+Cu.import("resource://services-sync/ext/Sync.js");
 
 function makeSteamEngine() {
   return new SyncEngine('Steam');
 }
 
 var syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
 
 function test_url_attributes() {
@@ -64,30 +65,62 @@ function test_lastSync() {
     engine.resetLastSync();
     do_check_eq(engine.lastSync, 0);
     do_check_eq(Svc.Prefs.get("steam.lastSync"), "0");
   } finally {
     Svc.Prefs.resetBranch("");
   }
 }
 
+function test_toFetch() {
+  _("SyncEngine.toFetch corresponds to file on disk");
+  const filename = "weave/toFetch/steam.json";
+  let engine = makeSteamEngine();
+  try {
+    // Ensure pristine environment
+    do_check_eq(engine.toFetch.length, 0);
+
+    // Write file to disk
+    let toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
+    engine.toFetch = toFetch;
+    do_check_eq(engine.toFetch, toFetch);
+    // toFetch is written asynchronously
+    Sync.sleep(0);
+    let fakefile = syncTesting.fakeFilesystem.fakeContents[filename];
+    do_check_eq(fakefile, JSON.stringify(toFetch));
+
+    // Read file from disk
+    toFetch = [Utils.makeGUID(), Utils.makeGUID()];
+    syncTesting.fakeFilesystem.fakeContents[filename] = JSON.stringify(toFetch);
+    engine.loadToFetch();
+    do_check_eq(engine.toFetch.length, 2);
+    do_check_eq(engine.toFetch[0], toFetch[0]);
+    do_check_eq(engine.toFetch[1], toFetch[1]);
+  } finally {
+    syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
+  }
+}
+
 function test_resetClient() {
-  _("SyncEngine.resetClient resets lastSync");
+  _("SyncEngine.resetClient resets lastSync and toFetch");
   let engine = makeSteamEngine();
   try {
     // Ensure pristine environment
     do_check_eq(Svc.Prefs.get("steam.lastSync"), undefined);
     do_check_eq(Svc.Prefs.get("steam.lastSyncLocal"), undefined);
+    do_check_eq(engine.toFetch.length, 0);
 
     engine.lastSync = 123.45;
     engine.lastSyncLocal = 67890;
+    engine.toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
 
     engine.resetClient();
     do_check_eq(engine.lastSync, 0);
     do_check_eq(engine.lastSyncLocal, 0);
+    do_check_eq(engine.toFetch.length, 0);
   } finally {
     syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
     Svc.Prefs.resetBranch("");
   }
 }
 
 function test_wipeServer() {
   _("SyncEngine.wipeServer deletes server data and resets the client.");
@@ -99,28 +132,31 @@ function test_wipeServer() {
   let server = httpd_setup({
     "/1.0/foo/storage/steam": steamCollection.handler()
   });
   do_test_pending();
 
   try {
     // Some data to reset.
     engine.lastSync = 123.45;
+    engine.toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
 
     _("Wipe server data and reset client.");
     engine.wipeServer();
     do_check_eq(steamCollection.payload, undefined);
     do_check_eq(engine.lastSync, 0);
+    do_check_eq(engine.toFetch.length, 0);
 
   } finally {
     server.stop(do_test_finished);
     syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
     Svc.Prefs.resetBranch("");
   }
 }
 
 function run_test() {
   test_url_attributes();
   test_syncID();
   test_lastSync();
+  test_toFetch();
   test_resetClient();
   test_wipeServer();
 }
--- a/services/sync/tests/unit/test_syncengine_sync.js
+++ b/services/sync/tests/unit/test_syncengine_sync.js
@@ -457,17 +457,16 @@ function test_processIncoming_reconcile(
 
 
 function test_processIncoming_mobile_batchSize() {
   _("SyncEngine._processIncoming doesn't fetch everything at once on mobile clients");
 
   Svc.Prefs.set("clusterURL", "http://localhost:8080/");
   Svc.Prefs.set("username", "foo");
   Svc.Prefs.set("client.type", "mobile");
-  let crypto_steam = new ServerWBO('steam');
 
   // A collection that logs each GET
   let collection = new ServerCollection();
   collection.get_log = [];
   collection._get = collection.get;
   collection.get = function (options) {
     this.get_log.push(options);
     return this._get(options);
@@ -524,16 +523,147 @@ function test_processIncoming_mobile_bat
   } finally {
     server.stop(do_test_finished);
     Svc.Prefs.resetBranch("");
     Records.clearCache();
     syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
   }
 }
 
+
+function test_processIncoming_store_toFetch() {
+  _("If processIncoming fails in the middle of a batch on mobile, state is saved in toFetch and lastSync.");
+  Svc.Prefs.set("clusterURL", "http://localhost:8080/");
+  Svc.Prefs.set("username", "foo");
+  Svc.Prefs.set("client.type", "mobile");
+
+  // A collection that throws at the fourth get.
+  let collection = new ServerCollection();
+  collection._get_calls = 0;
+  collection._get = collection.get;
+  collection.get = function() {
+    this._get_calls += 1;
+    if (this._get_calls > 3) {
+      throw "Abort on fourth call!";
+    }
+    return this._get.apply(this, arguments);
+  };
+
+  // Let's create three batches worth of server side records.
+  for (var i = 0; i < MOBILE_BATCH_SIZE * 3; i++) {
+    let id = 'record-no-' + i;
+    let payload = encryptPayload({id: id, denomination: "Record No. " + id});
+    let wbo = new ServerWBO(id, payload);
+    wbo.modified = Date.now()/1000 + 60 * (i - MOBILE_BATCH_SIZE * 3);
+    collection.wbos[id] = wbo;
+  }
+
+  let engine = makeSteamEngine();
+  engine.enabled = true;
+
+  let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
+  meta_global.payload.engines = {steam: {version: engine.version,
+                                         syncID: engine.syncID}};
+  let server = sync_httpd_setup({
+      "/1.0/foo/storage/steam": collection.handler()
+  });
+  do_test_pending();
+
+  try {
+
+    // Confirm initial environment
+    do_check_eq(engine.lastSync, 0);
+    do_check_eq([id for (id in engine._store.items)].length, 0);
+
+    let error;
+    try {
+      engine.sync();
+    } catch (ex) {
+      error = ex;
+    }
+    do_check_true(!!error);
+
+    // Only the first two batches have been applied.
+    do_check_eq([id for (id in engine._store.items)].length,
+                MOBILE_BATCH_SIZE * 2);
+
+    // The third batch is stuck in toFetch. lastSync has been moved forward to
+    // the last successful item's timestamp.
+    do_check_eq(engine.toFetch.length, MOBILE_BATCH_SIZE);
+    do_check_eq(engine.lastSync, collection.wbos["record-no-99"].modified);
+
+  } finally {
+    server.stop(do_test_finished);
+    Svc.Prefs.resetBranch("");
+    Records.clearCache();
+    syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
+  }
+}
+
+
+function test_processIncoming_resume_toFetch() {
+  _("toFetch items left over from previous syncs are fetched on the next sync, along with new items.");
+  Svc.Prefs.set("clusterURL", "http://localhost:8080/");
+  Svc.Prefs.set("username", "foo");
+
+  const LASTSYNC = Date.now() / 1000;
+
+  // Server records that will be downloaded
+  let collection = new ServerCollection();
+  collection.wbos.flying = new ServerWBO(
+      'flying', encryptPayload({id: 'flying',
+                                denomination: "LNER Class A3 4472"}));
+  collection.wbos.scotsman = new ServerWBO(
+      'scotsman', encryptPayload({id: 'scotsman',
+                                  denomination: "Flying Scotsman"}));
+  collection.wbos.rekolok = new ServerWBO(
+      'rekolok', encryptPayload({id: 'rekolok',
+                                 denomination: "Rekonstruktionslokomotive"}));
+
+  collection.wbos.flying.modified = collection.wbos.scotsman.modified
+    = LASTSYNC - 10;
+  collection.wbos.rekolok.modified = LASTSYNC + 10;
+
+  // Time travel 10 seconds into the future but still download the above WBOs.
+  let engine = makeSteamEngine();
+  engine.lastSync = LASTSYNC;
+  engine.toFetch = ["flying", "scotsman"];
+
+  let meta_global = Records.set(engine.metaURL, new WBORecord(engine.metaURL));
+  meta_global.payload.engines = {steam: {version: engine.version,
+                                         syncID: engine.syncID}};
+  let server = sync_httpd_setup({
+      "/1.0/foo/storage/steam": collection.handler()
+  });
+  do_test_pending();
+
+  try {
+
+    // Confirm initial environment
+    do_check_eq(engine._store.items.flying, undefined);
+    do_check_eq(engine._store.items.scotsman, undefined);
+    do_check_eq(engine._store.items.rekolok, undefined);
+
+    engine._syncStartup();
+    engine._processIncoming();
+
+    // Local records have been created from the server data.
+    do_check_eq(engine._store.items.flying, "LNER Class A3 4472");
+    do_check_eq(engine._store.items.scotsman, "Flying Scotsman");
+    do_check_eq(engine._store.items.rekolok, "Rekonstruktionslokomotive");
+
+  } finally {
+    server.stop(do_test_finished);
+    Svc.Prefs.resetBranch("");
+    Records.clearCache();
+    syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
+  }
+}
+
+
 function test_uploadOutgoing_toEmptyServer() {
   _("SyncEngine._uploadOutgoing uploads new records to server");
 
   Svc.Prefs.set("clusterURL", "http://localhost:8080/");
   Svc.Prefs.set("username", "foo");
   let collection = new ServerCollection();
   collection.wbos.flying = new ServerWBO('flying');
   collection.wbos.scotsman = new ServerWBO('scotsman');
@@ -845,20 +975,18 @@ function test_syncFinish_deleteLotsInBat
 
 
 function test_sync_partialUpload() {
   _("SyncEngine.sync() keeps changedIDs that couldn't be uploaded.");
 
   Svc.Prefs.set("clusterURL", "http://localhost:8080/");
   Svc.Prefs.set("username", "foo");
 
-  let crypto_steam = new ServerWBO('steam');
   let collection = new ServerCollection();
   let server = sync_httpd_setup({
-      "/1.0/foo/storage/crypto/steam": crypto_steam.handler(),
       "/1.0/foo/storage/steam": collection.handler()
   });
   do_test_pending();
   CollectionKeys.generateNewKeys();
 
   let engine = makeSteamEngine();
   engine.lastSync = 123; // needs to be non-zero so that tracker is queried
   engine.lastSyncLocal = 456;
@@ -993,16 +1121,18 @@ function run_test() {
 
   test_syncStartup_emptyOrOutdatedGlobalsResetsSync();
   test_syncStartup_serverHasNewerVersion();
   test_syncStartup_syncIDMismatchResetsClient();
   test_processIncoming_emptyServer();
   test_processIncoming_createFromServer();
   test_processIncoming_reconcile();
   test_processIncoming_mobile_batchSize();
+  test_processIncoming_store_toFetch();
+  test_processIncoming_resume_toFetch();
   test_uploadOutgoing_toEmptyServer();
   test_uploadOutgoing_failed();
   test_uploadOutgoing_MAX_UPLOAD_RECORDS();
   test_syncFinish_noDelete();
   test_syncFinish_deleteByIds();
   test_syncFinish_deleteLotsInBatches();
   test_sync_partialUpload();
   test_canDecrypt_noCryptoKeys();