Bug 985655 - Split AsyncShutdown's mechanism into a lightweight Barrier (that doesn't spin the event loop and doesn't cause crashes) and a heavyweight Spinner (that does). Also, exposing Barrier so as to let services expose lightweight shutdown dependencies without having to spin the event loop. r=froydnj, sr=bsmedberg
authorDavid Rajchenbach-Teller <dteller@mozilla.com>
Mon, 12 May 2014 12:44:00 -0400
changeset 182884 01bc471d7a7f35e19b147c387dfabf1d0c4eb909
parent 182883 fa62732307ee1d4c2756db477ff897c28eb25393
child 182885 a2405cd005f696cb98e7e991fcb6fb3e2f484c8a
push id26772
push userryanvm@gmail.com
push dateTue, 13 May 2014 20:03:29 +0000
treeherdermozilla-central@08070d4b85f4 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersfroydnj, bsmedberg
bugs985655
milestone32.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 985655 - Split AsyncShutdown's mechanism into a lightweight Barrier (that doesn't spin the event loop and doesn't cause crashes) and a heavyweight Spinner (that does). Also, exposing Barrier so as to let services expose lightweight shutdown dependencies without having to spin the event loop. r=froydnj, sr=bsmedberg
toolkit/modules/AsyncShutdown.jsm
toolkit/modules/tests/xpcshell/test_AsyncShutdown.js
--- a/toolkit/modules/AsyncShutdown.jsm
+++ b/toolkit/modules/AsyncShutdown.jsm
@@ -235,41 +235,65 @@ function getPhase(topic) {
      * });
      *
      * AsyncShutdown.profileBeforeChange.addBlocker("Module: trivial callback",
      *     function callback() {
      *       // ...
      *       // Execute this code during profileBeforeChange
      *       // No specific guarantee about completion of profileBeforeChange
      * });
-     *
      */
     addBlocker: function(name, condition, state = null) {
-      if (typeof name != "string") {
-        throw new TypeError("Expected a human-readable name as first argument");
+      spinner.addBlocker(name, condition, state);
+    },
+    /**
+     * Remove the blocker for a condition.
+     *
+     * If several blockers have been registered for the same
+     * condition, remove all these blockers. If no blocker has been
+     * registered for this condition, this is a noop.
+     *
+     * @return {boolean} true if a blocker has been removed, false
+     * otherwise. Note that a result of false may mean either that
+     * the blocker has never been installed or that the phase has
+     * completed and the blocker has already been resolved.
+     */
+    removeBlocker: function(condition) {
+      return spinner.removeBlocker(condition);
+    },
+    /**
+     * Trigger the phase without having to broadcast a
+     * notification. For testing purposes only.
+     */
+    get _trigger() {
+      let accepted = false;
+      try {
+        accepted = Services.prefs.getBoolPref("toolkit.asyncshutdown.testing");
+      } catch (ex) {
+        // Ignore errors
       }
-      if (state && typeof state != "function") {
-        throw new TypeError("Expected nothing or a function as third argument");
+      if (accepted) {
+        return () => spinner.observe();
       }
-      spinner.addBlocker({name: name, condition: condition, state: state});
+      return undefined;
     }
   });
   gPhases.set(topic, phase);
   return phase;
-}
+};
 
 /**
  * Utility class used to spin the event loop until all blockers for a
  * Phase are satisfied.
  *
  * @param {string} topic The xpcom notification for that phase.
  */
 function Spinner(topic) {
+  this._barrier = new Barrier(topic);
   this._topic = topic;
-  this._conditions = new Set(); // set to |null| once it is too late to register
   Services.obs.addObserver(this, topic, false);
 }
 
 Spinner.prototype = {
   /**
    * Register a new condition for this phase.
    *
    * @param {object} condition A Condition that must be fulfilled before
@@ -277,185 +301,421 @@ Spinner.prototype = {
    * Must contain fields:
    * - {string} name The human-readable name of the condition. Used
    * for debugging/error reporting.
    * - {function} action An action that needs to be completed
    * before we proceed to the next runstate. If |action| returns a promise,
    * we wait until the promise is resolved/rejected before proceeding
    * to the next runstate.
    */
-  addBlocker: function(condition) {
-    if (!this._conditions) {
-      throw new Error("Phase " + this._topic +
-                      " has already begun, it is too late to register" +
-                      " completion condition '" + condition.name + "'.");
-    }
-    this._conditions.add(condition);
+  addBlocker: function(name, condition, state) {
+    this._barrier.client.addBlocker(name, condition, state);
+  },
+  /**
+   * Remove the blocker for a condition.
+   *
+   * If several blockers have been registered for the same
+   * condition, remove all these blockers. If no blocker has been
+   * registered for this condition, this is a noop.
+   *
+   * @return {boolean} true if a blocker has been removed, false
+   * otherwise. Note that a result of false may mean either that
+   * the blocker has never been installed or that the phase has
+   * completed and the blocker has already been resolved.
+   */
+  removeBlocker: function(condition) {
+    return this._barrier.client.removeBlocker(condition);
   },
 
+  // nsIObserver.observe
   observe: function() {
     let topic = this._topic;
+    let barrier = this._barrier;
     Services.obs.removeObserver(this, topic);
 
+    let satisfied = false; // |true| once we have satisfied all conditions
+    let promise = this._barrier.wait({
+      warnAfterMS: DELAY_WARNING_MS,
+      crashAfterMS: DELAY_CRASH_MS
+    });
+
+    // Now, spin the event loop
+    promise.then(() => satisfied = true); // This promise cannot reject
+    let thread = Services.tm.mainThread;
+    while (!satisfied) {
+      thread.processNextEvent(true);
+    }
+  }
+};
+
+/**
+ * A mechanism used to register blockers that prevent some action from
+ * happening.
+ *
+ * An instance of |Barrier| provides a capability |client| that
+ * clients can use to register blockers. The barrier is resolved once
+ * all registered blockers have been resolved. The owner of the
+ * |Barrier| may wait for the resolution of the barrier and obtain
+ * information on which blockers have not been resolved yet.
+ *
+ * @param {string} name The name of the blocker. Used mainly for error-
+ *     reporting.
+ */
+function Barrier(name) {
+  /**
+   * The set of conditions registered by clients, as a map.
+   *
+   * Key: condition (function)
+   * Value: Array of {name: string, state: function}
+   */
+  this._conditions = new Map();
+
+  /**
+   * Indirections, used to let clients cancel a blocker when they
+   * call removeBlocker().
+   *
+   * Key: condition (function)
+   * Value: Deferred.
+   */
+  this._indirections = null;
+
+  /**
+   * The name of the barrier.
+   */
+  this._name = name;
+
+  /**
+   * A cache for the promise returned by wait().
+   */
+  this._promise = null;
+
+  /**
+   * An array of objects used to monitor the state of each blocker.
+   */
+  this._monitors = null;
+
+  /**
+   * The capability of adding blockers. This object may safely be returned
+   * or passed to clients.
+   */
+  this.client = {
+    /**
+     * Register a blocker for the completion of this barrier.
+     *
+     * @param {string} name The human-readable name of the blocker. Used
+     * for debugging/error reporting. Please make sure that the name
+     * respects the following model: "Some Service: some action in progress" -
+     * for instance "OS.File: flushing all pending I/O";
+     * @param {function|promise|*} condition A condition blocking the
+     * completion of the phase. Generally, this is a function
+     * returning a promise. This function is evaluated during the
+     * phase and the phase is guaranteed to not terminate until the
+     * resulting promise is either resolved or rejected. If
+     * |condition| is not a function but another value |v|, it behaves
+     * as if it were a function returning |v|.
+     * @param {function*} state Optionally, a function returning
+     * information about the current state of the blocker as an
+     * object. Used for providing more details when logging errors or
+     * crashing.
+     */
+    addBlocker: function(name, condition, state) {
+      if (typeof name != "string") {
+        throw new TypeError("Expected a human-readable name as first argument");
+      }
+      if (state && typeof state != "function") {
+        throw new TypeError("Expected nothing or a function as third argument");
+      }
+      if (!this._conditions) {
+	throw new Error("Phase " + this._name +
+			" has already begun, it is too late to register" +
+			" completion condition '" + name + "'.");
+      }
+      let set = this._conditions.get(condition);
+      if (!set) {
+        set = [];
+        this._conditions.set(condition, set);
+      }
+      set.push({name: name, state: state});
+    }.bind(this),
+
+    /**
+     * Remove the blocker for a condition.
+     *
+     * If several blockers have been registered for the same
+     * condition, remove all these blockers. If no blocker has been
+     * registered for this condition, this is a noop.
+     *
+     * @return {boolean} true if at least one blocker has been
+     * removed, false otherwise.
+     */
+    removeBlocker: function(condition) {
+      if (this._conditions) {
+        // wait() hasn't been called yet.
+        return this._conditions.delete(condition);
+      }
+
+      if (this._indirections) {
+        // wait() is in progress
+        let deferred = this._indirections.get(condition);
+        if (deferred) {
+          // Unlock the blocker
+          deferred.resolve();
+        }
+        return this._indirections.delete(condition);
+      }
+
+      // wait() is complete.
+      return false;
+    }.bind(this),
+  };
+}
+Barrier.prototype = Object.freeze({
+  /**
+   * The current state of the barrier, as a JSON-serializable object
+   * designed for error-reporting.
+   */
+  get state() {
+    if (this._conditions) {
+      return "Not started";
+    }
+    if (!this._monitors) {
+      return "Complete";
+    }
+    let frozen = [];
+    for (let {name, isComplete, state} of this._monitors) {
+      if (!isComplete) {
+        frozen.push({name: name, state: safeGetState(state)});
+      }
+    }
+    return frozen;
+  },
+
+  /**
+   * Wait until all currently registered blockers are complete.
+   *
+   * Once this method has been called, any attempt to register a new blocker
+   * for this barrier will cause an error.
+   *
+   * Successive calls to this method always return the same value.
+   *
+   * @param {object=}  options Optionally, an object  that may contain
+   * the following fields:
+   *  {number} warnAfterMS If provided and > 0, print a warning if the barrier
+   *   has not been resolved after the given number of milliseconds.
+   *  {number} crashAfterMS If provided and > 0, crash the process if the barrier
+   *   has not been resolved after the give number of milliseconds (rounded up
+   *   to the next second). To avoid crashing simply because the computer is busy
+   *   or going to sleep, we actually wait for ceil(crashAfterMS/1000) successive
+   *   periods of at least one second. Upon crashing, if a crash reporter is present,
+   *   prepare a crash report with the state of this barrier.
+   *
+   *
+   * @return {Promise} A promise satisfied once all blockers are complete.
+   */
+  wait: function(options = {}) {
+    // This method only implements caching on top of _wait()
+    if (this._promise) {
+      return this._promise;
+    }
+    return this._promise = this._wait(options);
+  },
+  _wait: function(options) {
+    let topic = this._name;
     let conditions = this._conditions;
     this._conditions = null; // Too late to register
-
     if (conditions.size == 0) {
-      // No need to spin anything
-      return;
+      return Promise.resolve();
     }
 
+    this._indirections = new Map();
     // The promises for which we are waiting.
     let allPromises = [];
 
     // Information to determine and report to the user which conditions
     // are not satisfied yet.
-    let allMonitors = [];
+    this._monitors = [];
 
-    for (let {condition, name, state} of conditions) {
-      // Gather all completion conditions
+    for (let _condition of conditions.keys()) {
+      for (let current of conditions.get(_condition)) {
+        let condition = _condition; // Avoid capturing the wrong variable
+        let {name, state} = current;
 
-      try {
-        if (typeof condition == "function") {
-          // Normalize |condition| to the result of the function.
-          try {
-            condition = condition(topic);
-          } catch (ex) {
-            condition = Promise.reject(ex);
+        // An indirection on top of condition, used to let clients
+        // cancel a blocker through removeBlocker.
+        let indirection = Promise.defer();
+        this._indirections.set(condition, indirection);
+
+        // Gather all completion conditions
+
+        try {
+          if (typeof condition == "function") {
+            // Normalize |condition| to the result of the function.
+            try {
+              condition = condition(topic);
+            } catch (ex) {
+              condition = Promise.reject(ex);
+            }
           }
-        }
-        // Normalize to a promise. Of course, if |condition| was not a
-        // promise in the first place (in particular if the above
-        // function returned |undefined| or failed), that new promise
-        // isn't going to be terribly interesting, but it will behave
-        // as a promise.
-        condition = Promise.resolve(condition);
 
-        // If the promise takes too long to be resolved/rejected,
-        // we need to notify the user.
-        //
-        // If it takes way too long, we need to crash.
+          // Normalize to a promise. Of course, if |condition| was not a
+          // promise in the first place (in particular if the above
+          // function returned |undefined| or failed), that new promise
+          // isn't going to be terribly interesting, but it will behave
+          // as a promise.
+          condition = Promise.resolve(condition);
 
-        let timer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
-        timer.initWithCallback(function() {
-          let msg = "A phase completion condition is" +
-            " taking too long to complete." +
-            " Condition: " + monitor.name +
-            " Phase: " + topic +
-            " State: " + safeGetState(state);
-          warn(msg);
-        }, DELAY_WARNING_MS, Ci.nsITimer.TYPE_ONE_SHOT);
+          let monitor = {
+            isComplete: false,
+            name: name,
+            state: state
+          };
 
-        let monitor = {
-          isFrozen: true,
-          name: name,
-          state: state
-        };
-        condition = condition.then(function onSuccess() {
-            timer.cancel(); // As a side-effect, this prevents |timer| from
-                            // being garbage-collected too early.
-            monitor.isFrozen = false;
-          }, function onError(error) {
-            timer.cancel();
+	  condition = condition.then(null, function onError(error) {
             let msg = "A completion condition encountered an error" +
-                " while we were spinning the event loop." +
-                " Condition: " + name +
-                " Phase: " + topic +
-                " State: " + safeGetState(state);
-            warn(msg, error);
-            monitor.isFrozen = false;
-        });
-        allMonitors.push(monitor);
-        allPromises.push(condition);
+              " while we were spinning the event loop." +
+	      " Condition: " + name +
+              " Phase: " + topic +
+              " State: " + safeGetState(state);
+	    warn(msg, error);
+	  });
+          condition.then(() => indirection.resolve());
 
-      } catch (error) {
-          let msg = "A completion condition encountered an error" +
-                " while we were initializing the phase." +
-                " Condition: " + name +
-                " Phase: " + topic +
-                " State: " + safeGetState(state);
-          warn(msg, error);
+          indirection.promise.then(() => monitor.isComplete = true);
+          this._monitors.push(monitor);
+          allPromises.push(indirection.promise);
+
+        } catch (error) {
+            let msg = "A completion condition encountered an error" +
+                  " while we were initializing the phase." +
+                  " Condition: " + name +
+                  " Phase: " + topic +
+                  " State: " + safeGetState(state);
+            warn(msg, error);
+        }
+
       }
-
     }
     conditions = null;
 
     let promise = Promise.all(allPromises);
     allPromises = null;
 
     promise = promise.then(null, function onError(error) {
       // I don't think that this can happen.
       // However, let's be overcautious with async/shutdown error reporting.
       let msg = "An uncaught error appeared while completing the phase." +
             " Phase: " + topic;
       warn(msg, error);
     });
 
-    let satisfied = false; // |true| once we have satisfied all conditions
-
-    // If after DELAY_CRASH_MS (approximately one minute, adjusted to take
-    // into account sleep and otherwise busy computer) we have not finished
-    // this shutdown phase, we assume that the shutdown is somehow frozen,
-    // presumably deadlocked. At this stage, the only thing we can do to
-    // avoid leaving the user's computer in an unstable (and battery-sucking)
-    // situation is report the issue and crash.
-    let timeToCrash = looseTimer(DELAY_CRASH_MS);
-    timeToCrash.promise.then(
-      function onTimeout() {
-        // Report the problem as best as we can, then crash.
-        let frozen = [];
-        let states = [];
-        for (let {name, isFrozen, state} of allMonitors) {
-          if (isFrozen) {
-            frozen.push({name: name, state: safeGetState(state)});
-          }
-        }
-
-        let msg = "At least one completion condition failed to complete" +
-              " within a reasonable amount of time. Causing a crash to" +
-              " ensure that we do not leave the user with an unresponsive" +
-              " process draining resources." +
-              " Conditions: " + JSON.stringify(frozen) +
-              " Phase: " + topic;
-        err(msg);
-        if (gCrashReporter && gCrashReporter.enabled) {
-          let data = {
-            phase: topic,
-            conditions: frozen
-          };
-          gCrashReporter.annotateCrashReport("AsyncShutdownTimeout",
-            JSON.stringify(data));
-        } else {
-          warn("No crash reporter available");
-        }
-
-        let error = new Error();
-        gDebug.abort(error.fileName, error.lineNumber + 1);
-      },
-      function onSatisfied() {
-        // The promise has been rejected, which means that we have satisfied
-        // all completion conditions.
-      });
-
-    promise = promise.then(function() {
-      satisfied = true;
-      timeToCrash.reject();
-    }/* No error is possible here*/);
-
-    // Now, spin the event loop
-    let thread = Services.tm.mainThread;
-    while(!satisfied) {
-      thread.processNextEvent(true);
-    }
-  }
-};
+    promise = promise.then(() => {
+      this._monitors = null;
+      this._indirections = null;
+    }); // Memory cleanup
 
 
-// List of well-known runstates
-// Ideally, runstates should be registered from the component that decides
+    // Now handle warnings and crashes
+
+    let warnAfterMS = DELAY_WARNING_MS;
+    if (options && "warnAfterMS" in options) {
+      if (typeof options.warnAfterMS == "number"
+         || options.warnAfterMS == null) {
+        // Change the delay or deactivate warnAfterMS
+        warnAfterMS = options.warnAfterMS;
+      } else {
+        throw new TypeError("Wrong option value for warnAfterMS");
+      }
+    }
+
+    if (warnAfterMS && warnAfterMS > 0) {
+      // If the promise takes too long to be resolved/rejected,
+      // we need to notify the user.
+      let timer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
+      timer.initWithCallback(function() {
+        let msg = "At least one completion condition is taking too long to complete." +
+	  " Conditions: " + safeGetState(this.state) +
+	  " Barrier: " + topic;
+        warn(msg);
+      }.bind(this), warnAfterMS, Ci.nsITimer.TYPE_ONE_SHOT);
+
+      promise = promise.then(function onSuccess() {
+        timer.cancel();
+        // As a side-effect, this prevents |timer| from
+        // being garbage-collected too early.
+      });
+    }
+
+    let crashAfterMS = DELAY_CRASH_MS;
+    if (options && "crashAfterMS" in options) {
+      if (typeof options.crashAfterMS == "number"
+         || options.crashAfterMS == null) {
+        // Change the delay or deactivate crashAfterMS
+        crashAfterMS = options.crashAfterMS;
+      } else {
+        throw new TypeError("Wrong option value for crashAfterMS");
+      }
+    }
+
+    if (crashAfterMS  > 0) {
+      let timeToCrash = null;
+
+      // If after |crashAfterMS| milliseconds (adjusted to take into
+      // account sleep and otherwise busy computer) we have not finished
+      // this shutdown phase, we assume that the shutdown is somehow
+      // frozen, presumably deadlocked. At this stage, the only thing we
+      // can do to avoid leaving the user's computer in an unstable (and
+      // battery-sucking) situation is report the issue and crash.
+      timeToCrash = looseTimer(crashAfterMS);
+      timeToCrash.promise.then(
+        function onTimeout() {
+	  // Report the problem as best as we can, then crash.
+	  let state = this.state;
+
+	  let msg = "At least one completion condition failed to complete" +
+	    " within a reasonable amount of time. Causing a crash to" +
+	    " ensure that we do not leave the user with an unresponsive" +
+	    " process draining resources." +
+	    " Conditions: " + JSON.stringify(state) +
+	    " Barrier: " + topic;
+	  err(msg);
+	  if (gCrashReporter && gCrashReporter.enabled) {
+            let data = {
+              phase: topic,
+              conditions: state
+	    };
+            gCrashReporter.annotateCrashReport("AsyncShutdownTimeout",
+            JSON.stringify(data));
+	  } else {
+            warn("No crash reporter available");
+	  }
+
+	  let error = new Error();
+	  gDebug.abort(error.fileName, error.lineNumber + 1);
+        }.bind(this),
+	  function onSatisfied() {
+            // The promise has been rejected, which means that we have satisfied
+            // all completion conditions.
+          });
+
+      promise = promise.then(function() {
+        timeToCrash.reject();
+      }.bind(this)/* No error is possible here*/);
+    }
+
+    return promise;
+  },
+});
+
+
+
+// List of well-known phases
+// Ideally, phases should be registered from the component that decides
 // when they start/stop. For compatibility with existing startup/shutdown
-// mechanisms, we register a few runstates here.
+// mechanisms, we register a few phases here.
 
 this.AsyncShutdown.profileChangeTeardown = getPhase("profile-change-teardown");
 this.AsyncShutdown.profileBeforeChange = getPhase("profile-before-change");
 this.AsyncShutdown.sendTelemetry = getPhase("profile-before-change2");
 this.AsyncShutdown.webWorkersShutdown = getPhase("web-workers-shutdown");
+
+this.AsyncShutdown.Barrier = Barrier;
+
 Object.freeze(this.AsyncShutdown);
--- a/toolkit/modules/tests/xpcshell/test_AsyncShutdown.js
+++ b/toolkit/modules/tests/xpcshell/test_AsyncShutdown.js
@@ -1,16 +1,82 @@
 let Cu = Components.utils;
 
 Cu.import("resource://gre/modules/Services.jsm");
 Cu.import("resource://gre/modules/Promise.jsm");
 Cu.import("resource://gre/modules/AsyncShutdown.jsm");
 
 Services.prefs.setBoolPref("toolkit.asyncshutdown.testing", true);
 
+function run_test() {
+  run_next_test();
+}
+
+function get_exn(f) {
+  try {
+    f();
+    return null;
+  } catch (ex) {
+    return ex;
+  }
+}
+
+function do_check_exn(exn, constructor) {
+  do_check_neq(exn, null);
+  if (exn.name == constructor) {
+    do_check_eq(exn.constructor.name, constructor);
+    return;
+  }
+  do_print("Wrong error constructor");
+  do_print(exn.constructor.name);
+  do_print(exn.stack);
+  do_check_true(false);
+}
+
+/**
+ * Utility function used to provide the same API for both AsyncShutdown
+ * phases and AsyncShutdown barriers.
+ *
+ * @param {bool} heavy If |true|, use a AsyncShutdown phase, otherwise
+ * a AsyncShutdown barrier.
+ *
+ * @return An object with the following methods:
+ *   - addBlocker() - the same method as AsyncShutdown phases and barrier clients
+ *   - wait() - trigger the resolution of the lock
+ */
+function makeLock(heavy) {
+  if (heavy) {
+    let topic = "test-Phase-" + ++makeLock.counter;
+    let phase = AsyncShutdown._getPhase(topic);
+    return {
+      addBlocker: function(...args) {
+        return phase.addBlocker(...args);
+      },
+      removeBlocker: function(blocker) {
+        return phase.removeBlocker(blocker);
+      },
+      wait: function() {
+        Services.obs.notifyObservers(null, topic, null);
+        return Promise.resolve();
+      }
+    };
+  } else {
+    let name = "test-Barrier-" + ++makeLock.counter;
+    let barrier = new AsyncShutdown.Barrier(name);
+    return {
+      addBlocker: barrier.client.addBlocker,
+      removeBlocker: barrier.client.removeBlocker,
+      wait: function() {
+        return barrier.wait();
+      }
+    };
+  }
+}
+makeLock.counter = 0;
+
 /**
  * An asynchronous task that takes several ticks to complete.
  *
  * @param {*=} resolution The value with which the resulting promise will be
  * resolved once the task is complete. This may be a rejected promise,
  * in which case the resulting promise will itself be rejected.
  * @param {object=} outResult An object modified by side-effect during the task.
  * Initially, its field |isFinished| is set to |false|. Once the task is
@@ -27,130 +93,184 @@ function longRunningAsyncTask(resolution
   do_timeout(100, function() {
     ++outResult.countFinished;
     outResult.isFinished = true;
     deferred.resolve(resolution);
   });
   return deferred.promise;
 }
 
-/**
- * Generate a unique notification topic.
- */
-function getUniqueTopic() {
-  const PREFIX = "testing-phases-";
-  return PREFIX + ++getUniqueTopic.counter;
-}
-getUniqueTopic.counter = 0;
+
+//////// Tests on AsyncShutdown phases
 
-add_task(function test_no_condition() {
-  do_print("Testing a phase with no condition");
-  let topic = getUniqueTopic();
-  AsyncShutdown._getPhase(topic);
-  Services.obs.notifyObservers(null, topic, null);
-  do_print("Phase with no condition didn't lock");
+/*add_task*/(function* test_no_condition() {
+  for (let heavy of [false, true]) {
+    do_print("Testing a barrier with no condition (" + heavy?"heavy":"light" + ")");
+    let lock = makeLock(heavy);
+    yield lock.wait();
+    do_print("Barrier with no condition didn't lock");
+  }
 });
 
-add_task(function test_simple_async() {
+/*add_task*/(function* test_phase_simple_async() {
   do_print("Testing various combinations of a phase with a single condition");
-  for (let arg of [undefined, null, "foo", 100, new Error("BOOM")]) {
-    for (let resolution of [arg, Promise.reject(arg)]) {
-      for (let success of [false, true]) {
-        for (let state of [[null],
-                           [],
-                           [() => "some state"],
-                           [function() {
-                             throw new Error("State BOOM"); }],
-                           [function() {
-                             return {
-                               toJSON: function() {
-                                 throw new Error("State.toJSON BOOM");
-                               }
-                             };
-                           }]]) {
-          // Asynchronous phase
-          do_print("Asynchronous test with " + arg + ", " + resolution);
-          let topic = getUniqueTopic();
-          let outParam = { isFinished: false };
-          AsyncShutdown._getPhase(topic).addBlocker(
-            "Async test",
-              function() {
-                if (success) {
-                  return longRunningAsyncTask(resolution, outParam);
-                } else {
-                  throw resolution;
-                }
-              },
-              ...state
-          );
-          do_check_false(outParam.isFinished);
-          Services.obs.notifyObservers(null, topic, null);
-          do_check_eq(outParam.isFinished, success);
+  for (let heavy of [false, true]) {
+    for (let arg of [undefined, null, "foo", 100, new Error("BOOM")]) {
+      for (let resolution of [arg, Promise.reject(arg)]) {
+        for (let success of [false, true]) {
+          for (let state of [[null],
+                             [],
+                             [() => "some state"],
+                             [function() {
+                               throw new Error("State BOOM"); }],
+                             [function() {
+                               return {
+                                 toJSON: function() {
+                                   throw new Error("State.toJSON BOOM");
+                                 }
+                               };
+                             }]]) {
+            // Asynchronous phase
+            do_print("Asynchronous test with " + arg + ", " + resolution + ", " + heavy);
+            let lock = makeLock(heavy);
+            let outParam = { isFinished: false };
+            lock.addBlocker(
+              "Async test",
+               function() {
+                 if (success) {
+                   return longRunningAsyncTask(resolution, outParam);
+                 } else {
+                   throw resolution;
+                 }
+               },
+               ...state
+            );
+            do_check_false(outParam.isFinished);
+            yield lock.wait();
+            do_check_eq(outParam.isFinished, success);
+          }
         }
+
+        // Synchronous phase - just test that we don't throw/freeze
+        do_print("Synchronous test with " + arg + ", " + resolution + ", " + heavy);
+        let lock = makeLock(heavy);
+        lock.addBlocker(
+          "Sync test",
+          resolution
+        );
+        yield lock.wait();
       }
-
-      // Synchronous phase - just test that we don't throw/freeze
-      do_print("Synchronous test with " + arg + ", " + resolution);
-      let topic = getUniqueTopic();
-      AsyncShutdown._getPhase(topic).addBlocker(
-        "Sync test",
-        resolution
-      );
-      Services.obs.notifyObservers(null, topic, null);
     }
   }
 });
 
-add_task(function test_many() {
+/*add_task*/(function* test_phase_many() {
   do_print("Testing various combinations of a phase with many conditions");
-  let topic = getUniqueTopic();
-  let phase = AsyncShutdown._getPhase(topic);
-  let outParams = [];
-  for (let arg of [undefined, null, "foo", 100, new Error("BOOM")]) {
-    for (let resolution of [arg, Promise.reject(arg)]) {
-      let outParam = { isFinished: false };
-      phase.addBlocker(
-        "Test",
-        () => longRunningAsyncTask(resolution, outParam)
-      );
+  for (let heavy of [false, true]) {
+    let lock = makeLock(heavy);
+    let outParams = [];
+    for (let arg of [undefined, null, "foo", 100, new Error("BOOM")]) {
+      for (let resolve of [true, false]) {
+        do_print("Testing with " + heavy + ", " + arg + ", " + resolve);
+        let resolution = resolve ? arg : Promise.reject(arg);
+        let outParam = { isFinished: false };
+        lock.addBlocker(
+          "Test",
+          () => longRunningAsyncTask(resolution, outParam)
+        );
+      }
     }
+    do_check_true(outParams.every((x) => !x.isFinished));
+    yield lock.wait();
+    do_check_true(outParams.every((x) => x.isFinished));
   }
-  do_check_true(outParams.every((x) => !x.isFinished));
-  Services.obs.notifyObservers(null, topic, null);
-  do_check_true(outParams.every((x) => x.isFinished));
+});
+
+
+
+/*add_task*/(function* test_phase_various_failures() {
+  do_print("Ensure that we cannot add a condition for a phase once notification has been received");
+  for (let heavy of [false, true]) {
+    let lock = makeLock(heavy);
+    lock.wait(); // Don't actually wait for the promise to be resolved
+    let exn = get_exn(() => lock.addBlocker("Test", true));
+    do_check_true(!!exn);
+
+    do_print("Ensure that an incomplete blocker causes a TypeError");
+
+    lock = makeLock(heavy);
+    exn = get_exn(() => lock.addBlocker());
+    do_check_exn(exn, "TypeError");
+
+    exn = get_exn(() => lock.addBlocker(null, true));
+    do_check_exn(exn, "TypeError");
+
+    exn = get_exn(() => lock.addBlocker("Test 2", () => true, "not a function"));
+    do_check_exn(exn, "TypeError");
+  }
 });
 
-function get_exn(f) {
-  try {
-    f();
-    return null;
-  } catch (ex) {
-    return ex;
-  }
-}
+
+add_task(function* test_phase_removeBlocker() {
+  do_print("Testing that we can call removeBlocker before, during and after the call to wait()");
+
+  for (let heavy of [false, true]) {
+
+    do_print("Attempt to add then remove a blocker before wait()");
+    let lock = makeLock(heavy);
+    let blocker = () => {
+      do_print("This promise will never be resolved");
+      return Promise.defer().promise;
+    };
+
+    lock.addBlocker("Wait forever", blocker);
+    do_check_true(lock.removeBlocker(blocker));
+    do_check_false(lock.removeBlocker(blocker));
+    do_print("Attempt to remove non-registered blockers before wait()");
+    do_check_false(lock.removeBlocker("foo"));
+    do_check_false(lock.removeBlocker(null));
+    do_print("Waiting (should lift immediately)");
+    yield lock.wait();
 
-add_task(function test_various_failures() {
-  do_print("Ensure that we cannot add a condition for a phase that is already complete");
-  let topic = getUniqueTopic();
-  let phase = AsyncShutdown._getPhase(topic);
-  Services.obs.notifyObservers(null, topic, null);
-  let exn = get_exn(() => phase.addBlocker("Test", true));
-  do_check_true(!!exn);
+    do_print("Attempt to add a blocker then remove it during wait()");
+    lock = makeLock(heavy);
+    let blockers = [
+      () => {
+        do_print("This blocker will self-destruct");
+        do_check_true(lock.removeBlocker(blockers[0]));
+        return Promise.defer().promise;
+      },
+      () => {
+        do_print("This blocker will self-destruct twice");
+        do_check_true(lock.removeBlocker(blockers[1]));
+        do_check_false(lock.removeBlocker(blockers[1]));
+        return Promise.defer().promise;
+      },
+      () => {
+        do_print("Attempt to remove non-registered blockers during wait()");
+        do_check_false(lock.removeBlocker("foo"));
+        do_check_false(lock.removeBlocker(null));
+      }
+    ];
+    for (let i in blockers) {
+      lock.addBlocker("Wait forever again: " + i, blockers[i]);
+    }
+    do_print("Waiting (should lift very quickly)");
+    yield lock.wait();
+    do_check_false(lock.removeBlocker(blockers[0]));
 
-  do_print("Ensure that an incomplete blocker causes a TypeError");
 
-  exn = get_exn(() => phase.addBlocker());
-  do_check_eq(exn.name, "TypeError");
+    do_print("Attempt to remove a blocker after wait");
+    lock = makeLock(heavy);
+    blocker = Promise.resolve;
+    yield lock.wait();
 
-  exn = get_exn(() => phase.addBlocker(null, true));
-  do_check_eq(exn.name, "TypeError");
+    do_print("Attempt to remove non-registered blocker after wait()");
+    do_check_false(lock.removeBlocker("foo"));
+    do_check_false(lock.removeBlocker(null));
+  }
 
-  exn = get_exn(() => phase.addBlocker("Test 2", () => true, "not a function"));
-  do_check_eq(exn.name, "TypeError");
 });
 
 add_task(function() {
   Services.prefs.clearUserPref("toolkit.asyncshutdown.testing");
 });
 
-function run_test() {
-  run_next_test();
-}