Bug 985655 - Split AsyncShutdown's mechanism into a lightweight Barrier (that doesn't spin the event loop and doesn't cause crashes) and a heavyweight Spinner (that does). Also, exposing Barrier so as to let services expose lightweight shutdown dependencies without having to spin the event loop. r=froydnj, sr=bsmedberg
authorDavid Rajchenbach-Teller <dteller@mozilla.com>
Mon, 12 May 2014 12:44:00 -0400
changeset 182884 01bc471d7a7f35e19b147c387dfabf1d0c4eb909
parent 182883 fa62732307ee1d4c2756db477ff897c28eb25393
child 182885 a2405cd005f696cb98e7e991fcb6fb3e2f484c8a
push id6769
push userryanvm@gmail.com
push dateTue, 13 May 2014 13:28:53 +0000
treeherderfx-team@7f23305ed404 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersfroydnj, bsmedberg
bugs985655
milestone32.0a1
Bug 985655 - Split AsyncShutdown's mechanism into a lightweight Barrier (that doesn't spin the event loop and doesn't cause crashes) and a heavyweight Spinner (that does). Also, exposing Barrier so as to let services expose lightweight shutdown dependencies without having to spin the event loop. r=froydnj, sr=bsmedberg
toolkit/modules/AsyncShutdown.jsm
toolkit/modules/tests/xpcshell/test_AsyncShutdown.js
--- a/toolkit/modules/AsyncShutdown.jsm
+++ b/toolkit/modules/AsyncShutdown.jsm
@@ -235,41 +235,65 @@ function getPhase(topic) {
      * });
      *
      * AsyncShutdown.profileBeforeChange.addBlocker("Module: trivial callback",
      *     function callback() {
      *       // ...
      *       // Execute this code during profileBeforeChange
      *       // No specific guarantee about completion of profileBeforeChange
      * });
-     *
      */
     addBlocker: function(name, condition, state = null) {
-      if (typeof name != "string") {
-        throw new TypeError("Expected a human-readable name as first argument");
+      spinner.addBlocker(name, condition, state);
+    },
+    /**
+     * Remove the blocker for a condition.
+     *
+     * If several blockers have been registered for the same
+     * condition, remove all these blockers. If no blocker has been
+     * registered for this condition, this is a noop.
+     *
+     * @return {boolean} true if a blocker has been removed, false
+     * otherwise. Note that a result of false may mean either that
+     * the blocker has never been installed or that the phase has
+     * completed and the blocker has already been resolved.
+     */
+    removeBlocker: function(condition) {
+      return spinner.removeBlocker(condition);
+    },
+    /**
+     * Trigger the phase without having to broadcast a
+     * notification. For testing purposes only.
+     */
+    get _trigger() {
+      let accepted = false;
+      try {
+        accepted = Services.prefs.getBoolPref("toolkit.asyncshutdown.testing");
+      } catch (ex) {
+        // Ignore errors
       }
-      if (state && typeof state != "function") {
-        throw new TypeError("Expected nothing or a function as third argument");
+      if (accepted) {
+        return () => spinner.observe();
       }
-      spinner.addBlocker({name: name, condition: condition, state: state});
+      return undefined;
     }
   });
   gPhases.set(topic, phase);
   return phase;
-}
+};
 
 /**
  * Utility class used to spin the event loop until all blockers for a
  * Phase are satisfied.
  *
  * @param {string} topic The xpcom notification for that phase.
  */
 function Spinner(topic) {
+  this._barrier = new Barrier(topic);
   this._topic = topic;
-  this._conditions = new Set(); // set to |null| once it is too late to register
   Services.obs.addObserver(this, topic, false);
 }
 
 Spinner.prototype = {
   /**
    * Register a new condition for this phase.
    *
    * @param {object} condition A Condition that must be fulfilled before
@@ -277,185 +301,421 @@ Spinner.prototype = {
    * Must contain fields:
    * - {string} name The human-readable name of the condition. Used
    * for debugging/error reporting.
    * - {function} action An action that needs to be completed
    * before we proceed to the next runstate. If |action| returns a promise,
    * we wait until the promise is resolved/rejected before proceeding
    * to the next runstate.
    */
-  addBlocker: function(condition) {
-    if (!this._conditions) {
-      throw new Error("Phase " + this._topic +
-                      " has already begun, it is too late to register" +
-                      " completion condition '" + condition.name + "'.");
-    }
-    this._conditions.add(condition);
+  addBlocker: function(name, condition, state) {
+    this._barrier.client.addBlocker(name, condition, state);
+  },
+  /**
+   * Remove the blocker for a condition.
+   *
+   * If several blockers have been registered for the same
+   * condition, remove all these blockers. If no blocker has been
+   * registered for this condition, this is a noop.
+   *
+   * @return {boolean} true if a blocker has been removed, false
+   * otherwise. Note that a result of false may mean either that
+   * the blocker has never been installed or that the phase has
+   * completed and the blocker has already been resolved.
+   */
+  removeBlocker: function(condition) {
+    return this._barrier.client.removeBlocker(condition);
   },
 
+  // nsIObserver.observe
   observe: function() {
     let topic = this._topic;
+    let barrier = this._barrier;
     Services.obs.removeObserver(this, topic);
 
+    let satisfied = false; // |true| once we have satisfied all conditions
+    let promise = this._barrier.wait({
+      warnAfterMS: DELAY_WARNING_MS,
+      crashAfterMS: DELAY_CRASH_MS
+    });
+
+    // Now, spin the event loop
+    promise.then(() => satisfied = true); // This promise cannot reject
+    let thread = Services.tm.mainThread;
+    while (!satisfied) {
+      thread.processNextEvent(true);
+    }
+  }
+};
+
+/**
+ * A mechanism used to register blockers that prevent some action from
+ * happening.
+ *
+ * An instance of |Barrier| provides a capability |client| that
+ * clients can use to register blockers. The barrier is resolved once
+ * all registered blockers have been resolved. The owner of the
+ * |Barrier| may wait for the resolution of the barrier and obtain
+ * information on which blockers have not been resolved yet.
+ *
+ * @param {string} name The name of the blocker. Used mainly for error-
+ *     reporting.
+ */
+function Barrier(name) {
+  /**
+   * The set of conditions registered by clients, as a map.
+   *
+   * Key: condition (function)
+   * Value: Array of {name: string, state: function}
+   */
+  this._conditions = new Map();
+
+  /**
+   * Indirections, used to let clients cancel a blocker when they
+   * call removeBlocker().
+   *
+   * Key: condition (function)
+   * Value: Deferred.
+   */
+  this._indirections = null;
+
+  /**
+   * The name of the barrier.
+   */
+  this._name = name;
+
+  /**
+   * A cache for the promise returned by wait().
+   */
+  this._promise = null;
+
+  /**
+   * An array of objects used to monitor the state of each blocker.
+   */
+  this._monitors = null;
+
+  /**
+   * The capability of adding blockers. This object may safely be returned
+   * or passed to clients.
+   */
+  this.client = {
+    /**
+     * Register a blocker for the completion of this barrier.
+     *
+     * @param {string} name The human-readable name of the blocker. Used
+     * for debugging/error reporting. Please make sure that the name
+     * respects the following model: "Some Service: some action in progress" -
+     * for instance "OS.File: flushing all pending I/O";
+     * @param {function|promise|*} condition A condition blocking the
+     * completion of the phase. Generally, this is a function
+     * returning a promise. This function is evaluated during the
+     * phase and the phase is guaranteed to not terminate until the
+     * resulting promise is either resolved or rejected. If
+     * |condition| is not a function but another value |v|, it behaves
+     * as if it were a function returning |v|.
+     * @param {function*} state Optionally, a function returning
+     * information about the current state of the blocker as an
+     * object. Used for providing more details when logging errors or
+     * crashing.
+     */
+    addBlocker: function(name, condition, state) {
+      if (typeof name != "string") {
+        throw new TypeError("Expected a human-readable name as first argument");
+      }
+      if (state && typeof state != "function") {
+        throw new TypeError("Expected nothing or a function as third argument");
+      }
+      if (!this._conditions) {
+	throw new Error("Phase " + this._name +
+			" has already begun, it is too late to register" +
+			" completion condition '" + name + "'.");
+      }
+      let set = this._conditions.get(condition);
+      if (!set) {
+        set = [];
+        this._conditions.set(condition, set);
+      }
+      set.push({name: name, state: state});
+    }.bind(this),
+
+    /**
+     * Remove the blocker for a condition.
+     *
+     * If several blockers have been registered for the same
+     * condition, remove all these blockers. If no blocker has been
+     * registered for this condition, this is a noop.
+     *
+     * @return {boolean} true if at least one blocker has been
+     * removed, false otherwise.
+     */
+    removeBlocker: function(condition) {
+      if (this._conditions) {
+        // wait() hasn't been called yet.
+        return this._conditions.delete(condition);
+      }
+
+      if (this._indirections) {
+        // wait() is in progress
+        let deferred = this._indirections.get(condition);
+        if (deferred) {
+          // Unlock the blocker
+          deferred.resolve();
+        }
+        return this._indirections.delete(condition);
+      }
+
+      // wait() is complete.
+      return false;
+    }.bind(this),
+  };
+}
+Barrier.prototype = Object.freeze({
+  /**
+   * The current state of the barrier, as a JSON-serializable object
+   * designed for error-reporting.
+   */
+  get state() {
+    if (this._conditions) {
+      return "Not started";
+    }
+    if (!this._monitors) {
+      return "Complete";
+    }
+    let frozen = [];
+    for (let {name, isComplete, state} of this._monitors) {
+      if (!isComplete) {
+        frozen.push({name: name, state: safeGetState(state)});
+      }
+    }
+    return frozen;
+  },
+
+  /**
+   * Wait until all currently registered blockers are complete.
+   *
+   * Once this method has been called, any attempt to register a new blocker
+   * for this barrier will cause an error.
+   *
+   * Successive calls to this method always return the same value.
+   *
+   * @param {object=}  options Optionally, an object  that may contain
+   * the following fields:
+   *  {number} warnAfterMS If provided and > 0, print a warning if the barrier
+   *   has not been resolved after the given number of milliseconds.
+   *  {number} crashAfterMS If provided and > 0, crash the process if the barrier
+   *   has not been resolved after the give number of milliseconds (rounded up
+   *   to the next second). To avoid crashing simply because the computer is busy
+   *   or going to sleep, we actually wait for ceil(crashAfterMS/1000) successive
+   *   periods of at least one second. Upon crashing, if a crash reporter is present,
+   *   prepare a crash report with the state of this barrier.
+   *
+   *
+   * @return {Promise} A promise satisfied once all blockers are complete.
+   */
+  wait: function(options = {}) {
+    // This method only implements caching on top of _wait()
+    if (this._promise) {
+      return this._promise;
+    }
+    return this._promise = this._wait(options);
+  },
+  _wait: function(options) {
+    let topic = this._name;
     let conditions = this._conditions;
     this._conditions = null; // Too late to register
-
     if (conditions.size == 0) {
-      // No need to spin anything
-      return;
+      return Promise.resolve();
     }
 
+    this._indirections = new Map();
     // The promises for which we are waiting.
     let allPromises = [];
 
     // Information to determine and report to the user which conditions
     // are not satisfied yet.
-    let allMonitors = [];
+    this._monitors = [];
 
-    for (let {condition, name, state} of conditions) {
-      // Gather all completion conditions
+    for (let _condition of conditions.keys()) {
+      for (let current of conditions.get(_condition)) {
+        let condition = _condition; // Avoid capturing the wrong variable
+        let {name, state} = current;
 
-      try {
-        if (typeof condition == "function") {
-          // Normalize |condition| to the result of the function.
-          try {
-            condition = condition(topic);
-          } catch (ex) {
-            condition = Promise.reject(ex);
+        // An indirection on top of condition, used to let clients
+        // cancel a blocker through removeBlocker.
+        let indirection = Promise.defer();
+        this._indirections.set(condition, indirection);
+
+        // Gather all completion conditions
+
+        try {
+          if (typeof condition == "function") {
+            // Normalize |condition| to the result of the function.
+            try {
+              condition = condition(topic);
+            } catch (ex) {
+              condition = Promise.reject(ex);
+            }
           }
-        }
-        // Normalize to a promise. Of course, if |condition| was not a
-        // promise in the first place (in particular if the above
-        // function returned |undefined| or failed), that new promise
-        // isn't going to be terribly interesting, but it will behave
-        // as a promise.
-        condition = Promise.resolve(condition);
 
-        // If the promise takes too long to be resolved/rejected,
-        // we need to notify the user.
-        //
-        // If it takes way too long, we need to crash.
+          // Normalize to a promise. Of course, if |condition| was not a
+          // promise in the first place (in particular if the above
+          // function returned |undefined| or failed), that new promise
+          // isn't going to be terribly interesting, but it will behave
+          // as a promise.
+          condition = Promise.resolve(condition);
 
-        let timer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
-        timer.initWithCallback(function() {
-          let msg = "A phase completion condition is" +
-            " taking too long to complete." +
-            " Condition: " + monitor.name +
-            " Phase: " + topic +
-            " State: " + safeGetState(state);
-          warn(msg);
-        }, DELAY_WARNING_MS, Ci.nsITimer.TYPE_ONE_SHOT);
+          let monitor = {
+            isComplete: false,
+            name: name,
+            state: state
+          };
 
-        let monitor = {
-          isFrozen: true,
-          name: name,
-          state: state
-        };
-        condition = condition.then(function onSuccess() {
-            timer.cancel(); // As a side-effect, this prevents |timer| from
-                            // being garbage-collected too early.
-            monitor.isFrozen = false;
-          }, function onError(error) {
-            timer.cancel();
+	  condition = condition.then(null, function onError(error) {
             let msg = "A completion condition encountered an error" +
-                " while we were spinning the event loop." +
-                " Condition: " + name +
-                " Phase: " + topic +
-                " State: " + safeGetState(state);
-            warn(msg, error);
-            monitor.isFrozen = false;
-        });
-        allMonitors.push(monitor);
-        allPromises.push(condition);
+              " while we were spinning the event loop." +
+	      " Condition: " + name +
+              " Phase: " + topic +
+              " State: " + safeGetState(state);
+	    warn(msg, error);
+	  });
+          condition.then(() => indirection.resolve());
 
-      } catch (error) {
-          let msg = "A completion condition encountered an error" +
-                " while we were initializing the phase." +
-                " Condition: " + name +
-                " Phase: " + topic +
-                " State: " + safeGetState(state);
-          warn(msg, error);
+          indirection.promise.then(() => monitor.isComplete = true);
+          this._monitors.push(monitor);
+          allPromises.push(indirection.promise);
+
+        } catch (error) {
+            let msg = "A completion condition encountered an error" +
+                  " while we were initializing the phase." +
+                  " Condition: " + name +
+                  " Phase: " + topic +
+                  " State: " + safeGetState(state);
+            warn(msg, error);
+        }
+
       }
-
     }
     conditions = null;
 
     let promise = Promise.all(allPromises);
     allPromises = null;
 
     promise = promise.then(null, function onError(error) {
       // I don't think that this can happen.
       // However, let's be overcautious with async/shutdown error reporting.
       let msg = "An uncaught error appeared while completing the phase." +
             " Phase: " + topic;
       warn(msg, error);
     });
 
-    let satisfied = false; // |true| once we have satisfied all conditions
-
-    // If after DELAY_CRASH_MS (approximately one minute, adjusted to take
-    // into account sleep and otherwise busy computer) we have not finished
-    // this shutdown phase, we assume that the shutdown is somehow frozen,
-    // presumably deadlocked. At this stage, the only thing we can do to
-    // avoid leaving the user's computer in an unstable (and battery-sucking)
-    // situation is report the issue and crash.
-    let timeToCrash = looseTimer(DELAY_CRASH_MS);
-    timeToCrash.promise.then(
-      function onTimeout() {
-        // Report the problem as best as we can, then crash.
-        let frozen = [];
-        let states = [];
-        for (let {name, isFrozen, state} of allMonitors) {
-          if (isFrozen) {
-            frozen.push({name: name, state: safeGetState(state)});
-          }
-        }
-
-        let msg = "At least one completion condition failed to complete" +
-              " within a reasonable amount of time. Causing a crash to" +
-              " ensure that we do not leave the user with an unresponsive" +
-              " process draining resources." +
-              " Conditions: " + JSON.stringify(frozen) +
-              " Phase: " + topic;
-        err(msg);
-        if (gCrashReporter && gCrashReporter.enabled) {
-          let data = {
-            phase: topic,
-            conditions: frozen
-          };
-          gCrashReporter.annotateCrashReport("AsyncShutdownTimeout",
-            JSON.stringify(data));
-        } else {
-          warn("No crash reporter available");
-        }
-
-        let error = new Error();
-        gDebug.abort(error.fileName, error.lineNumber + 1);
-      },
-      function onSatisfied() {
-        // The promise has been rejected, which means that we have satisfied
-        // all completion conditions.
-      });
-
-    promise = promise.then(function() {
-      satisfied = true;
-      timeToCrash.reject();
-    }/* No error is possible here*/);
-
-    // Now, spin the event loop
-    let thread = Services.tm.mainThread;
-    while(!satisfied) {
-      thread.processNextEvent(true);
-    }
-  }
-};
+    promise = promise.then(() => {
+      this._monitors = null;
+      this._indirections = null;
+    }); // Memory cleanup
 
 
-// List of well-known runstates
-// Ideally, runstates should be registered from the component that decides
+    // Now handle warnings and crashes
+
+    let warnAfterMS = DELAY_WARNING_MS;
+    if (options && "warnAfterMS" in options) {
+      if (typeof options.warnAfterMS == "number"
+         || options.warnAfterMS == null) {
+        // Change the delay or deactivate warnAfterMS
+        warnAfterMS = options.warnAfterMS;
+      } else {
+        throw new TypeError("Wrong option value for warnAfterMS");
+      }
+    }
+
+    if (warnAfterMS && warnAfterMS > 0) {
+      // If the promise takes too long to be resolved/rejected,
+      // we need to notify the user.
+      let timer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
+      timer.initWithCallback(function() {
+        let msg = "At least one completion condition is taking too long to complete." +
+	  " Conditions: " + safeGetState(this.state) +
+	  " Barrier: " + topic;
+        warn(msg);
+      }.bind(this), warnAfterMS, Ci.nsITimer.TYPE_ONE_SHOT);
+
+      promise = promise.then(function onSuccess() {
+        timer.cancel();
+        // As a side-effect, this prevents |timer| from
+        // being garbage-collected too early.
+      });
+    }
+
+    let crashAfterMS = DELAY_CRASH_MS;
+    if (options && "crashAfterMS" in options) {
+      if (typeof options.crashAfterMS == "number"
+         || options.crashAfterMS == null) {
+        // Change the delay or deactivate crashAfterMS
+        crashAfterMS = options.crashAfterMS;
+      } else {
+        throw new TypeError("Wrong option value for crashAfterMS");
+      }
+    }
+
+    if (crashAfterMS  > 0) {
+      let timeToCrash = null;
+
+      // If after |crashAfterMS| milliseconds (adjusted to take into
+      // account sleep and otherwise busy computer) we have not finished
+      // this shutdown phase, we assume that the shutdown is somehow
+      // frozen, presumably deadlocked. At this stage, the only thing we
+      // can do to avoid leaving the user's computer in an unstable (and
+      // battery-sucking) situation is report the issue and crash.
+      timeToCrash = looseTimer(crashAfterMS);
+      timeToCrash.promise.then(
+        function onTimeout() {
+	  // Report the problem as best as we can, then crash.
+	  let state = this.state;
+
+	  let msg = "At least one completion condition failed to complete" +
+	    " within a reasonable amount of time. Causing a crash to" +
+	    " ensure that we do not leave the user with an unresponsive" +
+	    " process draining resources." +
+	    " Conditions: " + JSON.stringify(state) +
+	    " Barrier: " + topic;
+	  err(msg);
+	  if (gCrashReporter && gCrashReporter.enabled) {
+            let data = {
+              phase: topic,
+              conditions: state
+	    };
+            gCrashReporter.annotateCrashReport("AsyncShutdownTimeout",
+            JSON.stringify(data));
+	  } else {
+            warn("No crash reporter available");
+	  }
+
+	  let error = new Error();
+	  gDebug.abort(error.fileName, error.lineNumber + 1);
+        }.bind(this),
+	  function onSatisfied() {
+            // The promise has been rejected, which means that we have satisfied
+            // all completion conditions.
+          });
+
+      promise = promise.then(function() {
+        timeToCrash.reject();
+      }.bind(this)/* No error is possible here*/);
+    }
+
+    return promise;
+  },
+});
+
+
+
+// List of well-known phases
+// Ideally, phases should be registered from the component that decides
 // when they start/stop. For compatibility with existing startup/shutdown
-// mechanisms, we register a few runstates here.
+// mechanisms, we register a few phases here.
 
 this.AsyncShutdown.profileChangeTeardown = getPhase("profile-change-teardown");
 this.AsyncShutdown.profileBeforeChange = getPhase("profile-before-change");
 this.AsyncShutdown.sendTelemetry = getPhase("profile-before-change2");
 this.AsyncShutdown.webWorkersShutdown = getPhase("web-workers-shutdown");
+
+this.AsyncShutdown.Barrier = Barrier;
+
 Object.freeze(this.AsyncShutdown);
--- a/toolkit/modules/tests/xpcshell/test_AsyncShutdown.js
+++ b/toolkit/modules/tests/xpcshell/test_AsyncShutdown.js
@@ -1,16 +1,82 @@
 let Cu = Components.utils;
 
 Cu.import("resource://gre/modules/Services.jsm");
 Cu.import("resource://gre/modules/Promise.jsm");
 Cu.import("resource://gre/modules/AsyncShutdown.jsm");
 
 Services.prefs.setBoolPref("toolkit.asyncshutdown.testing", true);
 
+function run_test() {
+  run_next_test();
+}
+
+function get_exn(f) {
+  try {
+    f();
+    return null;
+  } catch (ex) {
+    return ex;
+  }
+}
+
+function do_check_exn(exn, constructor) {
+  do_check_neq(exn, null);
+  if (exn.name == constructor) {
+    do_check_eq(exn.constructor.name, constructor);
+    return;
+  }
+  do_print("Wrong error constructor");
+  do_print(exn.constructor.name);
+  do_print(exn.stack);
+  do_check_true(false);
+}
+
+/**
+ * Utility function used to provide the same API for both AsyncShutdown
+ * phases and AsyncShutdown barriers.
+ *
+ * @param {bool} heavy If |true|, use a AsyncShutdown phase, otherwise
+ * a AsyncShutdown barrier.
+ *
+ * @return An object with the following methods:
+ *   - addBlocker() - the same method as AsyncShutdown phases and barrier clients
+ *   - wait() - trigger the resolution of the lock
+ */
+function makeLock(heavy) {
+  if (heavy) {
+    let topic = "test-Phase-" + ++makeLock.counter;
+    let phase = AsyncShutdown._getPhase(topic);
+    return {
+      addBlocker: function(...args) {
+        return phase.addBlocker(...args);
+      },
+      removeBlocker: function(blocker) {
+        return phase.removeBlocker(blocker);
+      },
+      wait: function() {
+        Services.obs.notifyObservers(null, topic, null);
+        return Promise.resolve();
+      }
+    };
+  } else {
+    let name = "test-Barrier-" + ++makeLock.counter;
+    let barrier = new AsyncShutdown.Barrier(name);
+    return {
+      addBlocker: barrier.client.addBlocker,
+      removeBlocker: barrier.client.removeBlocker,
+      wait: function() {
+        return barrier.wait();
+      }
+    };
+  }
+}
+makeLock.counter = 0;
+
 /**
  * An asynchronous task that takes several ticks to complete.
  *
  * @param {*=} resolution The value with which the resulting promise will be
  * resolved once the task is complete. This may be a rejected promise,
  * in which case the resulting promise will itself be rejected.
  * @param {object=} outResult An object modified by side-effect during the task.
  * Initially, its field |isFinished| is set to |false|. Once the task is
@@ -27,130 +93,184 @@ function longRunningAsyncTask(resolution
   do_timeout(100, function() {
     ++outResult.countFinished;
     outResult.isFinished = true;
     deferred.resolve(resolution);
   });
   return deferred.promise;
 }
 
-/**
- * Generate a unique notification topic.
- */
-function getUniqueTopic() {
-  const PREFIX = "testing-phases-";
-  return PREFIX + ++getUniqueTopic.counter;
-}
-getUniqueTopic.counter = 0;
+
+//////// Tests on AsyncShutdown phases
 
-add_task(function test_no_condition() {
-  do_print("Testing a phase with no condition");
-  let topic = getUniqueTopic();
-  AsyncShutdown._getPhase(topic);
-  Services.obs.notifyObservers(null, topic, null);
-  do_print("Phase with no condition didn't lock");
+/*add_task*/(function* test_no_condition() {
+  for (let heavy of [false, true]) {
+    do_print("Testing a barrier with no condition (" + heavy?"heavy":"light" + ")");
+    let lock = makeLock(heavy);
+    yield lock.wait();
+    do_print("Barrier with no condition didn't lock");
+  }
 });
 
-add_task(function test_simple_async() {
+/*add_task*/(function* test_phase_simple_async() {
   do_print("Testing various combinations of a phase with a single condition");
-  for (let arg of [undefined, null, "foo", 100, new Error("BOOM")]) {
-    for (let resolution of [arg, Promise.reject(arg)]) {
-      for (let success of [false, true]) {
-        for (let state of [[null],
-                           [],
-                           [() => "some state"],
-                           [function() {
-                             throw new Error("State BOOM"); }],
-                           [function() {
-                             return {
-                               toJSON: function() {
-                                 throw new Error("State.toJSON BOOM");
-                               }
-                             };
-                           }]]) {
-          // Asynchronous phase
-          do_print("Asynchronous test with " + arg + ", " + resolution);
-          let topic = getUniqueTopic();
-          let outParam = { isFinished: false };
-          AsyncShutdown._getPhase(topic).addBlocker(
-            "Async test",
-              function() {
-                if (success) {
-                  return longRunningAsyncTask(resolution, outParam);
-                } else {
-                  throw resolution;
-                }
-              },
-              ...state
-          );
-          do_check_false(outParam.isFinished);
-          Services.obs.notifyObservers(null, topic, null);
-          do_check_eq(outParam.isFinished, success);
+  for (let heavy of [false, true]) {
+    for (let arg of [undefined, null, "foo", 100, new Error("BOOM")]) {
+      for (let resolution of [arg, Promise.reject(arg)]) {
+        for (let success of [false, true]) {
+          for (let state of [[null],
+                             [],
+                             [() => "some state"],
+                             [function() {
+                               throw new Error("State BOOM"); }],
+                             [function() {
+                               return {
+                                 toJSON: function() {
+                                   throw new Error("State.toJSON BOOM");
+                                 }
+                               };
+                             }]]) {
+            // Asynchronous phase
+            do_print("Asynchronous test with " + arg + ", " + resolution + ", " + heavy);
+            let lock = makeLock(heavy);
+            let outParam = { isFinished: false };
+            lock.addBlocker(
+              "Async test",
+               function() {
+                 if (success) {
+                   return longRunningAsyncTask(resolution, outParam);
+                 } else {
+                   throw resolution;
+                 }
+               },
+               ...state
+            );
+            do_check_false(outParam.isFinished);
+            yield lock.wait();
+            do_check_eq(outParam.isFinished, success);
+          }
         }
+
+        // Synchronous phase - just test that we don't throw/freeze
+        do_print("Synchronous test with " + arg + ", " + resolution + ", " + heavy);
+        let lock = makeLock(heavy);
+        lock.addBlocker(
+          "Sync test",
+          resolution
+        );
+        yield lock.wait();
       }
-
-      // Synchronous phase - just test that we don't throw/freeze
-      do_print("Synchronous test with " + arg + ", " + resolution);
-      let topic = getUniqueTopic();
-      AsyncShutdown._getPhase(topic).addBlocker(
-        "Sync test",
-        resolution
-      );
-      Services.obs.notifyObservers(null, topic, null);
     }
   }
 });
 
-add_task(function test_many() {
+/*add_task*/(function* test_phase_many() {
   do_print("Testing various combinations of a phase with many conditions");
-  let topic = getUniqueTopic();
-  let phase = AsyncShutdown._getPhase(topic);
-  let outParams = [];
-  for (let arg of [undefined, null, "foo", 100, new Error("BOOM")]) {
-    for (let resolution of [arg, Promise.reject(arg)]) {
-      let outParam = { isFinished: false };
-      phase.addBlocker(
-        "Test",
-        () => longRunningAsyncTask(resolution, outParam)
-      );
+  for (let heavy of [false, true]) {
+    let lock = makeLock(heavy);
+    let outParams = [];
+    for (let arg of [undefined, null, "foo", 100, new Error("BOOM")]) {
+      for (let resolve of [true, false]) {
+        do_print("Testing with " + heavy + ", " + arg + ", " + resolve);
+        let resolution = resolve ? arg : Promise.reject(arg);
+        let outParam = { isFinished: false };
+        lock.addBlocker(
+          "Test",
+          () => longRunningAsyncTask(resolution, outParam)
+        );
+      }
     }
+    do_check_true(outParams.every((x) => !x.isFinished));
+    yield lock.wait();
+    do_check_true(outParams.every((x) => x.isFinished));
   }
-  do_check_true(outParams.every((x) => !x.isFinished));
-  Services.obs.notifyObservers(null, topic, null);
-  do_check_true(outParams.every((x) => x.isFinished));
+});
+
+
+
+/*add_task*/(function* test_phase_various_failures() {
+  do_print("Ensure that we cannot add a condition for a phase once notification has been received");
+  for (let heavy of [false, true]) {
+    let lock = makeLock(heavy);
+    lock.wait(); // Don't actually wait for the promise to be resolved
+    let exn = get_exn(() => lock.addBlocker("Test", true));
+    do_check_true(!!exn);
+
+    do_print("Ensure that an incomplete blocker causes a TypeError");
+
+    lock = makeLock(heavy);
+    exn = get_exn(() => lock.addBlocker());
+    do_check_exn(exn, "TypeError");
+
+    exn = get_exn(() => lock.addBlocker(null, true));
+    do_check_exn(exn, "TypeError");
+
+    exn = get_exn(() => lock.addBlocker("Test 2", () => true, "not a function"));
+    do_check_exn(exn, "TypeError");
+  }
 });
 
-function get_exn(f) {
-  try {
-    f();
-    return null;
-  } catch (ex) {
-    return ex;
-  }
-}
+
+add_task(function* test_phase_removeBlocker() {
+  do_print("Testing that we can call removeBlocker before, during and after the call to wait()");
+
+  for (let heavy of [false, true]) {
+
+    do_print("Attempt to add then remove a blocker before wait()");
+    let lock = makeLock(heavy);
+    let blocker = () => {
+      do_print("This promise will never be resolved");
+      return Promise.defer().promise;
+    };
+
+    lock.addBlocker("Wait forever", blocker);
+    do_check_true(lock.removeBlocker(blocker));
+    do_check_false(lock.removeBlocker(blocker));
+    do_print("Attempt to remove non-registered blockers before wait()");
+    do_check_false(lock.removeBlocker("foo"));
+    do_check_false(lock.removeBlocker(null));
+    do_print("Waiting (should lift immediately)");
+    yield lock.wait();
 
-add_task(function test_various_failures() {
-  do_print("Ensure that we cannot add a condition for a phase that is already complete");
-  let topic = getUniqueTopic();
-  let phase = AsyncShutdown._getPhase(topic);
-  Services.obs.notifyObservers(null, topic, null);
-  let exn = get_exn(() => phase.addBlocker("Test", true));
-  do_check_true(!!exn);
+    do_print("Attempt to add a blocker then remove it during wait()");
+    lock = makeLock(heavy);
+    let blockers = [
+      () => {
+        do_print("This blocker will self-destruct");
+        do_check_true(lock.removeBlocker(blockers[0]));
+        return Promise.defer().promise;
+      },
+      () => {
+        do_print("This blocker will self-destruct twice");
+        do_check_true(lock.removeBlocker(blockers[1]));
+        do_check_false(lock.removeBlocker(blockers[1]));
+        return Promise.defer().promise;
+      },
+      () => {
+        do_print("Attempt to remove non-registered blockers during wait()");
+        do_check_false(lock.removeBlocker("foo"));
+        do_check_false(lock.removeBlocker(null));
+      }
+    ];
+    for (let i in blockers) {
+      lock.addBlocker("Wait forever again: " + i, blockers[i]);
+    }
+    do_print("Waiting (should lift very quickly)");
+    yield lock.wait();
+    do_check_false(lock.removeBlocker(blockers[0]));
 
-  do_print("Ensure that an incomplete blocker causes a TypeError");
 
-  exn = get_exn(() => phase.addBlocker());
-  do_check_eq(exn.name, "TypeError");
+    do_print("Attempt to remove a blocker after wait");
+    lock = makeLock(heavy);
+    blocker = Promise.resolve;
+    yield lock.wait();
 
-  exn = get_exn(() => phase.addBlocker(null, true));
-  do_check_eq(exn.name, "TypeError");
+    do_print("Attempt to remove non-registered blocker after wait()");
+    do_check_false(lock.removeBlocker("foo"));
+    do_check_false(lock.removeBlocker(null));
+  }
 
-  exn = get_exn(() => phase.addBlocker("Test 2", () => true, "not a function"));
-  do_check_eq(exn.name, "TypeError");
 });
 
 add_task(function() {
   Services.prefs.clearUserPref("toolkit.asyncshutdown.testing");
 });
 
-function run_test() {
-  run_next_test();
-}