Bug 1057915 - Part 2: Gonk MobileMessage DB read ahead. r=bevistzeng, a=bajaj
authorVicamo Yang <vyang@mozilla.com>
Wed, 24 Sep 2014 20:17:00 -0400
changeset 225194 8d66c0771ce6890114350780d8fe89acda6efeaa
parent 225193 465cb43ed4476445ffe7f57293ef3982e029d890
child 225195 294e6eb3cd56cf2f867edb5188eb4435de5338c2
push id3979
push userraliiev@mozilla.com
push dateMon, 13 Oct 2014 16:35:44 +0000
treeherdermozilla-beta@30f2cc610691 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersbevistzeng, bajaj
bugs1057915
milestone34.0a2
Bug 1057915 - Part 2: Gonk MobileMessage DB read ahead. r=bevistzeng, a=bajaj
b2g/chrome/content/settings.js
dom/mobilemessage/gonk/MobileMessageDB.jsm
modules/libpref/init/all.js
--- a/b2g/chrome/content/settings.js
+++ b/b2g/chrome/content/settings.js
@@ -490,16 +490,20 @@ let settingsToObserve = {
   'ril.sms.requestStatusReport.enabled': {
     prefName: 'dom.sms.requestStatusReport',
     defaultValue: false
   },
   'ril.sms.strict7BitEncoding.enabled': {
     prefName: 'dom.sms.strict7BitEncoding',
     defaultValue: false
   },
+  'ril.sms.maxReadAheadEntries': {
+    prefName: 'dom.sms.maxReadAheadEntries',
+    defaultValue: 7
+  },
   'ui.touch.radius.leftmm': {
     resetToPref: true
   },
   'ui.touch.radius.topmm': {
     resetToPref: true
   },
   'ui.touch.radius.rightmm': {
     resetToPref: true
--- a/dom/mobilemessage/gonk/MobileMessageDB.jsm
+++ b/dom/mobilemessage/gonk/MobileMessageDB.jsm
@@ -60,16 +60,19 @@ const READ_ONLY = "readonly";
 const READ_WRITE = "readwrite";
 const PREV = "prev";
 const NEXT = "next";
 
 const COLLECT_ID_END = 0;
 const COLLECT_ID_ERROR = -1;
 const COLLECT_TIMESTAMP_UNUSED = 0;
 
+// Default value for integer preference "dom.sms.maxReadAheadEntries".
+const DEFAULT_READ_AHEAD_ENTRIES = 7;
+
 XPCOMUtils.defineLazyServiceGetter(this, "gMobileMessageService",
                                    "@mozilla.org/mobilemessage/mobilemessageservice;1",
                                    "nsIMobileMessageService");
 
 XPCOMUtils.defineLazyServiceGetter(this, "gMMSService",
                                    "@mozilla.org/mms/rilmmsservice;1",
                                    "nsIMmsService");
 
@@ -3138,17 +3141,17 @@ MobileMessageDB.prototype = {
     if (aThreadId) {
       filter.threadId = aThreadId;
     }
 
     let cursor = new GetMessagesCursor(this, aCallback);
 
     let self = this;
     self.newTxn(READ_ONLY, function(error, txn, stores) {
-      let collector = cursor.collector;
+      let collector = cursor.collector.idCollector;
       let collect = collector.collect.bind(collector);
       FilterSearcherHelper.transact(self, txn, error, filter, aReverse, collect);
     }, [MESSAGE_STORE_NAME, PARTICIPANT_STORE_NAME]);
 
     return cursor;
   },
 
   markMessageRead: function(messageId, value, aSendReadReport, aRequest) {
@@ -3247,17 +3250,17 @@ MobileMessageDB.prototype = {
     }, [MESSAGE_STORE_NAME, THREAD_STORE_NAME]);
   },
 
   createThreadCursor: function(callback) {
     if (DEBUG) debug("Getting thread list");
 
     let cursor = new GetThreadsCursor(this, callback);
     this.newTxn(READ_ONLY, function(error, txn, threadStore) {
-      let collector = cursor.collector;
+      let collector = cursor.collector.idCollector;
       if (error) {
         collector.collect(null, COLLECT_ID_ERROR, COLLECT_TIMESTAMP_UNUSED);
         return;
       }
       txn.onerror = function(event) {
         if (DEBUG) debug("Caught error on transaction ", event.target.error.name);
         collector.collect(null, COLLECT_ID_ERROR, COLLECT_TIMESTAMP_UNUSED);
       };
@@ -3484,21 +3487,316 @@ let FilterSearcherHelper = {
           this.filterIndex("participantIds", range, direction, txn,
                            unionCollector.newContext());
         }
       }).bind(this));
     }
   }
 };
 
-function ResultsCollector() {
+/**
+ * Collector class for read-ahead result objects. Mmdb may now try to fetch
+ * message/thread records before it's requested explicitly.
+ *
+ * The read ahead behavior can be controlled by an integer mozSettings entry
+ * "ril.sms.maxReadAheadEntries" as well as an integer holding preference
+ * "dom.sms.maxReadAheadEntries". The meanings are:
+ *
+ *   positive: finite read-ahead entries,
+ *   0: don't read ahead unless explicitly requested, (default)
+ *   negative: read ahead all IDs if possible.
+ *
+ * The order of ID filtering objects are now:
+ *
+ *   [UnionResultsCollector]
+ *   +-> [IntersectionResultsCollector]
+ *       +-> IDsCollector
+ *           +-> ResultsCollector
+ *
+ * ResultsCollector has basically similar behaviour with IDsCollector. When
+ * RC::squeeze() is called, either RC::drip() is called instantly if we have
+ * already fetched results available, or the request is kept and IC::squeeze()
+ * is called.
+ *
+ * When RC::collect is called by IC::drip, it proceeds to fetch the
+ * corresponding record given that collected ID is neither an error nor an end
+ * mark. After the message/thread record being fetched, ResultsCollector::drip
+ * is called if we have pending request. Anyway, RC::maybeSqueezeIdCollector is
+ * called to determine whether we need to call IC::squeeze again.
+ *
+ * RC::squeeze is called when nsICursorContinueCallback::handleContinue() is
+ * called. ResultsCollector::drip will call to
+ * nsIMobileMessageCursorCallback::notifyFoo.
+ *
+ * In summary, the major call paths are:
+ *
+ *   RC::squeeze
+ *   o-> RC::drip
+ *       +-> RC::notifyCallback
+ *           +-> nsIMobileMessageCursorCallback::notifyFoo
+ *   +-> RC::maybeSqueezeIdCollector
+ *       o-> IC::squeeze
+ *           o-> IC::drip
+ *               +-> RC::collect
+ *                   o-> RC::readAhead
+ *                       +-> RC::notifyResult
+ *                           o-> RC::drip ...
+ *                           +-> RC::maybeSqueezeIdCollector ...
+ *                   o-> RC::notifyResult ...
+ */
+function ResultsCollector(readAheadFunc) {
+  this.idCollector = new IDsCollector();
+  this.results = [];
+  this.readAhead = readAheadFunc;
+
+  this.maxReadAhead = DEFAULT_READ_AHEAD_ENTRIES;
+  try {
+    // positive: finite read-ahead entries,
+    // 0: don't read ahead unless explicitly requested,
+    // negative: read ahead all IDs if possible.
+    this.maxReadAhead =
+      Services.prefs.getIntPref("dom.sms.maxReadAheadEntries");
+  } catch (e) {}
+}
+ResultsCollector.prototype = {
+  /**
+   * Underlying ID collector object.
+   */
+  idCollector: null,
+
+  /**
+   * An array keeping fetched result objects. Replaced by a new empty array
+   * every time when |this.drip| is called.
+   */
+  results: null,
+
+  /**
+   * A function that takes (<txn>, <id>, <collector>). It fetches the object
+   * specified by <id> and notify <collector> with that by calling
+   * |<collector>.notifyResult()|. If <txn> is null, this function should
+   * create a new read-only transaction itself. The returned result object may
+   * be null to indicate an error during the fetch process.
+   */
+  readAhead: null,
+
+  /**
+   * A boolean value inidicating a readAhead call is ongoing. Set before calling
+   * |this.readAhead| and reset in |this.notifyResult|.
+   */
+  readingAhead: false,
+
+  /**
+   * A numeric value read from preference "dom.sms.maxReadAheadEntries".
+   */
+  maxReadAhead: 0,
+
+  /**
+   * An active IDBTransaction object to be reused.
+   */
+  activeTxn: null,
+
+  /**
+   * A nsIMobileMessageCursorCallback.
+   */
+  requestWaiting: null,
+
+  /**
+   * A boolean value indicating either a COLLECT_ID_END or COLLECT_ID_ERROR has
+   * been received.
+   */
+  done: false,
+
+  /**
+   * When |this.done|, it's either COLLECT_ID_END or COLLECT_ID_ERROR.
+   */
+  lastId: null,
+
+  /**
+   * Receive collected id from IDsCollector and fetch the correspond result
+   * object if necessary.
+   *
+   * @param txn
+   *        An IDBTransaction object. Null if there is no active transaction in
+   *        IDsCollector. That is, the ID collecting transaction is completed.
+   * @param id
+   *        A positive numeric id, COLLECT_ID_END(0), or COLLECT_ID_ERROR(-1).
+   */
+  collect: function(txn, id) {
+    if (this.done) {
+      // If this callector has been terminated because of previous errors in
+      // |this.readAhead|, ignore any further IDs from IDsCollector.
+      return;
+    }
+
+    if (DEBUG) debug("ResultsCollector::collect ID = " + id);
+
+    // Reuse the active transaction cached if IDsCollector has no active
+    // transaction.
+    txn = txn || this.activeTxn;
+
+    if (id > 0) {
+      this.readingAhead = true;
+      this.readAhead(txn, id, this);
+    } else {
+      this.notifyResult(txn, id, null);
+    }
+  },
+
+  /**
+   * Callback function for |this.readAhead|.
+   *
+   * This function pushes result object to |this.results| or updates
+   * |this.done|, |this.lastId| if an end mark or an error is found. Since we
+   * have already a valid result entry, check |this.requestWaiting| and deal
+   * with it. At last, call to |this.maybeSqueezeIdCollector| to ask more id
+   * again if necessary.
+   *
+   * @param txn
+   *        An IDBTransaction object. Null if caller has no active transaction.
+   * @param id
+   *        A positive numeric id, COLLECT_ID_END(0), or COLLECT_ID_ERROR(-1).
+   * @param result
+   *        An object associated with id. Null if |this.readAhead| failed.
+   */
+  notifyResult: function(txn, id, result) {
+    if (DEBUG) debug("notifyResult(txn, " + id + ", <result>)");
+
+    this.readingAhead = false;
+
+    if (id > 0) {
+      if (result != null) {
+        this.results.push(result);
+      } else {
+        id = COLLECT_ID_ERROR;
+      }
+    }
+
+    if (id <= 0) {
+      this.lastId = id;
+      this.done = true;
+    }
+
+    if (!this.requestWaiting) {
+      if (DEBUG) debug("notifyResult: cursor.continue() not called yet");
+    } else {
+      let callback = this.requestWaiting;
+      this.requestWaiting = null;
+
+      this.drip(callback);
+    }
+
+    this.maybeSqueezeIdCollector(txn);
+  },
+
+  /**
+   * Request for one more ID if necessary.
+   *
+   * @param txn
+   *        An IDBTransaction object. Null if caller has no active transaction.
+   */
+  maybeSqueezeIdCollector: function(txn) {
+    if (this.done || // Nothing to be read.
+        this.readingAhead || // Already in progress.
+        this.idCollector.requestWaiting) { // Already requested.
+      return;
+    }
+
+    let max = this.maxReadAhead;
+    if (!max && this.requestWaiting) {
+      // If |this.requestWaiting| is set, try to read ahead at least once.
+      max = 1;
+    }
+    if (max >= 0 && this.results.length >= max) {
+      // More-equal than <max> entries has been read. Stop.
+      if (DEBUG) debug("maybeSqueezeIdCollector: max " + max + " entries read. Stop.");
+      return;
+    }
+
+    // A hack to pass current txn to |this.collect| when it's called directly by
+    // |IDsCollector.squeeze|.
+    this.activeTxn = txn;
+    this.idCollector.squeeze(this.collect.bind(this));
+    this.activeTxn = null;
+  },
+
+  /**
+   * Request to pass available results or wait.
+   *
+   * @param callback
+   *        A nsIMobileMessageCursorCallback.
+   */
+  squeeze: function(callback) {
+    if (this.requestWaiting) {
+      throw new Error("Already waiting for another request!");
+    }
+
+    if (this.results.length || this.done) {
+      // If |this.results.length| is non-zero, we have already some results to
+      // pass. Otherwise, if |this.done| evaluates to true, we have also a
+      // confirmed result to pass.
+      this.drip(callback);
+    } else {
+      this.requestWaiting = callback;
+    }
+
+    // If we called |this.drip| in the last step, the fetched results have been
+    // consumed and we should ask some more for read-ahead now.
+    //
+    // Otherwise, kick start read-ahead again because it may be stopped
+    // previously because of |this.maxReadAhead| had been reached.
+    this.maybeSqueezeIdCollector(null);
+  },
+
+  /**
+   * Consume fetched resutls.
+   *
+   * @param callback
+   *        A nsIMobileMessageCursorCallback.
+   */
+  drip: function(callback) {
+    let results = this.results;
+    this.results = [];
+
+    let func = this.notifyCallback.bind(this, callback, results, this.lastId);
+    Services.tm.currentThread.dispatch(func, Ci.nsIThread.DISPATCH_NORMAL);
+  },
+
+  /**
+   * Notify a nsIMobileMessageCursorCallback.
+   *
+   * @param callback
+   *        A nsIMobileMessageCursorCallback.
+   * @param results
+   *        An array of result objects.
+   * @param lastId
+   *        Since we only call |this.drip| when either there are results
+   *        available or the read-ahead has done, so lastId here will be
+   *        COLLECT_ID_END or COLLECT_ID_ERROR when results is empty and null
+   *        otherwise.
+   */
+  notifyCallback: function(callback, results, lastId) {
+    if (DEBUG) {
+      debug("notifyCallback(results[" + results.length + "], " + lastId + ")");
+    }
+
+    if (results.length) {
+      callback.notifyCursorResult(results, results.length);
+    } else if (lastId == COLLECT_ID_END) {
+      callback.notifyCursorDone();
+    } else {
+      callback.notifyCursorError(Ci.nsIMobileMessageCallback.INTERNAL_ERROR);
+    }
+  }
+};
+
+function IDsCollector() {
   this.results = [];
   this.done = false;
 }
-ResultsCollector.prototype = {
+IDsCollector.prototype = {
   results: null,
   requestWaiting: null,
   done: null,
 
   /**
    * Queue up passed id, reply if necessary.
    *
    * @param txn
@@ -3512,30 +3810,26 @@ ResultsCollector.prototype = {
    *
    * @return true if expects more. false otherwise.
    */
   collect: function(txn, id, timestamp) {
     if (this.done) {
       return false;
     }
 
-    if (DEBUG) {
-      debug("collect: message ID = " + id);
-    }
-    if (id) {
-      // Queue up any id but '0' and replies later accordingly.
-      this.results.push(id);
-    }
+    if (DEBUG) debug("IDsCollector::collect ID = " + id);
+    // Queue up any id.
+    this.results.push(id);
     if (id <= 0) {
       // No more processing on '0' or negative values passed.
       this.done = true;
     }
 
     if (!this.requestWaiting) {
-      if (DEBUG) debug("Cursor.continue() not called yet");
+      if (DEBUG) debug("IDsCollector::squeeze() not called yet");
       return !this.done;
     }
 
     // We assume there is only one request waiting throughout the message list
     // retrieving process. So we don't bother continuing to process further
     // waiting requests here. This assumption comes from DOMCursor::Continue()
     // implementation.
     let callback = this.requestWaiting;
@@ -3570,32 +3864,21 @@ ResultsCollector.prototype = {
 
   /**
    * @param txn
    *        Ongoing IDBTransaction context object or null.
    * @param callback
    *        A callback function that accepts a numeric id.
    */
   drip: function(txn, callback) {
-    if (!this.results.length) {
-      if (DEBUG) debug("No messages matching the filter criteria");
-      callback(txn, COLLECT_ID_END);
-      return;
+    let firstId = this.results[0];
+    if (firstId > 0) {
+      this.results.shift();
     }
-
-    if (this.results[0] < 0) {
-      // An previous error found. Keep the answer in results so that we can
-      // reply INTERNAL_ERROR for further requests.
-      if (DEBUG) debug("An previous error found");
-      callback(txn, COLLECT_ID_ERROR);
-      return;
-    }
-
-    let firstMessageId = this.results.shift();
-    callback(txn, firstMessageId);
+    callback(txn, firstId);
   }
 };
 
 function IntersectionResultsCollector(collect, reverse) {
   this.cascadedCollect = collect;
   this.reverse = reverse;
   this.contexts = [];
 }
@@ -3779,165 +4062,153 @@ UnionResultsCollector.prototype = {
     this.contexts[1].processing++;
     return this.collect.bind(this, 1);
   }
 };
 
 function GetMessagesCursor(mmdb, callback) {
   this.mmdb = mmdb;
   this.callback = callback;
-  this.collector = new ResultsCollector();
+  this.collector = new ResultsCollector(this.getMessage.bind(this));
 
   this.handleContinue(); // Trigger first run.
 }
 GetMessagesCursor.prototype = {
   classID: RIL_GETMESSAGESCURSOR_CID,
   QueryInterface: XPCOMUtils.generateQI([Ci.nsICursorContinueCallback]),
 
   mmdb: null,
   callback: null,
   collector: null,
 
-  getMessageTxn: function(messageStore, messageId) {
+  getMessageTxn: function(txn, messageStore, messageId, collector) {
     if (DEBUG) debug ("Fetching message " + messageId);
 
     let getRequest = messageStore.get(messageId);
     let self = this;
     getRequest.onsuccess = function(event) {
       if (DEBUG) {
         debug("notifyNextMessageInListGot - messageId: " + messageId);
       }
       let domMessage =
         self.mmdb.createDomMessageFromRecord(event.target.result);
-      self.callback.notifyCursorResult([domMessage], 1);
+      collector.notifyResult(txn, messageId, domMessage);
     };
     getRequest.onerror = function(event) {
+      // Error reporting is done in ResultsCollector.notifyCallback.
+      event.stopPropagation();
+      event.preventDefault();
+
       if (DEBUG) {
         debug("notifyCursorError - messageId: " + messageId);
       }
-      self.callback.notifyCursorError(Ci.nsIMobileMessageCallback.INTERNAL_ERROR);
+      collector.notifyResult(txn, messageId, null);
     };
   },
 
-  notify: function(txn, messageId) {
-    if (!messageId) {
-      this.callback.notifyCursorDone();
-      return;
-    }
-
-    if (messageId < 0) {
-      this.callback.notifyCursorError(Ci.nsIMobileMessageCallback.INTERNAL_ERROR);
-      return;
-    }
-
+  getMessage: function(txn, messageId, collector) {
     // When filter transaction is not yet completed, we're called with current
     // ongoing transaction object.
     if (txn) {
       let messageStore = txn.objectStore(MESSAGE_STORE_NAME);
-      this.getMessageTxn(messageStore, messageId);
+      this.getMessageTxn(txn, messageStore, messageId, collector);
       return;
     }
 
     // Or, we have to open another transaction ourselves.
     let self = this;
     this.mmdb.newTxn(READ_ONLY, function(error, txn, messageStore) {
       if (error) {
-        self.callback.notifyCursorError(Ci.nsIMobileMessageCallback.INTERNAL_ERROR);
-        return;
+        debug("getMessage: failed to create new transaction");
+        collector.notifyResult(null, messageId, null);
+      } else {
+        self.getMessageTxn(txn, messageStore, messageId, collector);
       }
-      self.getMessageTxn(messageStore, messageId);
     }, [MESSAGE_STORE_NAME]);
   },
 
   // nsICursorContinueCallback
 
   handleContinue: function() {
     if (DEBUG) debug("Getting next message in list");
-    this.collector.squeeze(this.notify.bind(this));
+    this.collector.squeeze(this.callback);
   }
 };
 
 function GetThreadsCursor(mmdb, callback) {
   this.mmdb = mmdb;
   this.callback = callback;
-  this.collector = new ResultsCollector();
+  this.collector = new ResultsCollector(this.getThread.bind(this));
 
   this.handleContinue(); // Trigger first run.
 }
 GetThreadsCursor.prototype = {
   classID: RIL_GETTHREADSCURSOR_CID,
   QueryInterface: XPCOMUtils.generateQI([Ci.nsICursorContinueCallback]),
 
   mmdb: null,
   callback: null,
   collector: null,
 
-  getThreadTxn: function(threadStore, threadId) {
+  getThreadTxn: function(txn, threadStore, threadId, collector) {
     if (DEBUG) debug ("Fetching thread " + threadId);
 
     let getRequest = threadStore.get(threadId);
-    let self = this;
     getRequest.onsuccess = function(event) {
       let threadRecord = event.target.result;
       if (DEBUG) {
         debug("notifyCursorResult: " + JSON.stringify(threadRecord));
       }
       let thread =
         gMobileMessageService.createThread(threadRecord.id,
                                            threadRecord.participantAddresses,
                                            threadRecord.lastTimestamp,
                                            threadRecord.lastMessageSubject || "",
                                            threadRecord.body,
                                            threadRecord.unreadCount,
                                            threadRecord.lastMessageType);
-      self.callback.notifyCursorResult([thread], 1);
+      collector.notifyResult(txn, threadId, thread);
     };
     getRequest.onerror = function(event) {
+      // Error reporting is done in ResultsCollector.notifyCallback.
+      event.stopPropagation();
+      event.preventDefault();
+
       if (DEBUG) {
         debug("notifyCursorError - threadId: " + threadId);
       }
-      self.callback.notifyCursorError(Ci.nsIMobileMessageCallback.INTERNAL_ERROR);
+      collector.notifyResult(txn, threadId, null);
     };
   },
 
-  notify: function(txn, threadId) {
-    if (!threadId) {
-      this.callback.notifyCursorDone();
-      return;
-    }
-
-    if (threadId < 0) {
-      this.callback.notifyCursorError(Ci.nsIMobileMessageCallback.INTERNAL_ERROR);
-      return;
-    }
-
+  getThread: function(txn, threadId, collector) {
     // When filter transaction is not yet completed, we're called with current
     // ongoing transaction object.
     if (txn) {
       let threadStore = txn.objectStore(THREAD_STORE_NAME);
-      this.getThreadTxn(threadStore, threadId);
+      this.getThreadTxn(txn, threadStore, threadId, collector);
       return;
     }
 
     // Or, we have to open another transaction ourselves.
     let self = this;
     this.mmdb.newTxn(READ_ONLY, function(error, txn, threadStore) {
       if (error) {
-        self.callback.notifyCursorError(Ci.nsIMobileMessageCallback.INTERNAL_ERROR);
-        return;
+        collector.notifyResult(null, threadId, null);
+      } else {
+        self.getThreadTxn(txn, threadStore, threadId, collector);
       }
-      self.getThreadTxn(threadStore, threadId);
     }, [THREAD_STORE_NAME]);
   },
 
   // nsICursorContinueCallback
 
   handleContinue: function() {
     if (DEBUG) debug("Getting next thread in list");
-    this.collector.squeeze(this.notify.bind(this));
+    this.collector.squeeze(this.callback);
   }
 }
 
 this.EXPORTED_SYMBOLS = [
   'MobileMessageDB'
 ];
 
 function debug() {
--- a/modules/libpref/init/all.js
+++ b/modules/libpref/init/all.js
@@ -3960,16 +3960,22 @@ pref("dom.image.picture.enabled", false)
 pref("dom.sms.enabled", false);
 // Enable Latin characters replacement with corresponding ones in GSM SMS
 // 7-bit default alphabet.
 pref("dom.sms.strict7BitEncoding", false);
 pref("dom.sms.requestStatusReport", true);
 // Numeric default service id for SMS API calls with |serviceId| parameter
 // omitted.
 pref("dom.sms.defaultServiceId", 0);
+// MobileMessage GetMessages/GetThreads read ahead aggressiveness.
+//
+// positive: finite read-ahead entries,
+// 0: don't read ahead unless explicitly requested, (default)
+// negative: read ahead all IDs if possible.
+pref("dom.sms.maxReadAheadEntries", 0);
 
 // WebContacts
 pref("dom.mozContacts.enabled", false);
 
 // WebAlarms
 pref("dom.mozAlarms.enabled", false);
 
 // SimplePush