Refactor indexing to use generators to simplify the logic while changing our logic to only have one outstanding body fetch at a time. (branch: stable-with-mods)
author Andrew Sutherland <asutherland@asutherland.org>
Sat, 26 Jul 2008 17:59:41 -0700
changeset 858 08c2e01d2acf8333311f497d050b22b01e72e460
parent 857 b41840410c792a9a86579e340baa860f1ff2922d
child 859 57fe6d540a5852994902b9c0703d8b3c39978313
push id unknown
push user unknown
push date unknown
Refactor indexing to use generators to simplify the logic while changing our logic to only have one outstanding body fetch at a time. (news.mozilla.org was getting mad at our aggressive fetching).
components/jsmimeemitter.js
modules/datastore.js
modules/gloda.js
modules/indexer.js
--- a/components/jsmimeemitter.js
+++ b/components/jsmimeemitter.js
@@ -48,16 +48,27 @@ Components.utils.import("resource://gre/
  *  provide a javascript-accessible representation of the message.
  *
  * Processing occurs in two passes.  During the first pass, libmime is parsing
  *  the stream it is receiving, and generating header and body events for all
  *  MimeMessage instances it encounters.  This provides us with the knowledge
  *  of each nested message in addition to the top level message, their headers
  *  and sort-of their bodies.  The sort-of is that we may get more than
  *  would normally be displayed in cases involving multipart/alternatives.
+ * During the second pass, the libmime object model is traversed, generating
+ *  attachment notifications for all leaf nodes.  From our perspective, this
+ *  means file attachments and embedded messages (message/rfc822).  We use this
+ *  pass to create the attachment objects and properly structure the MIME part
+ *  hierarchy.  We extract the 'part name' (ex: 1.2.2.1) from the URL provided
+ *  with the attachment and rely on the fact that the attachment notifications
+ *  are generated as the result of an in-order traversal of the hierarchy.  We
+ *  generate MimeUnknown instances for apparent leaf nodes (nodes we did not
+ *  hear about and whose children, if any, are unknown to us), and
+ *  MimeContainer instances for apparent container nodes (nodes for which we
+ *  know about one or more children).
  */
 function MimeMessageEmitter() {
   this._mimeMsg = {};
   Components.utils.import("resource://gloda/modules/mimemsg.js",
                           this._mimeMsg);
 
   this._url = null;
   this._channel = null;
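
To make the second pass concrete, here is a minimal sketch (not part of this patch; makeContainer and the parts arrays are illustrative assumptions) of placing an attachment by a part name such as "1.2.2.1":

  function placeByPartName(aRootPart, aPartName, aAttachment) {
    let indices = aPartName.split(".");  // "1.2.2.1" -> ["1","2","2","1"]
    let part = aRootPart;
    // walk/create every level above the leaf; nodes created here are the
    //  "apparent container nodes" described above
    for (let i = 0; i < indices.length - 1; i++) {
      let idx = Number(indices[i]) - 1;  // part names are 1-based
      if (part.parts[idx] === undefined)
        part.parts[idx] = makeContainer();
      part = part.parts[idx];
    }
    // the leaf slot receives the attachment itself
    part.parts[Number(indices[indices.length - 1]) - 1] = aAttachment;
  }
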
--- a/modules/datastore.js
+++ b/modules/datastore.js
@@ -515,19 +515,20 @@ let GlodaDatastore = {
       "UPDATE folderLocations SET folderURI = :newFolderURI \
               WHERE folderURI = :oldFolderURI");
     this.__defineGetter__("_updateFolderLocationStatement",
       function() statement);
     return this._updateFolderLocationStatement;
   },
   
   renameFolder: function gloda_ds_renameFolder(aOldURI, aNewURI) {
-    let folderID = this._folderURIs[aOldURI];
+    let folderID = this._mapFolderURI(aOldURI); // ensure the URI is mapped...
     this._folderURIs[aNewURI] = folderID;
     this._folderIDs[folderID] = aNewURI;
+    this._log.info("renaming folder URI " + aOldURI + " to " + aNewURI);
     this._updateFolderLocationStatement.params.oldFolderURI = aOldURI;
     this._updateFolderLocationStatement.params.newFolderURI = aNewURI;
     this._updateFolderLocationStatement.execute();
     delete this._folderURIs[aOldURI];
   },
   
   deleteFolderByID: function gloda_ds_deleteFolder(aFolderID) {
     let statement = this._createStatement(
--- a/modules/gloda.js
+++ b/modules/gloda.js
@@ -179,16 +179,26 @@ let Gloda = {
   NOUN_IDENTITY: 104,
   
   /** Next Noun ID to hand out, these don't need to be persisted (for now). */
   _nextNounID: 1000,
 
   _nounNameToNounID: {},
   _nounIDToMeta: {},
   
+  /**
+   * Define a noun.  Takes a dictionary with the following keys/values:
+   *
+   * @param name The name of the noun.  This is not a display name (anything
+   *     being displayed needs to be localized, after all), but simply the
+   *     canonical name for debugging purposes and for people to pass to
+   *     lookupNoun.  The suggested convention is lower-case-dash-delimited,
+   *     with names being singular (since it's a single noun we are referring
+   *     to.)
+   */
   defineNoun: function gloda_ns_defineNoun(aNounMeta) {
     let nounID = this._nextNounID++;
    this._nounNameToNounID[aNounMeta.name] = nounID;
     this._nounIDToMeta[nounID] = aNounMeta;
   },
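
As a usage sketch (hypothetical noun name; only the name key is documented here):

  // define a noun, then look its ID back up by canonical name
  Gloda.defineNoun({name: "example-noun"});
  let nounID = Gloda.lookupNoun("example-noun");
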
   
   /**
    * Lookup a noun (ID) suitable for passing to defineAttribute's various
--- a/modules/indexer.js
+++ b/modules/indexer.js
@@ -128,16 +128,29 @@ function IndexingJob(aJobType, aDeltaTyp
   this.jobType = aJobType;
   this.deltaType = aDeltaType;
   this.id = aID;
   this.items = [];
   this.offset = 0;
   this.goal = null;
 }
 
+/** Synchronous activity performed; you can drive us again immediately. */
+const kWorkSync = 0;
+/**
+ * Asynchronous activity performed; you need to relinquish flow control and
+ *  trust us to call callbackDriver later.
+ */
+const kWorkAsync = 1;
+/**
+ * We are all done with our task, close us and figure out something else to do.
+ */
+const kWorkDone = 2;
+
+
 let GlodaIndexer = {
   _datastore: GlodaDatastore,
   _log: Log4Moz.Service.getLogger("gloda.indexer"),
   _strBundle: null,
   _messenger: null,
   _msgwindow: null,
   _domWindow: null,
 
@@ -198,17 +211,19 @@ let GlodaIndexer = {
   /** Track whether indexing is active (we have timers in-flight). */
   _indexingActive: false,
   get indexing() { return this._indexingActive; },
   /** You can turn on indexing, but you can't turn it off! */
   set indexing(aShouldIndex) {
     if (!this._indexingActive && aShouldIndex) {
       this._log.info("+++ Indexing Queue Processing Commencing");
       this._indexingActive = true;
-      this._domWindow.setTimeout(this._wrapIncrementalIndex, this._indexInterval, this);
+      this._domWindow.setTimeout(this._wrapCallbackDriver,
+                                 this._indexInterval,
+                                 this);
     }  
   },
   
   /**
   *  Our current job number, out of _indexingJobGoal.  Although our jobs come
    *  from _indexQueue, this is not an offset into that list because we forget
    *  jobs once we complete them.  As such, this value is strictly for progress
    *  tracking.
@@ -285,32 +300,64 @@ let GlodaIndexer = {
                 null, 0, 1, 0, 1);
     return aListener;
   },
   removeListener: function gloda_index_removeListener(aListener) {
     let index = this._indexListeners.indexOf(aListener);
     if (index != -1)
       this._indexListeners.splice(index, 1);
   },
-  _notifyListeners: function gloda_index_notifyListeners(aStatus, aFolderName,
-      aFolderIndex, aFoldersTotal, aMessageIndex, aMessagesTotal) {
+  _notifyListeners: function gloda_index_notifyListeners() {
+    let status, prettyName, jobIndex, jobTotal, jobItemIndex, jobItemGoal;
+    
+    if (this._indexingActive && this._curIndexingJob) {
+      let job = this._curIndexingJob;
+      if (job.deltaType > 0)
+        status = this._strBundle.getString("actionIndexing");
+      else if (job.deltaType == 0)
+        status = this._strBundle.getString("actionMoving");
+      else
+        status = this._strBundle.getString("actionDeindexing");
+        
+      if (this._indexingFolder !== null)
+        prettyName = this._indexingFolder.prettiestName;
+      else
+        prettyName =
+          this._strBundle.getString("messageIndexingExplanation");
+
+      status = status + " " + prettyName;
+
+      jobIndex = this._indexingJobCount-1;
+      jobTotal = this._indexingJobGoal;
+      jobItemIndex = job.offset;
+      jobItemGoal  = job.goal;
+    }
+    else {
+      status = this._strBundle.getString("actionIdle");
+      prettyName = null;
+      jobIndex = 0;
+      jobTotal = 1;
+      jobItemIndex = 0;
+      jobItemGoal = 1;
+    }
+      
     for (let iListener=this._indexListeners.length-1; iListener >= 0; 
          iListener--) {
       let listener = this._indexListeners[iListener];
-      listener(aStatus, aFolderName, aFolderIndex, aFoldersTotal, aMessageIndex,
-               aMessagesTotal);
+      listener(status, prettyName, jobIndex, jobTotal, jobItemIndex,
+               jobItemGoal);
     }
   },
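
A listener registered through addListener now receives the six arguments shown above; a minimal (hypothetical) consumer:

  GlodaIndexer.addListener(function(aStatus, aPrettyName, aJobIndex, aJobTotal,
                                    aJobItemIndex, aJobItemGoal) {
    dump(aStatus + " (job " + (aJobIndex + 1) + " of " + aJobTotal +
         ", item " + aJobItemIndex + " of " + aJobItemGoal + ")\n");
  });
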
   
   _indexingFolderID: null,
   _indexingFolder: null,
   _indexingDatabase: null,
   _indexingIterator: null,
   
-  _pendingAsyncOps: 0,
   /** folder whose entry we are pending on */
   _pendingFolderEntry: null,
   
   /**
    * Common logic that we want to deal with the given folder ID.  Besides
    *  cutting down on duplicate code, this ensures that we are listening on
    *  the folder in case it tries to go away when we are using it.
    *
@@ -350,27 +397,29 @@ let GlodaIndexer = {
         this._indexingFolder.updateFolder(this._msgWindow);
       }
       catch ( e if e.result == 0xc1f30001) {
         // this means that we need to pend on the update.
         this._log.debug("Pending on folder load...");
         this._pendingFolderEntry = this._indexingFolder;
         this._indexingFolder = null;
         this._indexingFolderID = null;
-        return false;
+        this._indexingDatabase = null;
+        this._indexingIterator = null;
+        return kWorkAsync;
       }
       // we get an nsIMsgDatabase out of this (unsurprisingly) which
       //  explicitly inherits from nsIDBChangeAnnouncer, which has the
       //  AddListener call we want.
       this._indexingDatabase = folder.getMsgDatabase(this._msgWindow);
       if (aNeedIterator)
-        this._indexingIterator = Iterator(fixIterator(
+        this._indexingIterator = fixIterator(
                                    //folder.getMessages(this._msgWindow),
                                    this._indexingDatabase.EnumerateMessages(),
-                                   Ci.nsIMsgDBHdr));
+                                   Ci.nsIMsgDBHdr);
       this._databaseAnnouncerListener.indexer = this;
       this._indexingDatabase.AddListener(this._databaseAnnouncerListener);
     }
     catch (ex) {
       this._log.error("Problem entering folder: " +
                       folder.prettiestName + ", skipping.");
       this._log.error("Error was: " + ex);
       this._indexingFolder = null;
@@ -378,17 +427,17 @@ let GlodaIndexer = {
       this._indexingDatabase = null;
       this._indexingIterator = null;
       
       // re-throw, we just wanted to make sure this junk is cleaned up and
       //  get localized error logging...
       throw ex;
     }
     
-    return true;
+    return kWorkSync;
   },
   
   _indexerLeaveFolder: function gloda_index_indexerLeaveFolder(aExpected) {
     if (this._indexingFolder !== null) {
       // remove our listener!
       this._indexingDatabase.RemoveListener(this._databaseAnnouncerListener);
       // null everyone out
       this._indexingFolder = null;
@@ -411,245 +460,283 @@ let GlodaIndexer = {
    *
    * @param aFolder An nsIMsgFolder, already QI'd.
    */
   _onFolderLoaded: function gloda_index_onFolderLoaded(aFolder) {
     if ((this._pendingFolderEntry !== null) &&
         (aFolder.URI == this._pendingFolderEntry.URI)) {
       this._log.debug("...Folder Loaded!");
       this._pendingFolderEntry = null;
-      this.incrementalIndex();
+      this.callbackDriver();
     }
   },
   
   /**
    * A simple wrapper to make 'this' be right for incrementalIndex.
    */
-  _wrapIncrementalIndex: function gloda_index_wrapIncrementalIndex(aThis) {
-    aThis.incrementalIndex();
+  _wrapCallbackDriver: function gloda_index_wrapCallbackDriver(aThis) {
+    aThis.callbackDriver();
   },
-  
+
+  /**
+   * The current processing 'batch' generator, produced by a call to workBatch()
+   *  and used by callbackDriver to drive execution.
+   */
+  _batch: null,
+  _inCallback: false,
   /**
-   * The incremental indexing core logic, responsible for performing work on
-   *  the current job and de-queueing new jobs as needed, while trying to
-   *  keeping our time-slice reasonable.  We use 'tokens' to track cost/activity
-   *  now, but could move to something more explicit such as using a timer.
+   * The root work-driver.  callbackDriver creates workBatch generator instances
+   *  (stored in _batch) which run until they are done (kWorkDone) or they
+   *  (really the embedded _actualWorker) encounter something asynchronous.
+   *  The convention is that all the callback handlers end up calling us,
+   *  ensuring that control-flow properly resumes.  If the batch completes,
+   *  we re-schedule ourselves after a time delay (controlled by _indexInterval)
+   *  and return.
    */
-  incrementalIndex: function gloda_index_incrementalIndex() {
-    this._log.debug("index wake-up!");
-  
-    GlodaDatastore._beginTransaction();
+  callbackDriver: function gloda_index_callbackDriver() {
+    // it is conceivable that someone we call will call something that in some
+    //  cases might be asynchronous, and in other cases immediately generate
+    //  events without returning.  In the interest of (stack-depth) sanity,
+    //  let's handle this by performing a minimal time-delay callback.
+    if (this._inCallback) {
+      this._domWindow.setTimeout(this._wrapCallbackDriver, 0, this);
+      return;
+    }
+    this._inCallback = true;
+
     try {
-      let job = this._curIndexingJob;
-      for (let tokensLeft=this._indexTokens; tokensLeft > 0; tokensLeft--) {
-        // --- Do we need a job?
-        if (job === null) {
-          // --- Are there any jobs left?
-          if (this._indexQueue.length == 0) {
-            this._log.info("--- Done indexing, disabling timer renewal.");
-            
-            if (this._indexingFolder !== null) {
-              this._indexerLeaveFolder(true);
-            }
-            
-            this._indexingActive = false;
-            this._indexingJobCount = 0;
-            this._indexingJobGoal = 0;
-            this._notifyListeners(this._strBundle.getString("actionIdle"), null,
-                                  0, 1, 0, 1);
-            break;
-          }
-          
-          // --- Get a job
-          else {
-            try {
-              this._log.debug("Pulling job from queue of size " +
-                              this._indexQueue.length);
-              job = this._curIndexingJob = this._indexQueue.shift();
-              this._indexingJobCount++;
-              this._log.debug("Pulled job: " + job.jobType + ", " +
-                              job.deltaType + ", " + job.id);
-              // (Prepare for the job...)
-              if (job.jobType == "folder") {
-                // -- FOLDER ADD
-                if (job.deltaType > 0) {
-                  if(!this._indexerEnterFolder(job.id, true)) {
-                    // so, we need to wait for the folder to load, so we need
-                    //  to back out things a little...
-                    this._log.debug("Pending on folder load; job goes back.");
-                    this._indexingJobCount--;
-                    this._indexQueue.unshift(job);
-                    this._curIndexingJob = null;
-                    return;
-                  }
-                  job.goal = this._indexingFolder.getTotalMessages(false);
-                }
-                // -- FOLDER DELETE
-                else {
-                  // nuke the folder id
-                  this._datastore.deleteFolderByID(job.id);
-                  // and we're done!
-                  job = this._curIndexingJob = null;
-                }
-              }
-              // messages
-              else {
-                // not much to do here; unlink the pending add job if that's him
-                if (job === this._pendingAddJob)
-                  this._pendingAddJob = null;
-                // update our goal from the items length
-                job.goal = job.items.length;
-              }
-            }
-            catch (ex) {
-              this._log.debug("Failed to start job (at " + ex.fileName + ":" +
-                ex.lineNumber + ") because: " + ex);
-              job = this._curIndexingJob = null;
-            }
-          }
-        }
-        // --- Do the job
-        else {
-          try {
-            if (job.jobType == "folder") {
-              // -- FOLDER ADD (steady state)
-              if (job.deltaType > 0) {
-                // this will throw a stopiteration exception when done, so
-                //  we don't need to clean up the job
-                this._indexMessage(this._indexingIterator.next());
-                job.offset++;
-              }
-              // there is no steady-state processing for folder deletion
-            }
-            else if (job.jobType == "message") {
-              let item = job.items[job.offset++];
-              // -- MESSAGE ADD (batch steady state)
-              if (job.deltaType > 0) {
-                // item is either [folder ID, message key] or
-                //                [folder ID, message ID]
+      if (this._batch === null)
+        this._batch = this.workBatch();
+      
+      // only kWorkAsync and kWorkDone are allowed out of our call to the batch.
+      // On kWorkDone, we want to schedule another timer to fire on us if we are
+      //  not done indexing.  (On kWorkAsync, we don't care what happens, because
+      //  someone else will be receiving the callback, and they will call us when
+      //  they are done doing their thing.)
+      if (this._batch.next() == kWorkDone) {
+        this._batch.close();
+        this._batch = null;
 
-                // get in the folder
-                if (this._indexingFolderID != item[0]) {
-                  if (!this._indexerEnterFolder(item[0], false)) {
-                    // need to pend on the folder...
-                    job.offset--;
-                    return;
-                  }
-                }
-                let msgHdr;
-                if (typeof item[1] == "number")
-                  msgHdr = this._indexingFolder.GetMessageHeader(item[1]);
-                else
-                  // same deal as in move processing.
-                  // TODO fixme to not assume singular message-id's.
-                  msgHdr = this._indexingDatabase.getMsgHdrForMessageID(item[1]);
-                if (msgHdr)
-                  this._indexMessage(msgHdr);
-              }
-              // -- MESSAGE MOVE (batch steady state)
-              else if (job.deltaType == 0) {
-                // item must be [folder ID, header message-id]
-                
-                // get in the folder
-                if (this._indexingFolderID != item[0]) {
-                  if (!this._indexerEnterFolder(item[0], false)) {
-                    // need to pend on the folder
-                    job.offset--;
-                    return
-                  }
-                }
-                // process everyone with the message-id.  yeck.
-                // uh, except nsIMsgDatabase only thinks there should be one, so
-                //  let's pretend that this assumption is not a bad idea for now
-                // TODO: stop pretending this assumption is not a bad idea
-                let msgHdr = this._indexingDatabase.getMsgHdrForMessageID(item[1]);
-                if (msgHdr) {
-                  this._indexMessage(msgHdr);
-                }
-                else {
-                  this._log.info("Move unable to locate message with header " +
-                    "message-id " + item[1] + ". Folder is known to possess " +
-                    this._indexingFolder.getTotalMessages(false) +" messages.");
-                }
-                // remember to eat extra tokens... when we get more than one...
-              }
-              // -- MESSAGE DELETE (batch steady state)
-              else { // job.deltaType < 0
-                // item is either: a message id
-                //              or [folder ID, message key]
-                let message;
-                if (item instanceof Array) {
-                  if (!this._indexerEnterFolder(item[0], false)) {
-                    // need to pend on the folder
-                    job.offset--;
-                    return;
-                  }
-                  message = this_indexingFolder.GetMessageHeader(item[1]);
-                }
-                else {
-                  message = GlodaDatastore.getMessageByID(item);
-                }
-                // delete the message!
-                if (message !== null)
-                  this._deleteMessage(message);
-              }
-              
-              // we do need to kill the job when we hit the items.length
-              if (job.offset == job.items.length)
-                job = this._curIndexingJob = null;
-            }
-          }
-          catch (ex) {
-            this._log.debug("Bailing on job (at " + ex.fileName + ":" +
-                ex.lineNumber + ") because: " + ex);
-            this._indexerLeaveFolder(true);
-            job = this._curIndexingJob = null;
-          }
+        if (this.indexing)
+          this._domWindow.setTimeout(this._wrapCallbackDriver,
+                                     this._indexInterval,
+                                     this);
+      }
+    }
+    finally {    
+      this._inCallback = false;
+    }
+  },
+
+  /**
+   * The generator we are using to perform processing of the current job
+   *  (this._curIndexingJob).  It differs from this._batch which is a generator
+   *  that takes care of the batching logistics (namely managing database
+   *  transactions and keeping track of how much work can be done with the
+ *  current allocation of processing "tokens").
+   * The generator is created by _hireJobWorker from one of the _worker_*
+   *  generator methods.
+   */
+  _actualWorker: null,
+  /**
+   * The workBatch generator handles a single 'batch' of processing, managing
+   *  the database transaction and keeping track of "tokens".  It drives the
+   *  _actualWorker generator which is doing the work.
+   * workBatch will only produce kWorkAsync and kWorkDone notifications.
+   *  If _actualWorker returns kWorkSync and there are still tokens available,
+   *  workBatch will keep driving _actualWorker until it encounters a
+   *  kWorkAsync (which workBatch will yield to callbackDriver), or it runs
+   *  out of tokens and yields a kWorkDone. 
+   */
+  workBatch: function gloda_index_workBatch() {
+    GlodaDatastore._beginTransaction();
+
+    for (let tokensLeft=this._indexTokens; tokensLeft > 0; tokensLeft--) {
+      if ((this._actualWorker === null) && !this._hireJobWorker())
+        break;
+    
+      // XXX for performance, we may want to move the try outside the for loop
+      //  with a quasi-redundant outer loop that shunts control back inside
+      //  if we left the loop due to an exception (without consuming all the
+      //  tokens.)
+      try {
+        switch (this._actualWorker.next()) {
+          case kWorkSync:
+            break;
+          case kWorkAsync:
+            yield kWorkAsync;
+            break;
+          case kWorkDone:
+            this._actualWorker.close();
+            this._actualWorker = null;
+            break;
         }
-        
-        // perhaps status update
-        if (job !== null) {
-          if (job.offset % 50 == 1) {
-            let actionStr;
-            if (job.deltaType > 0)
-              actionStr = this._strBundle.getString("actionIndexing");
-            else if (job.deltaType == 0)
-              actionStr = this._strBundle.getString("actionMoving");
-            else
-              actionStr = this._strBundle.getString("actionDeindexing");
-            let prettyName;
-            if (this._indexingFolder !== null)
-              prettyName = this._indexingFolder.prettiestName;
-            else
-              prettyName =
-                this._strBundle.getString("messageIndexingExplanation");
-            this._notifyListeners(actionStr + ": " +
-                                  prettyName,
-                                  prettyName,
-                                  this._indexingJobCount-1, // count, not index
-                                  this._indexingJobGoal,
-                                  job.offset,
-                                  job.goal);
-          }
+      }
+      catch (ex) {
+        this._log.debug("Bailing on job (at " + ex.fileName + ":" +
+            ex.lineNumber + ") because: " + ex);
+        this._indexerLeaveFolder(true);
+        this._curIndexingJob = null;
+        if (this._actualWorker !== null) {
+          this._actualWorker.close();
+          this._actualWorker = null;
         }
       }
     }
-    finally {
-      if (this._pendingAsyncOps <= 0) {
-        this._pendingAsyncOpsCompleted();
+    GlodaDatastore._commitTransaction();
+    
+    // try and get a job if we don't have one for the sake of the notification
+    if (this.indexing && (this._actualWorker === null))
+      this._hireJobWorker();
+    this._notifyListeners();
+    
+    yield kWorkDone;
+  },
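
Putting the pieces together, the asynchronous round trip through these generators looks like the following sketch (someMsgHdr is a placeholder, not a real variable):

  function _worker_example(aJob) {
    // _indexMessage kicks off a streaming MIME parse and returns kWorkAsync,
    //  which we yield; workBatch yields it to callbackDriver, which returns.
    yield this._indexMessage(someMsgHdr);
    // we resume here only after _indexMessageWithBody has run and invoked
    //  callbackDriver, which re-enters workBatch, which re-enters us.
    yield kWorkDone;
  }
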
+
+  /**
+   * Perform the initialization step and return a generator if there is any
+   *  steady-state processing to be had.
+   */
+  _hireJobWorker: function gloda_index_hireJobWorker() {
+    if (this._indexQueue.length == 0) {
+      this._log.info("--- Done indexing, disabling timer renewal.");
+      
+      if (this._indexingFolder !== null) {
+        this._indexerLeaveFolder(true);
       }
+      
+      this._curIndexingJob = null;
+      this._indexingActive = false;
+      this._indexingJobCount = 0;
+      this._indexingJobGoal = 0;
+      return false;
     }
+
+    this._log.debug("Pulling job from queue of size " +
+                    this._indexQueue.length);
+    let job = this._curIndexingJob = this._indexQueue.shift();
+    this._indexingJobCount++;
+    this._log.debug("Pulled job: " + job.jobType + ", " +
+                    job.deltaType + ", " + job.id);
+    
+    if (job.jobType == "folder") {
+      if (job.deltaType > 0)
+        this._actualWorker = this._worker_folderAdd(job);
+      else if (job.deltaType < 0)
+        this._actualWorker = this._worker_folderDelete(job);
+    }
+    else if (job.jobType == "message") {
+      if (job === this._pendingAddJob)
+        this._pendingAddJob = null;
+      // update our goal from the items length
+      job.goal = job.items.length;
+
+      if (job.deltaType > 0)
+        this._actualWorker = this._worker_messageAdd(job);
+      else if (job.deltaType == 0)
+        this._actualWorker = this._worker_messageMove(job);
+      else
+        this._actualWorker = this._worker_messageDelete(job);
+    }
+    
+    return true;
   },
 
-  _pendingAsyncOpsCompleted: function gloda_index_pendingAsyncOpsCompleted() {
-    GlodaDatastore._commitTransaction();
+  _worker_folderAdd: function (aJob) {
+    yield this._indexerEnterFolder(aJob.id, true);
+    aJob.goal = this._indexingFolder.getTotalMessages(false);
+    
+    for (let msgHdr in this._indexingIterator) {
+      aJob.offset++;
+      yield this._indexMessage(msgHdr);
+    }
+    
+    yield kWorkDone;
+  },
+  
+  _worker_folderDelete: function (aJob) {
+    // nuke the folder id
+    this._datastore.deleteFolderByID(aJob.id);
+    yield kWorkDone;
+  },
+  
+  _worker_messageAdd: function (aJob) {
+    for (; aJob.offset < aJob.items.length; aJob.offset++) {
+      let item = aJob.items[aJob.offset];
+      // item is either [folder ID, message key] or
+      //                [folder ID, message ID]
+
+      // get in the folder
+      if (this._indexingFolderID != item[0])
+        yield this._indexerEnterFolder(item[0], false);
+
+      let msgHdr;
+      if (typeof item[1] == "number")
+        msgHdr = this._indexingFolder.GetMessageHeader(item[1]);
+      else
+        // same deal as in move processing.
+        // TODO fixme to not assume singular message-id's.
+        msgHdr = this._indexingDatabase.getMsgHdrForMessageID(item[1]);
+      
+      if (msgHdr)
+        yield this._indexMessage(msgHdr);
+      else
+        yield kWorkSync;
+    }
+    yield kWorkDone;
+  },
   
-    if (this.indexing && (this._pendingFolderEntry === null))
-      this._domWindow.setTimeout(this._wrapIncrementalIndex, this._indexInterval,
-                              this);
+  _worker_messageMove: function (aJob) {
+    for (; aJob.offset < aJob.items.length; aJob.offset++) {
+      let item = aJob.items[aJob.offset];
+      // item must be [folder ID, header message-id]
+      
+      if (this._indexingFolderID != item[0])
+        yield this._indexerEnterFolder(item[0], false);
+      
+      // process everyone with the message-id.  yeck.
+      // uh, except nsIMsgDatabase only thinks there should be one, so
+      //  let's pretend that this assumption is not a bad idea for now
+      // TODO: stop pretending this assumption is not a bad idea
+      let msgHdr = this._indexingDatabase.getMsgHdrForMessageID(item[1]);
+      if (msgHdr)
+        yield this._indexMessage(msgHdr);
+      else {
+        this._log.info("Move unable to locate message with header " +
+          "message-id " + item[1] + ". Folder is known to possess " +
+          this._indexingFolder.getTotalMessages(false) +" messages.");
+        yield kWorkSync;
+      }
+    }
+    
+    yield kWorkDone;
+  },
+  
+  _worker_messageDelete: function (aJob) {
+    for (; aJob.offset < aJob.items.length; aJob.offset++) {
+      let item = aJob.items[aJob.offset];
+
+      // item is either: a message id
+      //              or [folder ID, message key]
+      let message;
+      if (item instanceof Array) {
+        if (this._indexingFolderID != item[0])
+          yield this._indexerEnterFolder(item[0], false);
+        message = this._indexingFolder.GetMessageHeader(item[1]);
+      }
+      else
+        message = GlodaDatastore.getMessageByID(item);
+
+      // delete the message!
+      if (message !== null)
+        this._deleteMessage(message);
+      yield kWorkSync;
+    }
+    yield kWorkDone;
   },
 
   indexEverything: function glodaIndexEverything() {
     this._log.info("Queueing all accounts for indexing.");
     let msgAccountManager = Cc["@mozilla.org/messenger/account-manager;1"].
                             getService(Ci.nsIMsgAccountManager);
     
     GlodaDatastore._beginTransaction();
@@ -861,31 +948,41 @@ let GlodaIndexer = {
      *  you really need to duplicate all those messages?) but is easily dealt
      *  with by queueing the destination folder for initial indexing. 
      */
     folderMoveCopyCompleted: function gloda_indexer_folderMoveCopyCompleted(
                                aMove, aSrcFolder, aDestFolder) {
       this.indexer._log.debug("folderMoveCopy notification (Move: " + aMove
                               + ")");
       if (aMove) {
-        return this.folderRenamed(aSrcFolder, aDestFolder);
+        // TODO handle nested folder ramifications (we don't receive events
+        //  for these, so nested folders will become wrong.)
+        let srcURI = aSrcFolder.URI;
+        let targetURI = aDestFolder.URI +
+                        srcURI.substring(srcURI.lastIndexOf("/"));
+        this.indexer._log.debug("renaming " + srcURI + " to " + targetURI);
+        GlodaDatastore.renameFolder(srcURI, targetURI);
+        return;
       }
      this.indexer._indexingJobGoal++;
      this.indexer._indexQueue.push(new IndexingJob("folder", 1,
        GlodaDatastore._mapFolderURI(aDestFolder.URI)));
       this.indexer.indexing = true;
     },
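
For example (illustrative URIs), moving INBOX/todo under Archive derives the target URI like so:

  let srcURI = "imap://user@server/INBOX/todo";
  let targetURI = "imap://user@server/Archive" +
                  srcURI.substring(srcURI.lastIndexOf("/"));
  // targetURI is now "imap://user@server/Archive/todo"
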
     
     /**
      * We just need to update the URI <-> ID maps and the row in the database,
      *  all of which is actually done by the datastore for us.
      */
     folderRenamed: function gloda_indexer_folderRenamed(aOrigFolder,
                                                         aNewFolder) {
-      this.indexer._log.debug("folderRenamed notification");
+      // TODO handle nested folder ramifications (we don't receive events for
+      //  these, so nested folders will become wrong)
+      this.indexer._log.debug("folderRenamed notification: " + aOrigFolder.URI +
+                              " to " + aNewFolder.URI);
       GlodaDatastore.renameFolder(aOrigFolder.URI, aNewFolder.URI);
     },
     
     itemEvent: function gloda_indexer_itemEvent(aItem, aEvent, aData) {
       // nop.  this is an expansion method on the part of the interface and has
       //  no known events that we need to handle.
     },
   },
@@ -965,23 +1062,25 @@ let GlodaIndexer = {
    */
   _databaseAnnouncerListener: {
     indexer: null,
     onAnnouncerGoingAway: function gloda_indexer_dbGoingAway(
                                          aDBChangeAnnouncer) {
       this.indexer._indexerLeaveFolder(false);
     },
     
-    onHdrChange: function(aHdrChanged, aOldFlags, aNewFlags, aInstigator) {},
+    onHdrFlagsChanged: function(aHdrChanged, aOldFlags, aNewFlags, aInstigator) {},
     onHdrDeleted: function(aHdrChanged, aParentKey, aFlags, aInstigator) {},
     onHdrAdded: function(aHdrChanged, aParentKey, aFlags, aInstigator) {},
     onParentChanged: function(aKeyChanged, aOldParent, aNewParent, 
                               aInstigator) {},
     onReadChanged: function(aInstigator) {},
-    onJunkScoreChanged: function(aInstigator) {}
+    onJunkScoreChanged: function(aInstigator) {},
+    onHdrPropertyChanged: function (aHdrToChange, aPreChange, aStatus,
+                                    aInstigator) {},
   },
   
   /* ***** MailNews Shutdown ***** */
   // TODO: implement a shutdown/pre-shutdown listener that attempts to either
   //  drain the indexing queue or persist it.
   /**
    * Shutdown task.
    *
@@ -1030,32 +1129,24 @@ let GlodaIndexer = {
    *  for us, and we don't actually need to do any extra work.  Hooray!
    */
   _extractOriginalSubject: function glodaIndexExtractOriginalSubject(aMsgHdr) {
     return aMsgHdr.mime2DecodedSubject;
   },
   
   _indexMessage: function gloda_indexMessage(aMsgHdr) {
     MsgHdrToMimeMessage(aMsgHdr, this, this._indexMessageWithBody);
-    this._pendingAsyncOps++;
+    return kWorkAsync;
   },
   
   _indexMessageWithBody: function gloda_index_indexMessageWithBody(
        aMsgHdr, aMimeMsg) {
     this._log.debug("*** Indexing message: " + aMsgHdr.messageKey + " : " +
                     aMsgHdr.subject);
 
-    /* for now, let's be okay if there's no mime; but the plugins will be sad :(
-    if (aMimeMsg === null) {
-      if (--this._pendingAsyncOps == 0)
-       this._pendingAsyncOpsCompleted();
-      return;
-    } 
-    */
-    
     // -- Find/create the conversation the message belongs to.
     // Our invariant is that all messages that exist in the database belong to
     //  a conversation.
     
     // - See if any of the ancestors exist and have a conversationID...
     // (references are ordered from old [0] to new [n-1])
     let references = [aMsgHdr.getStringReference(i) for each
                       (i in range(0, aMsgHdr.numReferences))];
@@ -1175,18 +1266,17 @@ let GlodaIndexer = {
      else {
         curMsg._folderID = this._datastore._mapFolderURI(aMsgHdr.folder.URI);
         curMsg._messageKey = aMsgHdr.messageKey;
         this._datastore.updateMessage(curMsg);
      }
      
      Gloda.processMessage(curMsg, aMsgHdr, aMimeMsg);
      
-     if (--this._pendingAsyncOps == 0)
-       this._pendingAsyncOpsCompleted();
+     this.callbackDriver();
   },
   
   /**
    * Wipe a message out of existence from our index.  This is slightly more
    *  tricky than one would first expect because there are potentially
    *  attributes not immediately associated with this message that reference
    *  the message.  Not only that, but deletion of messages may leave a
    *  conversation posessing only ghost messages, which we don't want, so we