--- a/dom/mobilemessage/gonk/MobileMessageDB.jsm
+++ b/dom/mobilemessage/gonk/MobileMessageDB.jsm
@@ -60,16 +60,19 @@ const READ_ONLY = "readonly";
const READ_WRITE = "readwrite";
const PREV = "prev";
const NEXT = "next";
const COLLECT_ID_END = 0;
const COLLECT_ID_ERROR = -1;
const COLLECT_TIMESTAMP_UNUSED = 0;
+// Default value for integer preference "dom.sms.maxReadAheadEntries".
+const DEFAULT_READ_AHEAD_ENTRIES = 7;
+
XPCOMUtils.defineLazyServiceGetter(this, "gMobileMessageService",
"@mozilla.org/mobilemessage/mobilemessageservice;1",
"nsIMobileMessageService");
XPCOMUtils.defineLazyServiceGetter(this, "gMMSService",
"@mozilla.org/mms/rilmmsservice;1",
"nsIMmsService");
@@ -3138,17 +3141,17 @@ MobileMessageDB.prototype = {
if (aThreadId) {
filter.threadId = aThreadId;
}
let cursor = new GetMessagesCursor(this, aCallback);
let self = this;
self.newTxn(READ_ONLY, function(error, txn, stores) {
- let collector = cursor.collector;
+ let collector = cursor.collector.idCollector;
let collect = collector.collect.bind(collector);
FilterSearcherHelper.transact(self, txn, error, filter, aReverse, collect);
}, [MESSAGE_STORE_NAME, PARTICIPANT_STORE_NAME]);
return cursor;
},
markMessageRead: function(messageId, value, aSendReadReport, aRequest) {
@@ -3247,17 +3250,17 @@ MobileMessageDB.prototype = {
}, [MESSAGE_STORE_NAME, THREAD_STORE_NAME]);
},
createThreadCursor: function(callback) {
if (DEBUG) debug("Getting thread list");
let cursor = new GetThreadsCursor(this, callback);
this.newTxn(READ_ONLY, function(error, txn, threadStore) {
- let collector = cursor.collector;
+ let collector = cursor.collector.idCollector;
if (error) {
collector.collect(null, COLLECT_ID_ERROR, COLLECT_TIMESTAMP_UNUSED);
return;
}
txn.onerror = function(event) {
if (DEBUG) debug("Caught error on transaction ", event.target.error.name);
collector.collect(null, COLLECT_ID_ERROR, COLLECT_TIMESTAMP_UNUSED);
};
@@ -3484,21 +3487,316 @@ let FilterSearcherHelper = {
this.filterIndex("participantIds", range, direction, txn,
unionCollector.newContext());
}
}).bind(this));
}
}
};
-function ResultsCollector() {
+/**
+ * Collector class for read-ahead result objects. Mmdb may now try to fetch
+ * message/thread records before it's requested explicitly.
+ *
+ * The read ahead behavior can be controlled by an integer mozSettings entry
+ * "ril.sms.maxReadAheadEntries" as well as an integer holding preference
+ * "dom.sms.maxReadAheadEntries". The meanings are:
+ *
+ * positive: finite read-ahead entries,
+ * 0: don't read ahead unless explicitly requested, (default)
+ * negative: read ahead all IDs if possible.
+ *
+ * The order of ID filtering objects are now:
+ *
+ * [UnionResultsCollector]
+ * +-> [IntersectionResultsCollector]
+ * +-> IDsCollector
+ * +-> ResultsCollector
+ *
+ * ResultsCollector has basically similar behaviour with IDsCollector. When
+ * RC::squeeze() is called, either RC::drip() is called instantly if we have
+ * already fetched results available, or the request is kept and IC::squeeze()
+ * is called.
+ *
+ * When RC::collect is called by IC::drip, it proceeds to fetch the
+ * corresponding record given that collected ID is neither an error nor an end
+ * mark. After the message/thread record being fetched, ResultsCollector::drip
+ * is called if we have pending request. Anyway, RC::maybeSqueezeIdCollector is
+ * called to determine whether we need to call IC::squeeze again.
+ *
+ * RC::squeeze is called when nsICursorContinueCallback::handleContinue() is
+ * called. ResultsCollector::drip will call to
+ * nsIMobileMessageCursorCallback::notifyFoo.
+ *
+ * In summary, the major call paths are:
+ *
+ * RC::squeeze
+ * o-> RC::drip
+ * +-> RC::notifyCallback
+ * +-> nsIMobileMessageCursorCallback::notifyFoo
+ * +-> RC::maybeSqueezeIdCollector
+ * o-> IC::squeeze
+ * o-> IC::drip
+ * +-> RC::collect
+ * o-> RC::readAhead
+ * +-> RC::notifyResult
+ * o-> RC::drip ...
+ * +-> RC::maybeSqueezeIdCollector ...
+ * o-> RC::notifyResult ...
+ */
+function ResultsCollector(readAheadFunc) {
+ this.idCollector = new IDsCollector();
+ this.results = [];
+ this.readAhead = readAheadFunc;
+
+ this.maxReadAhead = DEFAULT_READ_AHEAD_ENTRIES;
+ try {
+ // positive: finite read-ahead entries,
+ // 0: don't read ahead unless explicitly requested,
+ // negative: read ahead all IDs if possible.
+ this.maxReadAhead =
+ Services.prefs.getIntPref("dom.sms.maxReadAheadEntries");
+ } catch (e) {}
+}
+ResultsCollector.prototype = {
+ /**
+ * Underlying ID collector object.
+ */
+ idCollector: null,
+
+ /**
+ * An array keeping fetched result objects. Replaced by a new empty array
+ * every time when |this.drip| is called.
+ */
+ results: null,
+
+ /**
+ * A function that takes (<txn>, <id>, <collector>). It fetches the object
+ * specified by <id> and notify <collector> with that by calling
+ * |<collector>.notifyResult()|. If <txn> is null, this function should
+ * create a new read-only transaction itself. The returned result object may
+ * be null to indicate an error during the fetch process.
+ */
+ readAhead: null,
+
+ /**
+ * A boolean value inidicating a readAhead call is ongoing. Set before calling
+ * |this.readAhead| and reset in |this.notifyResult|.
+ */
+ readingAhead: false,
+
+ /**
+ * A numeric value read from preference "dom.sms.maxReadAheadEntries".
+ */
+ maxReadAhead: 0,
+
+ /**
+ * An active IDBTransaction object to be reused.
+ */
+ activeTxn: null,
+
+ /**
+ * A nsIMobileMessageCursorCallback.
+ */
+ requestWaiting: null,
+
+ /**
+ * A boolean value indicating either a COLLECT_ID_END or COLLECT_ID_ERROR has
+ * been received.
+ */
+ done: false,
+
+ /**
+ * When |this.done|, it's either COLLECT_ID_END or COLLECT_ID_ERROR.
+ */
+ lastId: null,
+
+ /**
+ * Receive collected id from IDsCollector and fetch the correspond result
+ * object if necessary.
+ *
+ * @param txn
+ * An IDBTransaction object. Null if there is no active transaction in
+ * IDsCollector. That is, the ID collecting transaction is completed.
+ * @param id
+ * A positive numeric id, COLLECT_ID_END(0), or COLLECT_ID_ERROR(-1).
+ */
+ collect: function(txn, id) {
+ if (this.done) {
+ // If this callector has been terminated because of previous errors in
+ // |this.readAhead|, ignore any further IDs from IDsCollector.
+ return;
+ }
+
+ if (DEBUG) debug("ResultsCollector::collect ID = " + id);
+
+ // Reuse the active transaction cached if IDsCollector has no active
+ // transaction.
+ txn = txn || this.activeTxn;
+
+ if (id > 0) {
+ this.readingAhead = true;
+ this.readAhead(txn, id, this);
+ } else {
+ this.notifyResult(txn, id, null);
+ }
+ },
+
+ /**
+ * Callback function for |this.readAhead|.
+ *
+ * This function pushes result object to |this.results| or updates
+ * |this.done|, |this.lastId| if an end mark or an error is found. Since we
+ * have already a valid result entry, check |this.requestWaiting| and deal
+ * with it. At last, call to |this.maybeSqueezeIdCollector| to ask more id
+ * again if necessary.
+ *
+ * @param txn
+ * An IDBTransaction object. Null if caller has no active transaction.
+ * @param id
+ * A positive numeric id, COLLECT_ID_END(0), or COLLECT_ID_ERROR(-1).
+ * @param result
+ * An object associated with id. Null if |this.readAhead| failed.
+ */
+ notifyResult: function(txn, id, result) {
+ if (DEBUG) debug("notifyResult(txn, " + id + ", <result>)");
+
+ this.readingAhead = false;
+
+ if (id > 0) {
+ if (result != null) {
+ this.results.push(result);
+ } else {
+ id = COLLECT_ID_ERROR;
+ }
+ }
+
+ if (id <= 0) {
+ this.lastId = id;
+ this.done = true;
+ }
+
+ if (!this.requestWaiting) {
+ if (DEBUG) debug("notifyResult: cursor.continue() not called yet");
+ } else {
+ let callback = this.requestWaiting;
+ this.requestWaiting = null;
+
+ this.drip(callback);
+ }
+
+ this.maybeSqueezeIdCollector(txn);
+ },
+
+ /**
+ * Request for one more ID if necessary.
+ *
+ * @param txn
+ * An IDBTransaction object. Null if caller has no active transaction.
+ */
+ maybeSqueezeIdCollector: function(txn) {
+ if (this.done || // Nothing to be read.
+ this.readingAhead || // Already in progress.
+ this.idCollector.requestWaiting) { // Already requested.
+ return;
+ }
+
+ let max = this.maxReadAhead;
+ if (!max && this.requestWaiting) {
+ // If |this.requestWaiting| is set, try to read ahead at least once.
+ max = 1;
+ }
+ if (max >= 0 && this.results.length >= max) {
+ // More-equal than <max> entries has been read. Stop.
+ if (DEBUG) debug("maybeSqueezeIdCollector: max " + max + " entries read. Stop.");
+ return;
+ }
+
+ // A hack to pass current txn to |this.collect| when it's called directly by
+ // |IDsCollector.squeeze|.
+ this.activeTxn = txn;
+ this.idCollector.squeeze(this.collect.bind(this));
+ this.activeTxn = null;
+ },
+
+ /**
+ * Request to pass available results or wait.
+ *
+ * @param callback
+ * A nsIMobileMessageCursorCallback.
+ */
+ squeeze: function(callback) {
+ if (this.requestWaiting) {
+ throw new Error("Already waiting for another request!");
+ }
+
+ if (this.results.length || this.done) {
+ // If |this.results.length| is non-zero, we have already some results to
+ // pass. Otherwise, if |this.done| evaluates to true, we have also a
+ // confirmed result to pass.
+ this.drip(callback);
+ } else {
+ this.requestWaiting = callback;
+ }
+
+ // If we called |this.drip| in the last step, the fetched results have been
+ // consumed and we should ask some more for read-ahead now.
+ //
+ // Otherwise, kick start read-ahead again because it may be stopped
+ // previously because of |this.maxReadAhead| had been reached.
+ this.maybeSqueezeIdCollector(null);
+ },
+
+ /**
+ * Consume fetched resutls.
+ *
+ * @param callback
+ * A nsIMobileMessageCursorCallback.
+ */
+ drip: function(callback) {
+ let results = this.results;
+ this.results = [];
+
+ let func = this.notifyCallback.bind(this, callback, results, this.lastId);
+ Services.tm.currentThread.dispatch(func, Ci.nsIThread.DISPATCH_NORMAL);
+ },
+
+ /**
+ * Notify a nsIMobileMessageCursorCallback.
+ *
+ * @param callback
+ * A nsIMobileMessageCursorCallback.
+ * @param results
+ * An array of result objects.
+ * @param lastId
+ * Since we only call |this.drip| when either there are results
+ * available or the read-ahead has done, so lastId here will be
+ * COLLECT_ID_END or COLLECT_ID_ERROR when results is empty and null
+ * otherwise.
+ */
+ notifyCallback: function(callback, results, lastId) {
+ if (DEBUG) {
+ debug("notifyCallback(results[" + results.length + "], " + lastId + ")");
+ }
+
+ if (results.length) {
+ callback.notifyCursorResult(results, results.length);
+ } else if (lastId == COLLECT_ID_END) {
+ callback.notifyCursorDone();
+ } else {
+ callback.notifyCursorError(Ci.nsIMobileMessageCallback.INTERNAL_ERROR);
+ }
+ }
+};
+
+function IDsCollector() {
this.results = [];
this.done = false;
}
-ResultsCollector.prototype = {
+IDsCollector.prototype = {
results: null,
requestWaiting: null,
done: null,
/**
* Queue up passed id, reply if necessary.
*
* @param txn
@@ -3512,30 +3810,26 @@ ResultsCollector.prototype = {
*
* @return true if expects more. false otherwise.
*/
collect: function(txn, id, timestamp) {
if (this.done) {
return false;
}
- if (DEBUG) {
- debug("collect: message ID = " + id);
- }
- if (id) {
- // Queue up any id but '0' and replies later accordingly.
- this.results.push(id);
- }
+ if (DEBUG) debug("IDsCollector::collect ID = " + id);
+ // Queue up any id.
+ this.results.push(id);
if (id <= 0) {
// No more processing on '0' or negative values passed.
this.done = true;
}
if (!this.requestWaiting) {
- if (DEBUG) debug("Cursor.continue() not called yet");
+ if (DEBUG) debug("IDsCollector::squeeze() not called yet");
return !this.done;
}
// We assume there is only one request waiting throughout the message list
// retrieving process. So we don't bother continuing to process further
// waiting requests here. This assumption comes from DOMCursor::Continue()
// implementation.
let callback = this.requestWaiting;
@@ -3570,32 +3864,21 @@ ResultsCollector.prototype = {
/**
* @param txn
* Ongoing IDBTransaction context object or null.
* @param callback
* A callback function that accepts a numeric id.
*/
drip: function(txn, callback) {
- if (!this.results.length) {
- if (DEBUG) debug("No messages matching the filter criteria");
- callback(txn, COLLECT_ID_END);
- return;
+ let firstId = this.results[0];
+ if (firstId > 0) {
+ this.results.shift();
}
-
- if (this.results[0] < 0) {
- // An previous error found. Keep the answer in results so that we can
- // reply INTERNAL_ERROR for further requests.
- if (DEBUG) debug("An previous error found");
- callback(txn, COLLECT_ID_ERROR);
- return;
- }
-
- let firstMessageId = this.results.shift();
- callback(txn, firstMessageId);
+ callback(txn, firstId);
}
};
function IntersectionResultsCollector(collect, reverse) {
this.cascadedCollect = collect;
this.reverse = reverse;
this.contexts = [];
}
@@ -3779,165 +4062,153 @@ UnionResultsCollector.prototype = {
this.contexts[1].processing++;
return this.collect.bind(this, 1);
}
};
function GetMessagesCursor(mmdb, callback) {
this.mmdb = mmdb;
this.callback = callback;
- this.collector = new ResultsCollector();
+ this.collector = new ResultsCollector(this.getMessage.bind(this));
this.handleContinue(); // Trigger first run.
}
GetMessagesCursor.prototype = {
classID: RIL_GETMESSAGESCURSOR_CID,
QueryInterface: XPCOMUtils.generateQI([Ci.nsICursorContinueCallback]),
mmdb: null,
callback: null,
collector: null,
- getMessageTxn: function(messageStore, messageId) {
+ getMessageTxn: function(txn, messageStore, messageId, collector) {
if (DEBUG) debug ("Fetching message " + messageId);
let getRequest = messageStore.get(messageId);
let self = this;
getRequest.onsuccess = function(event) {
if (DEBUG) {
debug("notifyNextMessageInListGot - messageId: " + messageId);
}
let domMessage =
self.mmdb.createDomMessageFromRecord(event.target.result);
- self.callback.notifyCursorResult([domMessage], 1);
+ collector.notifyResult(txn, messageId, domMessage);
};
getRequest.onerror = function(event) {
+ // Error reporting is done in ResultsCollector.notifyCallback.
+ event.stopPropagation();
+ event.preventDefault();
+
if (DEBUG) {
debug("notifyCursorError - messageId: " + messageId);
}
- self.callback.notifyCursorError(Ci.nsIMobileMessageCallback.INTERNAL_ERROR);
+ collector.notifyResult(txn, messageId, null);
};
},
- notify: function(txn, messageId) {
- if (!messageId) {
- this.callback.notifyCursorDone();
- return;
- }
-
- if (messageId < 0) {
- this.callback.notifyCursorError(Ci.nsIMobileMessageCallback.INTERNAL_ERROR);
- return;
- }
-
+ getMessage: function(txn, messageId, collector) {
// When filter transaction is not yet completed, we're called with current
// ongoing transaction object.
if (txn) {
let messageStore = txn.objectStore(MESSAGE_STORE_NAME);
- this.getMessageTxn(messageStore, messageId);
+ this.getMessageTxn(txn, messageStore, messageId, collector);
return;
}
// Or, we have to open another transaction ourselves.
let self = this;
this.mmdb.newTxn(READ_ONLY, function(error, txn, messageStore) {
if (error) {
- self.callback.notifyCursorError(Ci.nsIMobileMessageCallback.INTERNAL_ERROR);
- return;
+ debug("getMessage: failed to create new transaction");
+ collector.notifyResult(null, messageId, null);
+ } else {
+ self.getMessageTxn(txn, messageStore, messageId, collector);
}
- self.getMessageTxn(messageStore, messageId);
}, [MESSAGE_STORE_NAME]);
},
// nsICursorContinueCallback
handleContinue: function() {
if (DEBUG) debug("Getting next message in list");
- this.collector.squeeze(this.notify.bind(this));
+ this.collector.squeeze(this.callback);
}
};
function GetThreadsCursor(mmdb, callback) {
this.mmdb = mmdb;
this.callback = callback;
- this.collector = new ResultsCollector();
+ this.collector = new ResultsCollector(this.getThread.bind(this));
this.handleContinue(); // Trigger first run.
}
GetThreadsCursor.prototype = {
classID: RIL_GETTHREADSCURSOR_CID,
QueryInterface: XPCOMUtils.generateQI([Ci.nsICursorContinueCallback]),
mmdb: null,
callback: null,
collector: null,
- getThreadTxn: function(threadStore, threadId) {
+ getThreadTxn: function(txn, threadStore, threadId, collector) {
if (DEBUG) debug ("Fetching thread " + threadId);
let getRequest = threadStore.get(threadId);
- let self = this;
getRequest.onsuccess = function(event) {
let threadRecord = event.target.result;
if (DEBUG) {
debug("notifyCursorResult: " + JSON.stringify(threadRecord));
}
let thread =
gMobileMessageService.createThread(threadRecord.id,
threadRecord.participantAddresses,
threadRecord.lastTimestamp,
threadRecord.lastMessageSubject || "",
threadRecord.body,
threadRecord.unreadCount,
threadRecord.lastMessageType);
- self.callback.notifyCursorResult([thread], 1);
+ collector.notifyResult(txn, threadId, thread);
};
getRequest.onerror = function(event) {
+ // Error reporting is done in ResultsCollector.notifyCallback.
+ event.stopPropagation();
+ event.preventDefault();
+
if (DEBUG) {
debug("notifyCursorError - threadId: " + threadId);
}
- self.callback.notifyCursorError(Ci.nsIMobileMessageCallback.INTERNAL_ERROR);
+ collector.notifyResult(txn, threadId, null);
};
},
- notify: function(txn, threadId) {
- if (!threadId) {
- this.callback.notifyCursorDone();
- return;
- }
-
- if (threadId < 0) {
- this.callback.notifyCursorError(Ci.nsIMobileMessageCallback.INTERNAL_ERROR);
- return;
- }
-
+ getThread: function(txn, threadId, collector) {
// When filter transaction is not yet completed, we're called with current
// ongoing transaction object.
if (txn) {
let threadStore = txn.objectStore(THREAD_STORE_NAME);
- this.getThreadTxn(threadStore, threadId);
+ this.getThreadTxn(txn, threadStore, threadId, collector);
return;
}
// Or, we have to open another transaction ourselves.
let self = this;
this.mmdb.newTxn(READ_ONLY, function(error, txn, threadStore) {
if (error) {
- self.callback.notifyCursorError(Ci.nsIMobileMessageCallback.INTERNAL_ERROR);
- return;
+ collector.notifyResult(null, threadId, null);
+ } else {
+ self.getThreadTxn(txn, threadStore, threadId, collector);
}
- self.getThreadTxn(threadStore, threadId);
}, [THREAD_STORE_NAME]);
},
// nsICursorContinueCallback
handleContinue: function() {
if (DEBUG) debug("Getting next thread in list");
- this.collector.squeeze(this.notify.bind(this));
+ this.collector.squeeze(this.callback);
}
}
this.EXPORTED_SYMBOLS = [
'MobileMessageDB'
];
function debug() {