make engine keep less records in memory by limiting the outgoing queue to a maximum of 100 records, and fetch the rest from the store each time
make engine keep less records in memory by limiting the outgoing queue to a maximum of 100 records, and fetch the rest from the store each time
--- a/services/sync/modules/engines.js
+++ b/services/sync/modules/engines.js
@@ -170,28 +170,16 @@ Engine.prototype = {
this._log.level = Log4Moz.Level[level];
this._osPrefix = "weave:" + this.name + ":";
this._tracker; // initialize tracker to load previously changed IDs
this._log.debug("Engine initialized");
},
- _serializeCommands: function Engine__serializeCommands(commands) {
- let json = this._json.encode(commands);
- //json = json.replace(/ {action/g, "\n {action");
- return json;
- },
-
- _serializeConflicts: function Engine__serializeConflicts(conflicts) {
- let json = this._json.encode(conflicts);
- //json = json.replace(/ {action/g, "\n {action");
- return json;
- },
-
_resetServer: function Engine__resetServer() {
let self = yield;
throw "_resetServer needs to be subclassed";
},
_resetClient: function Engine__resetClient() {
let self = yield;
this._log.debug("Resetting client state");
@@ -267,28 +255,19 @@ SyncEngine.prototype = {
} catch (e) {
return 0;
}
},
set lastSync(value) {
Utils.prefs.setCharPref(this.name + ".lastSync", value);
},
- // XXX these two should perhaps just be a variable inside sync(), but we have
- // one or two other methods that use it
-
- get incoming() {
- if (!this._incoming)
- this._incoming = [];
- return this._incoming;
- },
-
get outgoing() {
if (!this._outgoing)
- this._outgoing = [];
+ this._outgoing = {};
return this._outgoing;
},
// Create a new record by querying the store, and add the engine metadata
_createRecord: function SyncEngine__createRecord(id) {
let record = this._store.createRecord(id);
record.uri = this.engineURL + id;
record.encryption = this.cryptoMetaURL;
@@ -329,24 +308,16 @@ SyncEngine.prototype = {
getService(Ci.IWeaveCrypto);
let symkey = cryptoSvc.generateRandomKey();
let pubkey = yield PubKeys.getDefaultKey(self.cb);
meta = new CryptoMeta(this.cryptoMetaURL);
meta.generateIV();
yield meta.addUnwrappedKey(self.cb, pubkey, symkey);
yield meta.put(self.cb);
}
- this._tracker.disable();
- },
-
- // Generate outgoing records
- _generateOutgoing: function SyncEngine__generateOutgoing() {
- let self = yield;
-
- this._log.debug("Calculating client changes");
// first sync special case: upload all items
// note that we use a backdoor (of sorts) to the tracker and it
// won't save to disk this list
if (!this.lastSync) {
this._log.info("First sync, uploading all items");
// remove any old ones first
@@ -354,31 +325,17 @@ SyncEngine.prototype = {
// now add all current ones
let all = this._store.getAllIDs();
for (let id in all) {
this._tracker.changedIDs[id] = true;
}
}
- // generate queue from changed items list
-
- // XXX should have a heuristic like this, but then we need to be able to
- // serialize each item by itself, something our stores can't currently do
- //if (this._tracker.changedIDs.length >= 30)
- //this._store.cacheItemsHint();
-
- // NOTE we want changed items -> outgoing -> server to be as atomic as
- // possible, so we clear the changed IDs after we upload the changed records
- // NOTE2 don't encrypt, we'll do that before uploading instead
- for (let id in this._tracker.changedIDs) {
- this.outgoing.push(this._createRecord(id));
- }
-
- //this._store.clearItemCacheHint();
+ this._tracker.disable(); // FIXME: need finer-grained ignoring
},
// Generate outgoing records
_processIncoming: function SyncEngine__processIncoming() {
let self = yield;
this._log.debug("Downloading & applying server changes");
@@ -398,26 +355,24 @@ SyncEngine.prototype = {
if (mem.isLowMemory()) {
this._log.warn("Low memory, aborting sync!");
throw "Low memory";
}
}
yield item.decrypt(self.cb, ID.get('WeaveCryptoID').password);
if (yield this._reconcile.async(this, self.cb, item))
yield this._applyIncoming.async(this, self.cb, item);
- else
- this._log.debug("Skipping reconciled incoming item");
+ else {
+ this._log.debug("Skipping reconciled incoming item " + item.id);
+ if (this._lastSyncTmp < item.modified)
+ this._lastSyncTmp = item.modified;
+ }
}
- if (typeof(this._lastSyncTmp) == "string")
- this._lastSyncTmp = parseInt(this._lastSyncTmp);
if (this.lastSync < this._lastSyncTmp)
this.lastSync = this._lastSyncTmp;
-
- // removes any holes caused by reconciliation above:
- this._outgoing = this.outgoing.filter(function(n) n);
},
// Reconciliation has two steps:
// 1) Check for the same item (same ID) on both the incoming and outgoing
// queues. This means the same item was modified on this profile and another
// at the same time. In this case, this client wins (which really means, the
// last profile you sync wins).
// 2) Check if any incoming & outgoing items are actually the same, even
@@ -426,52 +381,55 @@ SyncEngine.prototype = {
// is synced for the first time after having (manually or otherwise) imported
// bookmarks imported, every bookmark will match this condition.
// When two items with different IDs are "the same" we change the local ID to
// match the remote one.
_reconcile: function SyncEngine__reconcile(item) {
let self = yield;
let ret = true;
- this._log.debug("Reconciling incoming item");
+ // Check for the same item (same ID) on both incoming & outgoing queues
+ if (item.id in this._tracker.changedIDs) {
+ // Check to see if client and server were changed in the same way
+ let out = this._createRecord(item.id);
+ if (Utils.deepEquals(item.cleartext, out.cleartext)) {
+ this._tracker.removeChangedID(item.id);
+ delete this.outgoing[item.id];
+ } else {
+ this._log.debug("Discarding server change due to conflict with local change");
+ }
+ self.done(false);
+ return;
+ }
- // Check for the same item (same ID) on both incoming & outgoing queues
- let conflicts = [];
- for (let o = 0; o < this.outgoing.length; o++) {
- if (!this.outgoing[o])
- continue; // skip previously removed items
- if (item.id == this.outgoing[o].id) {
- // Only consider it a conflict if there are actual differences
- // otherwise, just ignore the outgoing record as well
- if (!Utils.deepEquals(item.cleartext, this.outgoing[o].cleartext))
- conflicts.push({in: item, out: this.outgoing[o]});
- else
- delete this.outgoing[o];
-
- self.done(false);
- return;
- }
+ // Check for the incoming item's ID otherwise existing locally
+ if (this._store.itemExists(item.id)) {
+ self.done(true);
+ return;
}
- if (conflicts.length)
- this._log.debug("Conflicts found. Conflicting server changes discarded");
// Check for items with different IDs which we think are the same one
- for (let o = 0; o < this.outgoing.length; o++) {
- if (!this.outgoing[o])
- continue; // skip previously removed items
+ for (let id in this._tracker.changedIDs) {
+ // Generate outgoing record or used a cached one
+ let out = (id in this.outgoing)?
+ this.outgoing[id] : this._createRecord(id);
+
+ // cache the first 100, after that we will throw them away - slower but less memory hungry
+ if ([i for (i in this.outgoing)].length <= 100)
+ this.outgoing[id] = out;
- if (this._recordLike(item, this.outgoing[o])) {
- // change refs in outgoing queue
- yield this._changeRecordRefs.async(this, self.cb,
- this.outgoing[o].id,
- item.id);
- // change actual id of item
- this._store.changeItemID(this.outgoing[o].id,
- item.id);
- delete this.outgoing[o];
+ if (this._recordLike(item, out)) {
+ // change refs in outgoing queue, then actual id of local item
+ // XXX might it be better to just clear the outgoing queue?
+ yield this._changeRecordRefs.async(this, self.cb, id, item.id);
+ this._store.changeItemID(id, item.id);
+
+ this._tracker.removeChangedID(item.id);
+ delete this.outgoing[item.id];
+
self.done(false);
return;
}
}
self.done(true);
},
// Apply incoming records
@@ -540,26 +498,18 @@ SyncEngine.prototype = {
this._tracker.enable();
},
_sync: function SyncEngine__sync() {
let self = yield;
try {
yield this._syncStartup.async(this, self.cb);
-
- // Populate outgoing queue
- yield this._generateOutgoing.async(this, self.cb);
-
- // Fetch incoming records and apply them
yield this._processIncoming.async(this, self.cb);
-
- // Upload outgoing records
yield this._uploadOutgoing.async(this, self.cb);
-
yield this._syncFinish.async(this, self.cb);
}
catch (e) {
this._log.warn("Sync failed");
throw e;
}
finally {
this._tracker.enable();
--- a/services/sync/modules/engines/bookmarks.js
+++ b/services/sync/modules/engines/bookmarks.js
@@ -190,17 +190,17 @@ BookmarksStore.prototype = {
return true;
if (placeId == this._bms.unfiledBookmarksFolder)
return true;
if (this._bms.getFolderIdForItem(placeId) < 0)
return true;
return false;
},
- _itemExists: function BStore__itemExists(id) {
+ itemExists: function BStore_itemExists(id) {
return this._getItemIdForGUID(id) >= 0;
},
create: function BStore_create(record) {
let newId;
let parentId = this._getItemIdForGUID(record.parentid);
if (parentId < 0) {
@@ -225,24 +225,24 @@ BookmarksStore.prototype = {
this._ans.EXPIRE_NEVER);
}
if (record.cleartext.type == "microsummary") {
this._log.debug(" \-> is a microsummary");
this._ans.setItemAnnotation(newId, "bookmarks/staticTitle",
record.cleartext.staticTitle || "", 0, this._ans.EXPIRE_NEVER);
let genURI = Utils.makeURI(record.cleartext.generatorURI);
- if (this._ms == SERVICE_NOT_SUPPORTED) {
- this._log.warn("Can't create microsummary -- not supported.");
- } else {
+ if (this._ms) {
try {
let micsum = this._ms.createMicrosummary(uri, genURI);
this._ms.setMicrosummary(newId, micsum);
}
catch(ex) { /* ignore "missing local generator" exceptions */ }
+ } else {
+ this._log.warn("Can't create microsummary -- not supported.");
}
}
} break;
case "folder":
this._log.debug(" -> creating folder \"" + record.cleartext.title + "\"");
newId = this._bms.createFolder(parentId,
record.cleartext.title,
record.sortindex);
--- a/services/sync/modules/stores.js
+++ b/services/sync/modules/stores.js
@@ -92,47 +92,32 @@ Store.prototype = {
else if (!this.itemExists(rec.id))
this.create(rec);
else
this.update(rec);
};
fn.async(this, onComplete, record);
},
- itemExists: function Store_itemExists(id) {
- if (!this._itemCache)
- return this._itemExists(id);
-
- if (id in this._itemCache)
- return true;
- else
- return false;
- },
-
- // subclasses probably want to override this one
- _itemExists: function Store__itemExists(id) {
- return false;
- },
-
cacheItemsHint: function Store_cacheItemsHint() {
this._itemCache = this.wrap();
},
clearItemCacheHint: function Store_clearItemCacheHint() {
this._itemCache = null;
},
// override these in derived objects
- wrap: function Store_wrap() {
- throw "override wrap in a subclass";
+ itemExists: function Store_itemExists(id) {
+ throw "override itemExists in a subclass";
},
- wrapItem: function Store_wrapItem() {
- throw "override wrapItem in a subclass";
+ createRecord: function Store_createRecord() {
+ throw "override createRecord in a subclass";
},
wrapDepth: function BStore_wrapDepth(guid, items) {
if (typeof(items) == "undefined")
items = {};
for (let childguid in this._itemCache) {
if (this._itemCache[childguid].parentid == guid) {
items[childguid] = this._itemCache[childguid].depth;