Bug 1043863 - OpenedConnection.prototype.executeBeforeShutdown. r=mak
authorDavid Rajchenbach-Teller <dteller@mozilla.com>
Tue, 05 May 2015 12:45:25 +0200
changeset 276902 668d0e7d36e7c063caa874639d46e1dd01864aaa
parent 276901 ffde3b92c081b62593fb28350144491273e40a7b
child 276903 0687b6c670bcbcce2cd28bdeb0e8c208c4ae4f0b
push id4932
push userjlund@mozilla.com
push dateMon, 10 Aug 2015 18:23:06 +0000
treeherdermozilla-beta@6dd5a4f5f745 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersmak
bugs1043863
milestone41.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1043863 - OpenedConnection.prototype.executeBeforeShutdown. r=mak
toolkit/modules/Sqlite.jsm
--- a/toolkit/modules/Sqlite.jsm
+++ b/toolkit/modules/Sqlite.jsm
@@ -214,16 +214,19 @@ function ConnectionData(connection, iden
 
   // A map from statement index to mozIStoragePendingStatement, to allow for
   // canceling prior to finalizing the mozIStorageStatements.
   this._pendingStatements = new Map();
 
   // Increments for each executed statement for the life of the connection.
   this._statementCounter = 0;
 
+  // Increments whenever we request a unique operation id.
+  this._operationsCounter = 0;
+
   this._hasInProgressTransaction = false;
   // Manages a chain of transactions promises, so that new transactions
   // always happen in queue to the previous ones.  It never rejects.
   this._transactionQueue = Promise.resolve();
 
   this._idleShrinkMS = options.shrinkMemoryOnConnectionIdleMS;
   if (this._idleShrinkMS) {
     this._idleShrinkTimer = Cc["@mozilla.org/timer;1"]
@@ -232,16 +235,20 @@ function ConnectionData(connection, iden
     // shrinking now would not do anything.
   }
 
   // Deferred whose promise is resolved when the connection closing procedure
   // is complete.
   this._deferredClose = PromiseUtils.defer();
   this._closeRequested = false;
 
+  // An AsyncShutdown barrier used to make sure that we wait until clients
+  // are done before shutting down the connection.
+  this._barrier = new AsyncShutdown.Barrier(`${this._identifier}: waiting for clients`);
+
   Barriers.connections.client.addBlocker(
     this._identifier + ": waiting for shutdown",
     this._deferredClose.promise,
     () =>  ({
       identifier: this._identifier,
       isCloseRequested: this._closeRequested,
       hasDbConn: !!this._dbConn,
       hasInProgressTransaction: this._hasInProgressTransaction,
@@ -259,57 +266,157 @@ function ConnectionData(connection, iden
  * connections on garbage collection.
  *
  * Key: _identifier of ConnectionData
  * Value: ConnectionData object
  */
 ConnectionData.byId = new Map();
 
 ConnectionData.prototype = Object.freeze({
+  /**
+   * Run a task, ensuring that its execution will not be interrupted by shutdown.
+   *
+   * As the operations of this module are asynchronous, a sequence of operations,
+   * or even an individual operation, can still be pending when the process shuts
+   * down. If any of this operations is a write, this can cause data loss, simply
+   * because the write has not been completed (or even started) by shutdown.
+   *
+   * To avoid this risk, clients are encouraged to use `executeBeforeShutdown` for
+   * any write operation, as follows:
+   *
+   * myConnection.executeBeforeShutdown("Bookmarks: Removing a bookmark",
+   *   Task.async(function*(db) {
+   *     // The connection will not be closed and shutdown will not proceed
+   *     // until this task has completed.
+   *
+   *     // `db` exposes the same API as `myConnection` but provides additional
+   *     // logging support to help debug hard-to-catch shutdown timeouts.
+   *
+   *     yield db.execute(...);
+   * }));
+   *
+   * @param {string} name A human-readable name for the ongoing operation, used
+   *  for logging and debugging purposes.
+   * @param {function(db)} task A function that takes as argument a Sqlite.jsm
+   *  db and returns a Promise.
+   */
+  executeBeforeShutdown: function(parent, name, task) {
+    if (!name) {
+      throw new TypeError("Expected a human-readable name as first argument");
+    }
+    if (typeof task != "function") {
+      throw new TypeError("Expected a function as second argument");
+    }
+    if (this._closeRequested) {
+      throw new Error(`${this._identifier}: cannot execute operation ${name}, the connection is already closing`);
+    }
+
+    // Status, used for AsyncShutdown crash reports.
+    let status = {
+      // The latest command started by `task`, either as a
+      // sql string, or as one of "<not started>" or "<closing>".
+      command: "<not started>",
+
+      // `true` if `command` was started but not completed yet.
+      isPending: false,
+    };
+
+    // An object with the same API as `this` but with
+    // additional logging. To keep logging simple, we
+    // assume that `task` is not running several queries
+    // concurrently.
+    let loggedDb = Object.create(parent, {
+      execute: {
+        value: Task.async(function*(sql, ...rest) {
+          status.isPending = true;
+          status.command = sql;
+          try {
+            return (yield this.execute(sql, ...rest));
+          } finally {
+            status.isPending = false;
+          }
+        }.bind(this))
+      },
+      close: {
+        value: Task.async(function*() {
+          status.isPending = false;
+          status.command = "<close>";
+          try {
+            return (yield this.close());
+          } finally {
+            status.isPending = false;
+          }
+        }.bind(this))
+      },
+      executeCached: {
+        value: Task.async(function*(sql, ...rest) {
+          status.isPending = false;
+          status.command = sql;
+          try {
+            return (yield this.executeCached(sql, ...rest));
+          } finally {
+            status.isPending = false;
+          }
+        }.bind(this))
+      },
+    });
+
+    let promiseResult = task(loggedDb);
+    if (!promiseResult || typeof promiseResult != "object" || !("then" in promiseResult)) {
+      throw new TypeError("Expected a Promise");
+    }
+    let key = `${this._identifier}: ${name} (${this._getOperationId()})`;
+    let promiseComplete = promiseResult.catch(() => {});
+    this._barrier.client.addBlocker(key, promiseComplete, {
+      fetchState: () => status
+    });
+
+    return Task.spawn(function*() {
+      try {
+        return (yield promiseResult);
+      } finally {
+        this._barrier.client.removeBlocker(key, promiseComplete)
+      }
+    }.bind(this));
+  },
   close: function () {
     this._closeRequested = true;
 
     if (!this._dbConn) {
       return this._deferredClose.promise;
     }
 
     this._log.debug("Request to close connection.");
     this._clearIdleShrinkTimer();
 
-    // We need to take extra care with transactions during shutdown.
-    //
-    // If we don't have a transaction in progress, we can proceed with shutdown
-    // immediately.
-    if (!this._hasInProgressTransaction) {
+    return this._barrier.wait().then(() => {
+      if (!this._dbConn) {
+        return;
+      }
       return this._finalize();
-    }
-
-    // If instead we do have a transaction in progress, it might be rollback-ed
-    // automaticall by closing the connection.  Regardless, we wait for its
-    // completion, next enqueued transactions will be rejected.
-    this._log.warn("Transaction in progress at time of close. Rolling back.");
-
-    return this._transactionQueue.then(() => this._finalize());
+    });
   },
 
   clone: function (readOnly=false) {
     this.ensureOpen();
 
     this._log.debug("Request to clone connection.");
 
     let options = {
       connection: this._dbConn,
       readOnly: readOnly,
     };
     if (this._idleShrinkMS)
       options.shrinkMemoryOnConnectionIdleMS = this._idleShrinkMS;
 
     return cloneStorageConnection(options);
   },
-
+  _getOperationId: function() {
+    return this._operationsCounter++;
+  },
   _finalize: function () {
     this._log.debug("Finalizing connection.");
     // Cancel any pending statements.
     for (let [k, statement] of this._pendingStatements) {
       statement.cancel();
     }
     this._pendingStatements.clear();
 
@@ -423,16 +530,19 @@ ConnectionData.prototype = Object.freeze
     });
   },
 
   get transactionInProgress() {
     return this._open && this._hasInProgressTransaction;
   },
 
   executeTransaction: function (func, type) {
+    if (typeof type == "undefined") {
+      throw new Error("Internal error: expected a type");
+    }
     this.ensureOpen();
 
     this._log.debug("Beginning transaction");
 
     let promise = this._transactionQueue.then(() => {
       if (this._closeRequested) {
         throw new Error("Transaction canceled due to a closed connection.");
       }
@@ -524,16 +634,20 @@ ConnectionData.prototype = Object.freeze
         setTimeout(() => reject(new Error("Transaction timeout, most likely caused by unresolved pending work.")),
                    TRANSACTIONS_QUEUE_TIMEOUT_MS);
       });
       return Promise.race([transactionPromise, timeoutPromise]);
     });
     // Atomically update the queue before anyone else has a chance to enqueue
     // further transactions.
     this._transactionQueue = promise.catch(ex => { console.error(ex) });
+
+    // Make sure that we do not shutdown the connection during a transaction.
+    this._barrier.client.addBlocker(`Transaction (${this._getOperationId()})`,
+      this._transactionQueue);
     return promise;
   },
 
   shrinkMemory: function () {
     this._log.info("Shrinking memory usage.");
     let onShrunk = this._clearIdleShrinkTimer.bind(this);
     return this.execute("PRAGMA shrink_memory").then(onShrunk, onShrunk);
   },
@@ -796,18 +910,18 @@ function openConnection(options) {
     let dbOptions = null;
     if (!sharedMemoryCache) {
       dbOptions = Cc["@mozilla.org/hash-property-bag;1"].
         createInstance(Ci.nsIWritablePropertyBag);
       dbOptions.setProperty("shared", false);
     }
     Services.storage.openAsyncDatabase(file, dbOptions, (status, connection) => {
       if (!connection) {
-        log.warn("Could not open connection: " + status);
-        reject(new Error("Could not open connection: " + status));
+        log.warn(`Could not open connection to ${path}: ${status}`);
+        reject(new Error(`Could not open connection to ${path}: ${status}`));
         return;
       }
       log.info("Connection opened");
       try {
         resolve(
           new OpenedConnection(connection.QueryInterface(Ci.mozIStorageAsyncConnection),
                               identifier, openedOptions));
       } catch (ex) {
@@ -1090,16 +1204,20 @@ OpenedConnection.prototype = Object.free
    *        the original connection.
    *
    * @return Promise<OpenedConnection>
    */
   clone: function (readOnly=false) {
     return this._connectionData.clone(readOnly);
   },
 
+  executeBeforeShutdown: function(name, task) {
+    return this._connectionData.executeBeforeShutdown(this, name, task);
+  },
+
   /**
    * Execute a SQL statement and cache the underlying statement object.
    *
    * This function executes a SQL statement and also caches the underlying
    * derived statement object so subsequent executions are faster and use
    * less resources.
    *
    * This function optionally binds parameters to the statement as well as