make engine keep less records in memory by limiting the outgoing queue to a maximum of 100 records, and fetch the rest from the store each time
author Dan Mills <thunder@mozilla.com>
Mon, 29 Dec 2008 23:28:17 -0800
changeset 45145 7cd4d0d6f13b17482a3f2659f0b892e3172d3fa8
parent 45144 30590ed4964197652154756f7ec36fde084924aa
child 45146 a9b97ff3a0f1bde91a0a61e94dbf25887a07f50d
push id unknown
push user unknown
push date unknown
make engine keep less records in memory by limiting the outgoing queue to a maximum of 100 records, and fetch the rest from the store each time
services/sync/modules/engines.js
services/sync/modules/engines/bookmarks.js
services/sync/modules/stores.js
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -170,28 +170,16 @@ Engine.prototype = {
     this._log.level = Log4Moz.Level[level];
     this._osPrefix = "weave:" + this.name + ":";
 
     this._tracker; // initialize tracker to load previously changed IDs
 
     this._log.debug("Engine initialized");
   },
 
-  _serializeCommands: function Engine__serializeCommands(commands) {
-    let json = this._json.encode(commands);
-    //json = json.replace(/ {action/g, "\n {action");
-    return json;
-  },
-
-  _serializeConflicts: function Engine__serializeConflicts(conflicts) {
-    let json = this._json.encode(conflicts);
-    //json = json.replace(/ {action/g, "\n {action");
-    return json;
-  },
-
   _resetServer: function Engine__resetServer() {
     let self = yield;
     throw "_resetServer needs to be subclassed";
   },
 
   _resetClient: function Engine__resetClient() {
     let self = yield;
     this._log.debug("Resetting client state");
@@ -267,28 +255,19 @@ SyncEngine.prototype = {
     } catch (e) {
       return 0;
     }
   },
   set lastSync(value) {
     Utils.prefs.setCharPref(this.name + ".lastSync", value);
   },
 
-  // XXX these two should perhaps just be a variable inside sync(), but we have
-  //     one or two other methods that use it
-
-  get incoming() {
-    if (!this._incoming)
-      this._incoming = [];
-    return this._incoming;
-  },
-
   get outgoing() {
     if (!this._outgoing)
-      this._outgoing = [];
+      this._outgoing = {};
     return this._outgoing;
   },
 
   // Create a new record by querying the store, and add the engine metadata
   _createRecord: function SyncEngine__createRecord(id) {
     let record = this._store.createRecord(id);
     record.uri = this.engineURL + id;
     record.encryption = this.cryptoMetaURL;
@@ -329,24 +308,16 @@ SyncEngine.prototype = {
         getService(Ci.IWeaveCrypto);
       let symkey = cryptoSvc.generateRandomKey();
       let pubkey = yield PubKeys.getDefaultKey(self.cb);
       meta = new CryptoMeta(this.cryptoMetaURL);
       meta.generateIV();
       yield meta.addUnwrappedKey(self.cb, pubkey, symkey);
       yield meta.put(self.cb);
     }
-    this._tracker.disable();
-  },
-
-  // Generate outgoing records
-  _generateOutgoing: function SyncEngine__generateOutgoing() {
-    let self = yield;
-
-    this._log.debug("Calculating client changes");
 
     // first sync special case: upload all items
     // note that we use a backdoor (of sorts) to the tracker and it
     // won't save to disk this list
     if (!this.lastSync) {
       this._log.info("First sync, uploading all items");
 
       // remove any old ones first
@@ -354,31 +325,17 @@ SyncEngine.prototype = {
 
       // now add all current ones
       let all = this._store.getAllIDs();
       for (let id in all) {
         this._tracker.changedIDs[id] = true;
       }
     }
 
-    // generate queue from changed items list
-
-    // XXX should have a heuristic like this, but then we need to be able to
-    // serialize each item by itself, something our stores can't currently do
-    //if (this._tracker.changedIDs.length >= 30)
-    //this._store.cacheItemsHint();
-
-    // NOTE we want changed items -> outgoing -> server to be as atomic as
-    // possible, so we clear the changed IDs after we upload the changed records
-    // NOTE2 don't encrypt, we'll do that before uploading instead
-    for (let id in this._tracker.changedIDs) {
-      this.outgoing.push(this._createRecord(id));
-    }
-
-    //this._store.clearItemCacheHint();
+    this._tracker.disable(); // FIXME: need finer-grained ignoring
   },
 
   // Generate outgoing records
   _processIncoming: function SyncEngine__processIncoming() {
     let self = yield;
 
     this._log.debug("Downloading & applying server changes");
 
@@ -398,26 +355,24 @@ SyncEngine.prototype = {
         if (mem.isLowMemory()) {
           this._log.warn("Low memory, aborting sync!");
           throw "Low memory";
         }
       }
       yield item.decrypt(self.cb, ID.get('WeaveCryptoID').password);
       if (yield this._reconcile.async(this, self.cb, item))
         yield this._applyIncoming.async(this, self.cb, item);
-      else
-        this._log.debug("Skipping reconciled incoming item");
+      else {
+        this._log.debug("Skipping reconciled incoming item " + item.id);
+        if (this._lastSyncTmp < item.modified)
+          this._lastSyncTmp = item.modified;
+      }
     }
-    if (typeof(this._lastSyncTmp) == "string")
-      this._lastSyncTmp = parseInt(this._lastSyncTmp);
     if (this.lastSync < this._lastSyncTmp)
         this.lastSync = this._lastSyncTmp;
-
-    // removes any holes caused by reconciliation above:
-    this._outgoing = this.outgoing.filter(function(n) n);
   },
 
   // Reconciliation has two steps:
   // 1) Check for the same item (same ID) on both the incoming and outgoing
   // queues.  This means the same item was modified on this profile and another
   // at the same time.  In this case, this client wins (which really means, the
   // last profile you sync wins).
   // 2) Check if any incoming & outgoing items are actually the same, even
@@ -426,52 +381,55 @@ SyncEngine.prototype = {
   // is synced for the first time after having (manually or otherwise) imported
   // bookmarks imported, every bookmark will match this condition.
   // When two items with different IDs are "the same" we change the local ID to
   // match the remote one.
   _reconcile: function SyncEngine__reconcile(item) {
     let self = yield;
     let ret = true;
 
-    this._log.debug("Reconciling incoming item");
+    // Check for the same item (same ID) on both incoming & outgoing queues
+    if (item.id in this._tracker.changedIDs) {
+      // Check to see if client and server were changed in the same way
+      let out = this._createRecord(item.id);
+      if (Utils.deepEquals(item.cleartext, out.cleartext)) {
+        this._tracker.removeChangedID(item.id);
+        delete this.outgoing[item.id];
+      } else {
+        this._log.debug("Discarding server change due to conflict with local change");
+      }
+      self.done(false);
+      return;
+    }
 
-    // Check for the same item (same ID) on both incoming & outgoing queues
-    let conflicts = [];
-    for (let o = 0; o < this.outgoing.length; o++) {
-      if (!this.outgoing[o])
-        continue; // skip previously removed items
-      if (item.id == this.outgoing[o].id) {
-        // Only consider it a conflict if there are actual differences
-        // otherwise, just ignore the outgoing record as well
-        if (!Utils.deepEquals(item.cleartext, this.outgoing[o].cleartext))
-          conflicts.push({in: item, out: this.outgoing[o]});
-        else
-          delete this.outgoing[o];
-
-        self.done(false);
-        return;
-      }
+    // Check whether the incoming item's ID already exists locally
+    if (this._store.itemExists(item.id)) {
+      self.done(true);
+      return;
     }
-    if (conflicts.length)
-      this._log.debug("Conflicts found.  Conflicting server changes discarded");
 
     // Check for items with different IDs which we think are the same one
-    for (let o = 0; o < this.outgoing.length; o++) {
-      if (!this.outgoing[o])
-        continue; // skip previously removed items
+    for (let id in this._tracker.changedIDs) {
+      // Generate an outgoing record, or use a cached one
+      let out = (id in this.outgoing)?
+        this.outgoing[id] : this._createRecord(id);
+
+      // Cache at most the first 100 records; beyond that they are regenerated
+      // each time - slower, but less memory-hungry
+      if ([i for (i in this.outgoing)].length <= 100)
+        this.outgoing[id] = out;
 
-      if (this._recordLike(item, this.outgoing[o])) {
-        // change refs in outgoing queue
-        yield this._changeRecordRefs.async(this, self.cb,
-                                           this.outgoing[o].id,
-                                           item.id);
-        // change actual id of item
-        this._store.changeItemID(this.outgoing[o].id,
-                                 item.id);
-        delete this.outgoing[o];
+      if (this._recordLike(item, out)) {
+        // change refs in outgoing queue, then actual id of local item
+        // XXX might it be better to just clear the outgoing queue?
+        yield this._changeRecordRefs.async(this, self.cb, id, item.id);
+        this._store.changeItemID(id, item.id);
+
+        this._tracker.removeChangedID(item.id);
+        delete this.outgoing[item.id];
+
         self.done(false);
         return;
       }
     }
     self.done(true);
   },
 
   // Apply incoming records
@@ -540,26 +498,18 @@ SyncEngine.prototype = {
     this._tracker.enable();
   },
 
   _sync: function SyncEngine__sync() {
     let self = yield;
 
     try {
       yield this._syncStartup.async(this, self.cb);
-
-      // Populate outgoing queue
-      yield this._generateOutgoing.async(this, self.cb);
-
-      // Fetch incoming records and apply them
       yield this._processIncoming.async(this, self.cb);
-
-      // Upload outgoing records
       yield this._uploadOutgoing.async(this, self.cb);
-
       yield this._syncFinish.async(this, self.cb);
     }
     catch (e) {
       this._log.warn("Sync failed");
       throw e;
     }
     finally {
       this._tracker.enable();
--- a/services/sync/modules/engines/bookmarks.js
+++ b/services/sync/modules/engines/bookmarks.js
@@ -190,17 +190,17 @@ BookmarksStore.prototype = {
       return true;
     if (placeId == this._bms.unfiledBookmarksFolder)
       return true;
     if (this._bms.getFolderIdForItem(placeId) < 0)
       return true;
     return false;
   },
 
-  _itemExists: function BStore__itemExists(id) {
+  itemExists: function BStore_itemExists(id) {
     return this._getItemIdForGUID(id) >= 0;
   },
 
   create: function BStore_create(record) {
     let newId;
     let parentId = this._getItemIdForGUID(record.parentid);
 
     if (parentId < 0) {
@@ -225,24 +225,24 @@ BookmarksStore.prototype = {
                                    this._ans.EXPIRE_NEVER);
       }
 
       if (record.cleartext.type == "microsummary") {
         this._log.debug("   \-> is a microsummary");
         this._ans.setItemAnnotation(newId, "bookmarks/staticTitle",
                                     record.cleartext.staticTitle || "", 0, this._ans.EXPIRE_NEVER);
         let genURI = Utils.makeURI(record.cleartext.generatorURI);
-	if (this._ms == SERVICE_NOT_SUPPORTED) {
-	  this._log.warn("Can't create microsummary -- not supported.");
-	} else {
+	if (this._ms) {
           try {
             let micsum = this._ms.createMicrosummary(uri, genURI);
             this._ms.setMicrosummary(newId, micsum);
           }
           catch(ex) { /* ignore "missing local generator" exceptions */ }
+	} else {
+	  this._log.warn("Can't create microsummary -- not supported.");
 	}
       }
     } break;
     case "folder":
       this._log.debug(" -> creating folder \"" + record.cleartext.title + "\"");
       newId = this._bms.createFolder(parentId,
                                      record.cleartext.title,
                                      record.sortindex);
--- a/services/sync/modules/stores.js
+++ b/services/sync/modules/stores.js
@@ -92,47 +92,32 @@ Store.prototype = {
       else if (!this.itemExists(rec.id))
         this.create(rec);
       else
         this.update(rec);
     };
     fn.async(this, onComplete, record);
   },
 
-  itemExists: function Store_itemExists(id) {
-    if (!this._itemCache)
-      return this._itemExists(id);
-
-    if (id in this._itemCache)
-      return true;
-    else
-      return false;
-  },
-
-  // subclasses probably want to override this one
-  _itemExists: function Store__itemExists(id) {
-    return false;
-  },
-
   cacheItemsHint: function Store_cacheItemsHint() {
     this._itemCache = this.wrap();
   },
 
   clearItemCacheHint: function Store_clearItemCacheHint() {
     this._itemCache = null;
   },
 
   // override these in derived objects
 
-  wrap: function Store_wrap() {
-    throw "override wrap in a subclass";
+  itemExists: function Store_itemExists(id) {
+    throw "override itemExists in a subclass";
   },
 
-  wrapItem: function Store_wrapItem() {
-    throw "override wrapItem in a subclass";
+  createRecord: function Store_createRecord() {
+    throw "override createRecord in a subclass";
   },
 
   wrapDepth: function BStore_wrapDepth(guid, items) {
     if (typeof(items) == "undefined")
       items = {};
     for (let childguid in this._itemCache) {
       if (this._itemCache[childguid].parentid == guid) {
         items[childguid] = this._itemCache[childguid].depth;