Bug 609398 - Get rid of partial sync [r=mconnor]
author Philipp von Weitershausen <philipp@weitershausen.de>
Tue, 09 Nov 2010 13:51:19 -0800
changeset 57540 3594e3cc6077ad326e67634a1538df7120d42bc7
parent 57539 2475105f3ba29b8d5e65eb6244e30826d1393295
child 57541 03632b58255d53a4edd0d5778480c7ac9666ecdb
push id unknown
push user unknown
push date unknown
reviewers mconnor
bugs 609398
Bug 609398 - Get rid of partial sync [r=mconnor]
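
The gist of the change: mobile clients no longer carry a persisted toFetch
backlog across syncs ("partial sync"). Instead, _processIncoming downloads
everything within a single sync, in batches of MOBILE_BATCH_SIZE GUIDs per
request. A simplified sketch of the new mobile download path (identifiers as
in engines.js/constants.js below; error handling and the record handler setup
are omitted, so this is illustrative rather than the literal patch):

  let batchSize = Infinity;
  let newitems = new Collection(this.engineURL, this._recordObj);
  if (Svc.Prefs.get("client.type") == "mobile")
    batchSize = MOBILE_BATCH_SIZE;   // 50, because of URL length restrictions
  newitems.newer = this.lastSync;
  newitems.full = true;
  newitems.limit = batchSize;
  newitems.get();                    // first batch of full records

  // If the first GET was capped, fetch the remaining GUIDs newer than
  // lastSync and walk through them in batches within this same sync.
  let toFetch = [];                  // filled from a GUID-only Collection GET
  while (toFetch.length) {
    newitems.limit = 0;
    newitems.newer = 0;
    newitems.ids = toFetch.slice(0, batchSize);
    toFetch = toFetch.slice(batchSize);
    newitems.get();                  // reuses the record handler set earlier
  }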
services/sync/modules/constants.js
services/sync/modules/engines.js
services/sync/tests/unit/test_syncengine.js
services/sync/tests/unit/test_syncengine_sync.js
--- a/services/sync/modules/constants.js
+++ b/services/sync/modules/constants.js
@@ -57,16 +57,20 @@ PWDMGR_PASSWORD_REALM:                 "
 PWDMGR_PASSPHRASE_REALM:               "Mozilla Services Encryption Passphrase",
 
 // Sync intervals for various clients configurations
 SINGLE_USER_SYNC:                      24 * 60 * 60 * 1000, // 1 day
 MULTI_DESKTOP_SYNC:                    60 * 60 * 1000, // 1 hour
 MULTI_MOBILE_SYNC:                     5 * 60 * 1000, // 5 minutes
 PARTIAL_DATA_SYNC:                     60 * 1000, // 1 minute
 
+// 50 is hardcoded here because of URL length restrictions.
+// (GUIDs can be up to 64 chars long)
+MOBILE_BATCH_SIZE:                     50,
+
 // score thresholds for early syncs
 SINGLE_USER_THRESHOLD:                 1000,
 MULTI_DESKTOP_THRESHOLD:               500,
 MULTI_MOBILE_THRESHOLD:                100,
 
 // File IO Flags
 MODE_RDONLY:                           0x01,
 MODE_WRONLY:                           0x02,
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -277,17 +277,16 @@ Engine.prototype = {
 
   wipeClient: function Engine_wipeClient() {
     this._notify("wipe-client", this.name, this._wipeClient)();
   }
 };
 
 function SyncEngine(name) {
   Engine.call(this, name || "SyncEngine");
-  this.loadToFetch();
 }
 SyncEngine.prototype = {
   __proto__: Engine.prototype,
   _recordObj: CryptoWrapper,
   version: 1,
 
   get storageURL() Svc.Prefs.get("clusterURL") + Svc.Prefs.get("storageAPI") +
     "/" + ID.get("WeaveID").username + "/storage/",
@@ -317,29 +316,16 @@ SyncEngine.prototype = {
     Svc.Prefs.set(this.name + ".lastSync", value.toString());
   },
   resetLastSync: function SyncEngine_resetLastSync() {
     this._log.debug("Resetting " + this.name + " last sync time");
     Svc.Prefs.reset(this.name + ".lastSync");
     Svc.Prefs.set(this.name + ".lastSync", "0");
   },
 
-  get toFetch() this._toFetch,
-  set toFetch(val) {
-    this._toFetch = val;
-    Utils.jsonSave("toFetch/" + this.name, this, val);
-  },
-
-  loadToFetch: function loadToFetch() {
-    // Initialize to empty if there's no file
-    this._toFetch = [];
-    Utils.jsonLoad("toFetch/" + this.name, this, Utils.bind2(this, function(o)
-      this._toFetch = o));
-  },
-
   // Create a new record using the store and add in crypto fields
   _createRecord: function SyncEngine__createRecord(id) {
     let record = this._store.createRecord(id, this.engineURL + "/" + id);
     record.id = id;
     record.encryption = this.cryptoMetaURL;
     return record;
   },
 
@@ -434,33 +420,29 @@ SyncEngine.prototype = {
 
     let outnum = [i for (i in this._tracker.changedIDs)].length;
     this._log.info(outnum + " outgoing items pre-reconciliation");
 
     // Keep track of what to delete at the end of sync
     this._delete = {};
   },
 
-  // Generate outgoing records
+  // Process incoming records
   _processIncoming: function SyncEngine__processIncoming() {
     this._log.trace("Downloading & applying server changes");
 
     // Figure out how many total items to fetch this sync; do less on mobile.
-    // 50 is hardcoded here because of URL length restrictions.
-    // (GUIDs can be up to 64 chars long)
-    let fetchNum = Infinity;
-
+    let batchSize = Infinity;
     let newitems = new Collection(this.engineURL, this._recordObj);
     if (Svc.Prefs.get("client.type") == "mobile") {
-      fetchNum = 50;
-      newitems.sort = "index";
+      batchSize = MOBILE_BATCH_SIZE;
     }
     newitems.newer = this.lastSync;
     newitems.full = true;
-    newitems.limit = fetchNum;
+    newitems.limit = batchSize;
 
     let count = {applied: 0, reconciled: 0};
     let handled = [];
     newitems.recordHandler = Utils.bind2(this, function(item) {
       // Grab a later last modified if possible
       if (this.lastModified == null || item.modified > this.lastModified)
         this.lastModified = item.modified;
 
@@ -497,63 +479,58 @@ SyncEngine.prototype = {
 
     // Only bother getting data from the server if there's new things
     if (this.lastModified == null || this.lastModified > this.lastSync) {
       let resp = newitems.get();
       if (!resp.success) {
         resp.failureCode = ENGINE_DOWNLOAD_FAIL;
         throw resp;
       }
-
-      // Subtract out the number of items we just got
-      fetchNum -= handled.length;
     }
 
-    // Check if we got the maximum that we requested; get the rest if so
+    // Mobile: check if we got the maximum that we requested; get the rest if so.
+    let toFetch = [];
     if (handled.length == newitems.limit) {
       let guidColl = new Collection(this.engineURL);
       guidColl.newer = this.lastSync;
-      guidColl.sort = "index";
 
       let guids = guidColl.get();
       if (!guids.success)
         throw guids;
 
       // Figure out which guids weren't just fetched then remove any guids that
       // were already waiting and prepend the new ones
       let extra = Utils.arraySub(guids.obj, handled);
       if (extra.length > 0)
-        this.toFetch = extra.concat(Utils.arraySub(this.toFetch, extra));
+        toFetch = extra.concat(Utils.arraySub(toFetch, extra));
     }
 
-    // Process any backlog of GUIDs if we haven't fetched too many this sync
-    while (this.toFetch.length > 0 && fetchNum > 0) {
+    // Mobile: process any backlog of GUIDs
+    while (toFetch.length) {
       // Reuse the original query, but get rid of the restricting params
       newitems.limit = 0;
       newitems.newer = 0;
 
       // Get the first bunch of records and save the rest for later
-      let minFetch = Math.min(150, this.toFetch.length, fetchNum);
-      newitems.ids = this.toFetch.slice(0, minFetch);
-      this.toFetch = this.toFetch.slice(minFetch);
-      fetchNum -= minFetch;
+      newitems.ids = toFetch.slice(0, batchSize);
+      toFetch = toFetch.slice(batchSize);
 
       // Reuse the existing record handler set earlier
       let resp = newitems.get();
       if (!resp.success) {
         resp.failureCode = ENGINE_DOWNLOAD_FAIL;
         throw resp;
       }
     }
 
     if (this.lastSync < this.lastModified)
       this.lastSync = this.lastModified;
 
     this._log.info(["Records:", count.applied, "applied,", count.reconciled,
-      "reconciled,", this.toFetch.length, "left to fetch"].join(" "));
+      "reconciled."].join(" "));
   },
 
   /**
    * Find a GUID of an item that is a duplicate of the incoming item but happens
    * to have a different GUID
    *
    * @return GUID of the similar item; falsy otherwise
    */
@@ -783,17 +760,16 @@ SyncEngine.prototype = {
       this._log.debug("Failed test decrypt: " + Utils.exceptionStr(ex));
     }
 
     return canDecrypt;
   },
 
   _resetClient: function SyncEngine__resetClient() {
     this.resetLastSync();
-    this.toFetch = [];
   },
 
   wipeServer: function wipeServer(ignoreCrypto) {
     new Resource(this.engineURL).delete();
     if (!ignoreCrypto)
       new Resource(this.cryptoMetaURL).delete();
     this._resetClient();
   }
--- a/services/sync/tests/unit/test_syncengine.js
+++ b/services/sync/tests/unit/test_syncengine.js
@@ -59,58 +59,27 @@ function test_lastSync() {
     engine.resetLastSync();
     do_check_eq(engine.lastSync, 0);
     do_check_eq(Svc.Prefs.get("steam.lastSync"), "0");
   } finally {
     Svc.Prefs.resetBranch("");
   }
 }
 
-function test_toFetch() {
-  _("SyncEngine.toFetch corresponds to file on disk");
-  let engine = makeSteamEngine();
-  try {
-    // Ensure pristine environment
-    do_check_eq(engine.toFetch.length, 0);
-
-    // Write file to disk
-    let toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
-    engine.toFetch = toFetch;
-    do_check_eq(engine.toFetch, toFetch);
-    let fakefile = syncTesting.fakeFilesystem.fakeContents[
-        "weave/toFetch/steam.json"];
-    do_check_eq(fakefile, JSON.stringify(toFetch));
-
-    // Read file from disk
-    toFetch = [Utils.makeGUID(), Utils.makeGUID()];
-    syncTesting.fakeFilesystem.fakeContents["weave/toFetch/steam.json"]
-        = JSON.stringify(toFetch);
-    engine.loadToFetch();
-    do_check_eq(engine.toFetch.length, 2);
-    do_check_eq(engine.toFetch[0], toFetch[0]);
-    do_check_eq(engine.toFetch[1], toFetch[1]);
-  } finally {
-    syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
-  }
-}
-
 function test_resetClient() {
-  _("SyncEngine.resetClient resets lastSync and toFetch");
+  _("SyncEngine.resetClient resets lastSync");
   let engine = makeSteamEngine();
   try {
     // Ensure pristine environment
     do_check_eq(Svc.Prefs.get("steam.lastSync"), undefined);
-    do_check_eq(engine.toFetch.length, 0);
 
-    engine.toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
     engine.lastSync = 123.45;
 
     engine.resetClient();
     do_check_eq(engine.lastSync, 0);
-    do_check_eq(engine.toFetch.length, 0);
   } finally {
     syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
     Svc.Prefs.resetBranch("");
   }
 }
 
 function test_wipeServer() {
   _("SyncEngine.wipeServer deletes server data and resets the client.");
@@ -123,24 +92,22 @@ function test_wipeServer() {
   let server = httpd_setup({
     "/1.0/foo/storage/crypto/steam": steamCrypto.handler(),
     "/1.0/foo/storage/steam": steamCollection.handler()
   });
   do_test_pending();
 
   try {
     // Some data to reset.
-    engine.toFetch = [Utils.makeGUID(), Utils.makeGUID(), Utils.makeGUID()];
     engine.lastSync = 123.45;
 
     _("Wipe server data and reset client.");
     engine.wipeServer(true);
     do_check_eq(steamCollection.payload, undefined);
     do_check_eq(engine.lastSync, 0);
-    do_check_eq(engine.toFetch.length, 0);
 
     _("We passed a truthy arg earlier in which case it doesn't wipe the crypto collection.");
     do_check_eq(steamCrypto.payload, PAYLOAD);
     engine.wipeServer();
     do_check_eq(steamCrypto.payload, undefined);
 
   } finally {
     server.stop(do_test_finished);
@@ -148,12 +115,11 @@ function test_wipeServer() {
     Svc.Prefs.resetBranch("");
   }
 }
 
 function run_test() {
   test_url_attributes();
   test_syncID();
   test_lastSync();
-  test_toFetch();
   test_resetClient();
   test_wipeServer();
 }
--- a/services/sync/tests/unit/test_syncengine_sync.js
+++ b/services/sync/tests/unit/test_syncengine_sync.js
@@ -468,17 +468,16 @@ function test_processIncoming_emptyServe
   createAndUploadKeypair();
 
   let engine = makeSteamEngine();
   try {
 
     // Merely ensure that this code path is run without any errors
     engine._processIncoming();
     do_check_eq(engine.lastSync, 0);
-    do_check_eq(engine.toFetch.length, 0);
 
   } finally {
     server.stop(do_test_finished);
     Svc.Prefs.resetBranch("");
     Records.clearCache();
     CryptoMetas.clearCache();
     syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
   }
@@ -671,24 +670,32 @@ function test_processIncoming_reconcile(
     Svc.Prefs.resetBranch("");
     Records.clearCache();
     CryptoMetas.clearCache();
     syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
   }
 }
 
 
-function test_processIncoming_fetchNum() {
-  _("SyncEngine._processIncoming doesn't fetch everything at ones on mobile clients");
+function test_processIncoming_mobile_batchSize() {
+  _("SyncEngine._processIncoming doesn't fetch everything at once on mobile clients");
 
   Svc.Prefs.set("clusterURL", "http://localhost:8080/");
   Svc.Prefs.set("username", "foo");
   Svc.Prefs.set("client.type", "mobile");
   let crypto_steam = new ServerWBO('steam');
+
+  // A collection that logs each GET
   let collection = new ServerCollection();
+  collection.get_log = [];
+  collection._get = collection.get;
+  collection.get = function (options) {
+    this.get_log.push(options);
+    return this._get(options);
+  };
 
   // Let's create some 234 server side records. They're all at least
   // 10 minutes old.
   for (var i = 0; i < 234; i++) {
     let id = 'record-no-' + i;
     let payload = encryptPayload({id: id, denomination: "Record No. " + i});
     let wbo = new ServerWBO(id, payload);
     wbo.modified = Date.now()/1000 - 60*(i+10);
@@ -702,81 +709,40 @@ function test_processIncoming_fetchNum()
   do_test_pending();
   createAndUploadKeypair();
   createAndUploadSymKey("http://localhost:8080/1.0/foo/storage/crypto/steam");
 
   let engine = makeSteamEngine();
 
   try {
 
-    // On a mobile client, the first sync will only get the first 50
-    // objects from the server
+    // On a mobile client, we get new records from the server in batches of 50.
     engine._processIncoming();
-    do_check_eq([id for (id in engine._store.items)].length, 50);
+    do_check_eq([id for (id in engine._store.items)].length, 234);
     do_check_true('record-no-0' in engine._store.items);
     do_check_true('record-no-49' in engine._store.items);
-    do_check_eq(engine.toFetch.length, 234 - 50);
-
-
-    // The next sync will get another 50 objects, assuming the server
-    // hasn't got any new data.
-    engine._processIncoming();
-    do_check_eq([id for (id in engine._store.items)].length, 100);
     do_check_true('record-no-50' in engine._store.items);
-    do_check_true('record-no-99' in engine._store.items);
-    do_check_eq(engine.toFetch.length, 234 - 100);
-
-
-    // Now let's say there are some new items on the server
-    for (i=0; i < 5; i++) {
-      let id = 'new-record-no-' + i;
-      let payload = encryptPayload({id: id, denomination: "New record No. " + i});
-      let wbo = new ServerWBO(id, payload);
-      wbo.modified = Date.now()/1000 - 60*i;
-      collection.wbos[id] = wbo;
-    }
-    // Let's tell the engine the server has got newer data.  This is
-    // normally done by the WeaveSvc after retrieving info/collections.
-    engine.lastModified = Date.now() / 1000 + 1;
+    do_check_true('record-no-233' in engine._store.items);
 
-    // Now we'll fetch another 50 items, but 5 of those are the new
-    // ones, so we've only fetched another 45 of the older ones.
-    engine._processIncoming();
-    do_check_eq([id for (id in engine._store.items)].length, 150);
-    do_check_true('new-record-no-0' in engine._store.items);
-    do_check_true('new-record-no-4' in engine._store.items);
-    do_check_true('record-no-100' in engine._store.items);
-    do_check_true('record-no-144' in engine._store.items);
-    do_check_eq(engine.toFetch.length, 234 - 100 - 45);
-
-
-    // Now let's modify a few existing records on the server so that
-    // they have to be refetched.
-    collection.wbos['record-no-3'].modified = Date.now()/1000 + 1;
-    collection.wbos['record-no-41'].modified = Date.now()/1000 + 1;
-    collection.wbos['record-no-122'].modified = Date.now()/1000 + 1;
-
-    // Once again we'll tell the engine that the server's got newer data
-    // and once again we'll fetch 50 items, but 3 of those are the
-    // existing records, so we're only fetching 47 new ones.
-    engine.lastModified = Date.now() / 1000 + 2;
-    engine._processIncoming();
-    do_check_eq([id for (id in engine._store.items)].length, 197);
-    do_check_true('record-no-145' in engine._store.items);
-    do_check_true('record-no-191' in engine._store.items);
-    do_check_eq(engine.toFetch.length, 234 - 100 - 45 - 47);
-
-
-    // Finally let's fetch the rest, making sure that will fetch
-    // everything up to the last record.
-    while(engine.toFetch.length) {
-      engine._processIncoming();
+    // Verify that the right number of GET requests with the right
+    // kind of parameters were made.
+    do_check_eq(collection.get_log.length,
+                Math.ceil(234 / MOBILE_BATCH_SIZE) + 1);
+    do_check_eq(collection.get_log[0].full, 1);
+    do_check_eq(collection.get_log[0].limit, MOBILE_BATCH_SIZE);
+    do_check_eq(collection.get_log[1].full, undefined);
+    do_check_eq(collection.get_log[1].limit, undefined);
+    for (let i = 1; i <= Math.floor(234 / MOBILE_BATCH_SIZE); i++) {
+      do_check_eq(collection.get_log[i+1].full, 1);
+      do_check_eq(collection.get_log[i+1].limit, undefined);
+      if (i < Math.floor(234 / MOBILE_BATCH_SIZE))
+        do_check_eq(collection.get_log[i+1].ids.length, MOBILE_BATCH_SIZE);
+      else
+        do_check_eq(collection.get_log[i+1].ids.length, 234 % MOBILE_BATCH_SIZE);
     }
-    do_check_eq([id for (id in engine._store.items)].length, 234 + 5);
-    do_check_true('record-no-233' in engine._store.items);
 
   } finally {
     server.stop(do_test_finished);
     Svc.Prefs.resetBranch("");
     Records.clearCache();
     CryptoMetas.clearCache();
     syncTesting = new SyncTestingInfrastructure(makeSteamEngine);
   }
@@ -1157,17 +1123,17 @@ function run_test() {
   test_syncStartup_metaGet404();
   test_syncStartup_failedMetaGet();
   test_syncStartup_serverHasNewerVersion();
   test_syncStartup_syncIDMismatchResetsClient();
   test_syncStartup_badKeyWipesServerData();
   test_processIncoming_emptyServer();
   test_processIncoming_createFromServer();
   test_processIncoming_reconcile();
-  test_processIncoming_fetchNum();
+  test_processIncoming_mobile_batchSize();
   test_uploadOutgoing_toEmptyServer();
   test_uploadOutgoing_failed();
   test_uploadOutgoing_MAX_UPLOAD_RECORDS();
   test_syncFinish_noDelete();
   test_syncFinish_deleteByIds();
   test_syncFinish_deleteLotsInBatches();
   test_canDecrypt_noCryptoMeta();
   test_canDecrypt_true();