Bug 454740 - Asynchronous storage should batch/chunk results
authorShawn Wilsher <sdwilsh@shawnwilsher.com>
Wed, 29 Oct 2008 13:13:32 -0400
changeset 21054 d0bfac57c9edd5973adf453b9c7638420df273b4
parent 21053 a62df7b5b9bb08433501f3651627e32d240b48f5
child 21055 8feeaada71ebae163b664c2ae7729619c13afe1a
push id1
push userroot
push dateTue, 26 Apr 2011 22:38:44 +0000
treeherdermozilla-beta@bfdb6e623a36 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
bugs454740
milestone1.9.1b2pre
Bug 454740 - Asynchronous storage should batch/chunk results This changeset batches results obtained by the async storage API so we are not flooding the calling thread with so many events. r=asuth
storage/src/mozStorageEvents.cpp
storage/src/mozStorageResultSet.h
--- a/storage/src/mozStorageEvents.cpp
+++ b/storage/src/mozStorageEvents.cpp
@@ -36,28 +36,43 @@
  * the terms of any one of the MPL, the GPL or the LGPL.
  *
  * ***** END LICENSE BLOCK ***** */
 
 #include "nsThreadUtils.h"
 #include "nsAutoPtr.h"
 #include "nsAutoLock.h"
 #include "nsCOMArray.h"
+#include "prtime.h"
 
 #include "sqlite3.h"
 
 #include "mozIStorageStatementCallback.h"
 #include "mozIStoragePendingStatement.h"
 #include "mozStorageHelper.h"
 #include "mozStorageResultSet.h"
 #include "mozStorageRow.h"
 #include "mozStorageConnection.h"
 #include "mozStorageError.h"
 #include "mozStorageEvents.h"
 
+/**
+ * The following constants help batch rows into result sets.
+ * MAX_MILLISECONDS_BETWEEN_RESULTS was chosen because any user-based task that
+ * takes less than 200 milliseconds is considered to feel instantaneous to end
+ * users.  MAX_ROWS_PER_RESULT was arbitrarily chosen to reduce the number of
+ * dispatches to calling thread, while also providing reasonably-sized sets of
+ * data for consumers.  Both of these constants are used because we assume that
+ * consumers are trying to avoid blocking their execution thread for long
+ * periods of time, and dispatching many small events to the calling thread will
+ * end up blocking it.
+ */
+#define MAX_MILLISECONDS_BETWEEN_RESULTS 100
+#define MAX_ROWS_PER_RESULT 15
+
 ////////////////////////////////////////////////////////////////////////////////
 //// Asynchronous Statement Execution
 
 /**
  * Enum used to describe the state of execution.
  */
 enum ExecutionState {
     PENDING = -1
@@ -89,16 +104,18 @@ public:
       mCallback(aCallback)
     , mResults(aResults)
     , mEventStatus(aEventStatus)
   {
   }
 
   NS_IMETHOD Run()
   {
+    NS_ASSERTION(mCallback, "Trying to notify about results without a callback!");
+
     if (mEventStatus->runEvent())
       (void)mCallback->HandleResult(mResults);
 
     return NS_OK;
   }
 
 private:
   CallbackResultNotifier() { }
@@ -208,16 +225,18 @@ public:
    */
   AsyncExecute(nsTArray<sqlite3_stmt *> &aStatements,
                mozIStorageConnection *aConnection,
                mozIStorageStatementCallback *aCallback) :
       mConnection(aConnection)
     , mTransactionManager(nsnull)
     , mCallback(aCallback)
     , mCallingThread(do_GetCurrentThread())
+    , mMaxIntervalWait(PR_MicrosecondsToInterval(MAX_MILLISECONDS_BETWEEN_RESULTS))
+    , mIntervalStart(PR_IntervalNow())
     , mState(PENDING)
     , mCancelRequested(PR_FALSE)
     , mLock(nsAutoLock::NewLock("AsyncExecute::mLock"))
   {
     (void)mStatements.SwapElements(aStatements);
     NS_ASSERTION(mStatements.Length(), "We weren't given any statements!");
   }
 
@@ -245,115 +264,26 @@ public:
     // statements doesn't make a whole lot of sense.
     if (mStatements.Length() > 1) {
       // We don't error if this failed because it's not terrible if it does.
       mTransactionManager = new mozStorageTransaction(mConnection, PR_FALSE,
                                                       mozIStorageConnection::TRANSACTION_IMMEDIATE);
     }
 
     // Execute each statement, giving the callback results if it returns any.
-    nsresult rv = NS_OK;
     for (PRUint32 i = 0; i < mStatements.Length(); i++) {
-      // We need to hold a lock for statement execution so we can properly
-      // reflect state in case we are canceled.  We unlock in a few areas in
-      // order to allow for cancelation to occur.
-      nsAutoLock mutex(mLock);
-
-      while (PR_TRUE) {
-        int rc = sqlite3_step(mStatements[i]);
-        // Break out if we have no more results
-        if (rc == SQLITE_DONE)
-          break;
-
-        // Some errors are not fatal, and we can handle them and continue.
-        if (rc != SQLITE_OK && rc != SQLITE_ROW) {
-          if (rc == SQLITE_BUSY) {
-            // We do not want to hold our lock while we yield.
-            nsAutoUnlock cancelationScope(mLock);
-
-            // Yield, and try again
-            PR_Sleep(PR_INTERVAL_NO_WAIT);
-            continue;
-          }
-
-          // Set error state
-          mState = ERROR;
-
-          // No longer need to hold our mutex
-          mutex.unlock();
-
-          // Notify
-          sqlite3 *db = sqlite3_db_handle(mStatements[i]);
-          (void)NotifyError(rc, sqlite3_errmsg(db));
-
-          // And complete
-          return NotifyComplete();
-        }
-
-        // If we do not have a callback, there's no point in executing this
-        // statement anymore.
-        if (!mCallback)
-          break;
-
-        // If we have been canceled, there is no point in going on...
-        if (mCancelRequested) {
-          mState = CANCELED;
-          mutex.unlock();
-          return NotifyComplete();
-        }
+      PRBool finished = (i == (mStatements.Length() - 1));
+      if (!ExecuteAndProcessStatement(mStatements[i], finished))
+        break;
+    }
 
-        // For the rest of this loop, it is safe to not hold the lock and allow
-        // for cancelation.  We may add an event to the calling thread, but that
-        // thread will not end up running when it checks back with us to see if
-        // it should run.
-        nsAutoUnlock cancelationScope(mLock);
-
-        // Build result object
-        // XXX bug 454740 chunk these results better
-        nsRefPtr<mozStorageResultSet> results(new mozStorageResultSet());
-        if (!results) {
-          rv = NS_ERROR_OUT_OF_MEMORY;
-          break;
-        }
-
-        nsRefPtr<mozStorageRow> row(new mozStorageRow());
-        if (!row) {
-          rv = NS_ERROR_OUT_OF_MEMORY;
-          break;
-        }
-
-        rv = row->initialize(mStatements[i]);
-        if (NS_FAILED(rv))
-          break;
-
-        rv = results->add(row);
-        if (NS_FAILED(rv))
-          break;
-
-        // Notify caller
-        (void)NotifyResults(results);
-      }
-
-      // If we have an error that we have not already notified about, set our
-      // state accordingly, and notify.
-      if (NS_FAILED(rv)) {
-        mState = ERROR;
-
-        // We no longer need to hold our mutex
-        mutex.unlock();
-        (void)NotifyError(mozIStorageError::ERROR, "");
-        break;
-      }
-
-      // If we are done, we need to set our state accordingly while we still
-      // hold our lock.  We would have already dropped out of the loop if we
-      // were canceled or had an error at this point.
-      if (i == (mStatements.Length() - 1))
-        mState = COMPLETED;
-    }
+    // If we still have results that we haven't notified about, take care of
+    // them now.
+    if (mResultSet)
+      (void)NotifyResults();
 
     // Notify about completion
     return NotifyComplete();
   }
 
   NS_IMETHOD Cancel(PRBool *_successful)
   {
 #ifdef DEBUG
@@ -399,25 +329,171 @@ public:
 
     // We do not need to acquire mLock here because it can only ever be written
     // to on the calling thread, and the only thread that can call us is the
     // calling thread, so we know that our access is serialized.
     return !mCancelRequested;
   }
 
 private:
-  AsyncExecute() { }
+  AsyncExecute() : mMaxIntervalWait(0) { }
 
   ~AsyncExecute()
   {
     nsAutoLock::DestroyLock(mLock);
   }
 
   /**
+   * Executes a given statement until completion, an error occurs, or we are
+   * canceled.  If aFinished is true, we know that we are the last statement,
+   * and should set mState accordingly.
+   *
+   * @pre mLock is not held
+   *
+   * @param aStatement
+   *        The statement to execute and then process.
+   * @param aFinished
+   *        Indicates if this is the last statement or not.  If it is, we have
+   *        to set the proper state.
+   * @returns true if we should continue to process statements, false otherwise.
+   */
+  PRBool ExecuteAndProcessStatement(sqlite3_stmt *aStatement, PRBool aFinished)
+  {
+    // We need to hold a lock for statement execution so we can properly
+    // reflect state in case we are canceled.  We unlock in a few areas in
+    // order to allow for cancelation to occur.
+    nsAutoLock mutex(mLock);
+
+    nsresult rv = NS_OK;
+    while (PR_TRUE) {
+      int rc = sqlite3_step(aStatement);
+      // Break out if we have no more results
+      if (rc == SQLITE_DONE)
+        break;
+
+      // Some errors are not fatal, and we can handle them and continue.
+      if (rc != SQLITE_OK && rc != SQLITE_ROW) {
+        if (rc == SQLITE_BUSY) {
+          // We do not want to hold our lock while we yield.
+          nsAutoUnlock cancelationScope(mLock);
+
+          // Yield, and try again
+          PR_Sleep(PR_INTERVAL_NO_WAIT);
+          continue;
+        }
+
+        // Set error state
+        mState = ERROR;
+
+        // Drop our mutex - NotifyError doesn't want it held
+        mutex.unlock();
+
+        // Notify
+        sqlite3 *db = sqlite3_db_handle(aStatement);
+        (void)NotifyError(rc, sqlite3_errmsg(db));
+
+        // And stop processing statements
+        return PR_FALSE;
+      }
+
+      // If we do not have a callback, there's no point in executing this
+      // statement anymore, but we wish to continue to execute statements.  We
+      // also need to update our state if we are finished, so break out of the
+      // while loop.
+      if (!mCallback)
+        break;
+
+      // If we have been canceled, there is no point in going on...
+      if (mCancelRequested) {
+        mState = CANCELED;
+        return PR_FALSE;
+      }
+
+      // Build our results and notify if it's time.
+      rv = BuildAndNotifyResults(aStatement);
+      if (NS_FAILED(rv))
+        break;
+    }
+
+    // If we have an error that we have not already notified about, set our
+    // state accordingly, and notify.
+    if (NS_FAILED(rv)) {
+      mState = ERROR;
+
+      // Drop our mutex - NotifyError doesn't want it held
+      mutex.unlock();
+
+      // Notify, and stop processing statements.
+      (void)NotifyError(mozIStorageError::ERROR, "");
+      return PR_FALSE;
+    }
+
+    // If we are done, we need to set our state accordingly while we still
+    // hold our lock.  We would have already returned if we were canceled or had
+    // an error at this point.
+    if (aFinished)
+      mState = COMPLETED;
+
+    return PR_TRUE;
+  }
+
+  /**
+   * Builds a result set up with a row from a given statement.  If we meet the
+   * right criteria, go ahead and notify about this results too.
+   *
+   * @pre mLock is held
+   *
+   * @param aStatement
+   *        The statement to get the row data from.
+   */
+  nsresult BuildAndNotifyResults(sqlite3_stmt *aStatement)
+  {
+    NS_ASSERTION(mCallback, "Trying to dispatch results without a callback!");
+
+    // At this point, it is safe to not hold the lock and allow for cancelation.
+    // We may add an event to the calling thread, but that thread will not end
+    // up running when it checks back with us to see if it should run.
+    nsAutoUnlock cancelationScope(mLock);
+
+    // Build result object if we need it.
+    if (!mResultSet)
+      mResultSet = new mozStorageResultSet();
+    NS_ENSURE_TRUE(mResultSet, NS_ERROR_OUT_OF_MEMORY);
+
+    nsRefPtr<mozStorageRow> row(new mozStorageRow());
+    NS_ENSURE_TRUE(row, NS_ERROR_OUT_OF_MEMORY);
+
+    nsresult rv = row->initialize(aStatement);
+    NS_ENSURE_SUCCESS(rv, rv);
+
+    rv = mResultSet->add(row);
+    NS_ENSURE_SUCCESS(rv, rv);
+
+    // If we have hit our maximum number of allowed results, or if we have hit
+    // the maximum amount of time we want to wait for results, notify the
+    // calling thread about it.
+    PRIntervalTime now = PR_IntervalNow();
+    PRIntervalTime delta = now - mIntervalStart;
+    if (mResultSet->rows() >= MAX_ROWS_PER_RESULT || delta > mMaxIntervalWait) {
+      // Notify the caller
+      rv = NotifyResults();
+      if (NS_FAILED(rv))
+        return NS_OK; // we'll try again with the next result
+
+      // Reset our start time
+      mIntervalStart = now;
+    }
+
+    return NS_OK;
+  }
+
+  /**
    * Notifies callback about completion, and does any necessary cleanup.
+   *
+   * @pre mLock is not held
    */
   nsresult NotifyComplete()
   {
     NS_ASSERTION(mState != PENDING,
                  "Still in a pending state when calling Complete!");
 
     // Handle our transaction, if we have one
     if (mTransactionManager) {
@@ -455,16 +531,18 @@ private:
     }
 
     return NS_OK;
   }
 
   /**
    * Notifies callback about an error.
    *
+   * @pre mLock is not held
+   *
    * @param aErrorCode
    *        The error code defined in mozIStorageError for the error.
    * @param aMessage
    *        The error string, if any.
    */
   nsresult NotifyError(PRInt32 aErrorCode, const char *aMessage)
   {
     if (!mCallback)
@@ -479,35 +557,49 @@ private:
     NS_ENSURE_TRUE(notifier, NS_ERROR_OUT_OF_MEMORY);
 
     return mCallingThread->Dispatch(notifier, NS_DISPATCH_NORMAL);
   }
 
   /**
    * Notifies the callback about a result set.
    *
-   * @param aResultSet
-   *        The mozIStorageResultSet to notify the callback about.
+   * @pre mLock is not held
    */
-  nsresult NotifyResults(mozStorageResultSet *aResultSet)
+  nsresult NotifyResults()
   {
     NS_ASSERTION(mCallback, "NotifyResults called without a callback!");
 
     nsRefPtr<CallbackResultNotifier> notifier =
-      new CallbackResultNotifier(mCallback, aResultSet, this);
+      new CallbackResultNotifier(mCallback, mResultSet, this);
     NS_ENSURE_TRUE(notifier, NS_ERROR_OUT_OF_MEMORY);
 
-    return mCallingThread->Dispatch(notifier, NS_DISPATCH_NORMAL);
+    nsresult rv = mCallingThread->Dispatch(notifier, NS_DISPATCH_NORMAL);
+    if (NS_SUCCEEDED(rv))
+      mResultSet = nsnull; // we no longer own it on success
+    return rv;
   };
 
   nsTArray<sqlite3_stmt *> mStatements;
   mozIStorageConnection *mConnection;
   mozStorageTransaction *mTransactionManager;
   mozIStorageStatementCallback *mCallback;
   nsCOMPtr<nsIThread> mCallingThread;
+  nsRefPtr<mozStorageResultSet> mResultSet;
+
+  /**
+   * The maximum amount of time we want to wait between results.  Defined by
+   * MAX_MILLISECONDS_BETWEEN_RESULTS and set at construction.
+   */
+  const PRIntervalTime mMaxIntervalWait;
+
+  /**
+   * The start time since our last set of results.
+   */
+  PRIntervalTime mIntervalStart;
 
   /**
    * Indicates the state the object is currently in.
    */
   ExecutionState mState;
 
   /**
    * Indicates if we should try to cancel at a cancelation point or not.
--- a/storage/src/mozStorageResultSet.h
+++ b/storage/src/mozStorageResultSet.h
@@ -53,16 +53,21 @@ public:
   mozStorageResultSet();
   ~mozStorageResultSet();
 
   /**
    * Adds a tuple to this result set.
    */
   nsresult add(mozIStorageRow *aTuple);
 
+  /**
+   * @returns the number of rows this result set holds.
+   */
+  PRInt32 rows() const { return mData.Count(); }
+
 private:
   /**
    * Stores the current index of the active result set.
    */
   PRInt32 mCurrentIndex;
   /**
    * Stores the tuples.
    */