Bug 815339 - Extract PromiseWorker from OS.File. r=froydnj
authorDavid Rajchenbach-Teller <dteller@mozilla.com>
Wed, 19 Dec 2012 20:11:14 -0500
changeset 116572 52b5ac5877670aab08ef027055e8c4ff115b6e5b
parent 116571 3dcf9f0a42d8738c1150dbeda142d934cd677a68
child 116573 5f697a87ec466d96daffc0b515147e5adf91ccfa
push id20006
push userryanvm@gmail.com
push dateThu, 20 Dec 2012 01:26:52 +0000
treeherdermozilla-inbound@5f697a87ec46 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersfroydnj
bugs815339
milestone20.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 815339 - Extract PromiseWorker from OS.File. r=froydnj
toolkit/components/osfile/Makefile.in
toolkit/components/osfile/_PromiseWorker.jsm
toolkit/components/osfile/osfile_async_front.jsm
toolkit/components/osfile/osfile_async_worker.js
--- a/toolkit/components/osfile/Makefile.in
+++ b/toolkit/components/osfile/Makefile.in
@@ -33,8 +33,9 @@ libs::
 	$(NSINSTALL) $(srcdir)/osfile_unix_front.jsm $(FINAL_TARGET)/modules/osfile
 	$(NSINSTALL) $(srcdir)/osfile_win_allthreads.jsm $(FINAL_TARGET)/modules/osfile
 	$(NSINSTALL) $(srcdir)/ospath_win_back.jsm $(FINAL_TARGET)/modules/osfile
 	$(NSINSTALL) $(srcdir)/osfile_win_back.jsm $(FINAL_TARGET)/modules/osfile
 	$(NSINSTALL) $(srcdir)/osfile_win_front.jsm $(FINAL_TARGET)/modules/osfile
 	$(NSINSTALL) $(srcdir)/osfile_async_front.jsm $(FINAL_TARGET)/modules/osfile
 	$(NSINSTALL) $(srcdir)/osfile_async_worker.js $(FINAL_TARGET)/modules/osfile
 	$(NSINSTALL) $(srcdir)/osfile_shared_front.jsm $(FINAL_TARGET)/modules/osfile
+	$(NSINSTALL) $(srcdir)/_PromiseWorker.jsm $(FINAL_TARGET)/modules/osfile
new file mode 100644
--- /dev/null
+++ b/toolkit/components/osfile/_PromiseWorker.jsm
@@ -0,0 +1,182 @@
+/**
+ * Thin wrapper around a ChromeWorker that wraps postMessage/onmessage/onerror
+ * as promises.
+ *
+ * Not for public use yet.
+ */
+
+"use strict";
+
+this.EXPORTED_SYMBOLS = ["PromiseWorker"];
+
+// The library of promises.
+Components.utils.import("resource://gre/modules/commonjs/promise/core.js", this);
+
+/**
+ * An implementation of queues (FIFO).
+ *
+ * The current implementation uses one array, runs in O(n ^ 2), and is optimized
+ * for the case in which queues are generally short.
+ */
+let Queue = function Queue() {
+  this._array = [];
+};
+Queue.prototype = {
+  pop: function pop() {
+    return this._array.shift();
+  },
+  push: function push(x) {
+    return this._array.push(x);
+  },
+  isEmpty: function isEmpty() {
+    return this._array.length == 0;
+  }
+};
+
+/**
+ * An object responsible for dispatching messages to
+ * a chrome worker and routing the responses.
+ *
+ * @param {string} url The url containing the source code for this worker,
+ * as in constructor ChromeWorker.
+ * @param {Function=} log Optionally, a logging function.
+ *
+ * @constructor
+ */
+function PromiseWorker(url, log) {
+  if (typeof url != "string") {
+    throw new TypeError("Expecting a string");
+  }
+  if (log != null && typeof log != "function") {
+    throw new TypeError("Expecting either null or a function");
+  }
+  this._log = log;
+  this._url = url;
+
+  /**
+   * The queue of deferred, waiting for the completion of their
+   * respective job by the worker.
+   *
+   * Each item in the list may contain an additional field |closure|,
+   * used to store strong references to value that must not be
+   * garbage-collected before the reply has been received (e.g.
+   * arrays).
+   *
+   * @type {Queue<{deferred:deferred, closure:*=}>}
+   */
+  this._queue = new Queue();
+
+  /**
+   * The number of the current message.
+   *
+   * Used for debugging purposes.
+   */
+  this._id = 0;
+}
+PromiseWorker.prototype = {
+  /**
+   * Instantiate the worker lazily.
+   */
+  get _worker() {
+    delete this._worker;
+    let worker = new ChromeWorker(this._url);
+    let self = this;
+    Object.defineProperty(this, "_worker", {value:
+      worker
+    });
+
+    /**
+     * Receive errors that are not instances of OS.File.Error, propagate
+     * them to the listeners.
+     *
+     * The worker knows how to serialize errors that are instances
+     * of |OS.File.Error|. These are treated by |worker.onmessage|.
+     * However, for other errors, we rely on DOM's mechanism for
+     * serializing errors, which transmits these errors through
+     * |worker.onerror|.
+     *
+     * @param {Error} error Some JS error.
+     */
+    worker.onerror = function onerror(error) {
+      if (self._log) {
+        self._log("Received uncaught error from worker", JSON.stringify(error.message), error.message);
+      }
+      error.preventDefault();
+      let {deferred} = self._queue.pop();
+      deferred.reject(error);
+    };
+
+    /**
+     * Receive messages from the worker, propagate them to the listeners.
+     *
+     * Messages must have one of the following shapes:
+     * - {ok: some_value} in case of success
+     * - {fail: some_error} in case of error, where
+     *    some_error is an instance of |PromiseWorker.WorkerError|
+     *
+     * Messages may also contain a field |id| to help
+     * with debugging.
+     *
+     * @param {*} msg The message received from the worker.
+     */
+    worker.onmessage = function onmessage(msg) {
+      if (self._log) {
+        self._log("Received message from worker", JSON.stringify(msg.data));
+      }
+      let handler = self._queue.pop();
+      let deferred = handler.deferred;
+      let data = msg.data;
+      if (data.id != handler.id) {
+        throw new Error("Internal error: expecting msg " + handler.id + ", " +
+                        " got " + data.id + ": " + JSON.stringify(msg.data));
+      }
+      if ("ok" in data) {
+        deferred.resolve(data.ok);
+      } else if ("StopIteration" in data) {
+        // We have received a StopIteration error
+        deferred.reject(StopIteration);
+      } if ("fail" in data) {
+        // We have received an error that was serialized by the
+        // worker.
+        deferred.reject(new PromiseWorker.WorkerError(data.fail));
+      }
+    };
+    return worker;
+  },
+
+  /**
+   * Post a message to a worker.
+   *
+   * @param {string} fun The name of the function to call.
+   * @param {Array} array The contents of the message.
+   * @param {*=} closure An object holding references that should not be
+   * garbage-collected before the message treatment is complete.
+   *
+   * @return {promise}
+   */
+  post: function post(fun, array, closure) {
+    let deferred = Promise.defer();
+    let id = ++this._id;
+    let message = {fun: fun, args: array, id: id};
+    if (this._log) {
+      this._log("Posting message", JSON.stringify(message));
+    }
+    this._queue.push({deferred:deferred, closure: closure, id: id});
+    this._worker.postMessage(message);
+    if (this._log) {
+      this._log("Message posted");
+    }
+    return deferred.promise;
+  }
+};
+
+/**
+ * An error that has been serialized by the worker.
+ *
+ * @constructor
+ */
+PromiseWorker.WorkerError = function WorkerError(data) {
+  this.data = data;
+};
+
+this.PromiseWorker = PromiseWorker;
--- a/toolkit/components/osfile/osfile_async_front.jsm
+++ b/toolkit/components/osfile/osfile_async_front.jsm
@@ -44,16 +44,19 @@ if (OS.Constants.Win) {
 } else {
   throw new Error("I am neither under Windows nor under a Posix system");
 }
 let Type = OS.Shared.Type;
 
 // The library of promises.
 Components.utils.import("resource://gre/modules/commonjs/promise/core.js", this);
 
+// The implementation of communications
+Components.utils.import("resource://gre/modules/osfile/_PromiseWorker.jsm", this);
+
 // If profileDir is not available, osfile.jsm has been imported before the
 // profile is setup. In this case, we need to observe "profile-do-change"
 // and set OS.Constants.Path.profileDir as soon as it becomes available.
 if (!("profileDir" in OS.Constants.Path) || !("localProfileDir" in OS.Constants.Path)) {
   Components.utils.import("resource://gre/modules/Services.jsm", this);
   let observer = function observer() {
     Services.obs.removeObserver(observer, "profile-do-change");
 
@@ -62,214 +65,53 @@ if (!("profileDir" in OS.Constants.Path)
 
     let localProfileDir = Services.dirsvc.get("ProfLD", Components.interfaces.nsIFile).path;
     OS.Constants.Path.localProfileDir = localProfileDir;
   };
   Services.obs.addObserver(observer, "profile-do-change", false);
 }
 
 /**
- * Return a shallow clone of the enumerable properties of an object
+ * Return a shallow clone of the enumerable properties of an object.
+ *
+ * We use this whenever normalizing options requires making (shallow)
+ * changes to an option object. The copy ensures that we do not modify
+ * a client-provided object by accident.
  */
 let clone = function clone(object) {
   let result = {};
   for (let k in object) {
     result[k] = object[k];
   }
   return result;
 };
 
 /**
  * A shared constant used to normalize a set of options to nothing.
  */
 const noOptions = {};
 
-/**
- * An implementation of queues (FIFO).
- *
- * The current implementation uses two arrays and runs in O(n * log(n)).
- * It is optimized for the case in which many items are enqueued sequentially.
- */
-let Queue = function Queue() {
-  // The array to which the following |push| operations will add elements.
-  // If |null|, |this._pushing| will receive a new array.
-  // @type {Array|null}
-  this._pushing = null;
 
-  // The array from which the following |pop| operations will remove elements.
-  // If |null|, |this._popping| will receive |this._pushing|
-  // @type {Array|null}
-  this._popping = null;
-
-  // The number of items in |this._popping| that have been popped already
-  this._popindex = 0;
-};
-Queue.prototype = {
-  /**
-   * Push a new element
-   */
-  push: function push(x) {
-    if (!this._pushing) {
-      this._pushing = [];
-    }
-    this._pushing.push({ value: x });
-  },
-  /**
-   * Pop an element.
-   *
-   * If the queue is empty, raise |Error|.
-   */
-  pop: function pop() {
-    if (!this._popping) {
-      if (!this._pushing) {
-        throw new Error("Queue is empty");
-      }
-      this._popping = this._pushing;
-      this._pushing = null;
-      this._popindex = 0;
-    }
-    let result = this._popping[this._popindex];
-    delete this._popping[this._popindex];
-    ++this._popindex;
-    if (this._popindex >= this._popping.length) {
-      this._popping = null;
-    }
-    return result.value;
-  }
-};
-
-
-/**
- * An object responsible for dispatching messages to
- * a worker and routing the responses.
- *
- * In this implementation, the Scheduler uses only
- * one worker.
- */
+let worker = new PromiseWorker(
+  "resource://gre/modules/osfile/osfile_async_worker.js",
+  DEBUG?LOG:null);
 let Scheduler = {
-  /**
-   * Instantiate the worker lazily.
-   */
-  get _worker() {
-    delete this._worker;
-    let worker = new ChromeWorker("osfile_async_worker.js");
-    let self = this;
-    Object.defineProperty(this, "_worker", {value:
-      worker
-    });
-
-    /**
-     * Receive errors that are not instances of OS.File.Error, propagate
-     * them to the listeners.
-     *
-     * The worker knows how to serialize errors that are instances
-     * of |OS.File.Error|. These are treated by |worker.onmessage|.
-     * However, for other errors, we rely on DOM's mechanism for
-     * serializing errors, which transmits these errors through
-     * |worker.onerror|.
-     *
-     * @param {Error} error Some JS error.
-     */
-    worker.onerror = function onerror(error) {
-      if (DEBUG) {
-        LOG("Received uncaught error from worker", JSON.stringify(error.message), error.message);
+  post: function post(...args) {
+    let promise = worker.post.apply(worker, args);
+    return promise.then(
+      null,
+      function onError(error) {
+        // Decode any serialized error
+        if (error instanceof PromiseWorker.WorkerError) {
+          throw OS.File.Error.fromMsg(error.data);
+        } else {
+          throw error;
+        }
       }
-      error.preventDefault();
-      let {deferred} = self._queue.pop();
-      deferred.reject(error);
-    };
-
-    /**
-     * Receive messages from the worker, propagate them to the listeners.
-     *
-     * Messages must have one of the following shapes:
-     * - {ok: some_value} in case of success
-     * - {fail: some_error} in case of error, where
-     *    some_error can be deserialized by
-     *    |OS.File.Error.fromMsg|
-     *
-     * Messages may also contain a field |id| to help
-     * with debugging.
-     *
-     * @param {*} msg The message received from the worker.
-     */
-    worker.onmessage = function onmessage(msg) {
-      if (DEBUG) {
-        LOG("Received message from worker", JSON.stringify(msg.data));
-      }
-      let handler = self._queue.pop();
-      let deferred = handler.deferred;
-      let data = msg.data;
-      if (data.id != handler.id) {
-        throw new Error("Internal error: expecting msg " + handler.id + ", " +
-                        " got " + data.id + ": " + JSON.stringify(msg.data));
-      }
-      if ("ok" in data) {
-        deferred.resolve(data.ok);
-      } else if ("fail" in data) {
-        let error;
-        try {
-          error = OS.File.Error.fromMsg(data.fail);
-        } catch (x) {
-          LOG("Cannot decode OS.File.Error", data.fail, data.id);
-          deferred.reject(x);
-          return;
-        }
-        deferred.reject(error);
-      } else {
-        throw new Error("Message does not respect protocol: " +
-          data.toSource());
-      }
-    };
-    return worker;
-  },
-
-  /**
-   * The queue of deferred, waiting for the completion of their
-   * respective job by the worker.
-   *
-   * Each item in the list may contain an additional field |closure|,
-   * used to store strong references to value that must not be
-   * garbage-collected before the reply has been received (e.g.
-   * arrays).
-   *
-   * @type {Queue<{deferred:deferred, closure:*=}>}
-   */
-  _queue: new Queue(),
-
-  /**
-   * The number of the current message.
-   *
-   * Used for debugging purposes.
-   */
-  _id: 0,
-
-  /**
-   * Post a message to a worker.
-   *
-   * @param {string} fun The name of the function to call.
-   * @param array The contents of the message.
-   * @param closure An object holding references that should not be
-   * garbage-collected before the message treatment is complete.
-   *
-   * @return {promise}
-   */
-  post: function post(fun, array, closure) {
-    let deferred = Promise.defer();
-    let id = ++this._id;
-    let message = {fun: fun, args: array, id: id};
-    if (DEBUG) {
-      LOG("Posting message", JSON.stringify(message));
-    }
-    this._queue.push({deferred:deferred, closure: closure, id: id});
-    this._worker.postMessage(message);
-    if (DEBUG) {
-      LOG("Message posted");
-    }
-    return deferred.promise;
+    );
   }
 };
 
 /**
  * Representation of a file, with asynchronous methods.
  *
  * @param {*} fdmsg The _message_ representing the platform-specific file
  * handle.
--- a/toolkit/components/osfile/osfile_async_worker.js
+++ b/toolkit/components/osfile/osfile_async_worker.js
@@ -58,16 +58,22 @@ if (this.Components) {
        // We post this message from outside the |try ... catch| block
        // to avoid capturing errors that take place during |postMessage| and
        // built-in serialization.
        if (!exn) {
          if (DEBUG) {
            LOG("Sending positive reply", JSON.stringify(result), "id is", id);
          }
          self.postMessage({ok: result, id:id});
+       } else if (exn == StopIteration) {
+         // StopIteration cannot be serialized automatically
+         if (DEBUG) {
+           LOG("Sending back StopIteration");
+         }
+         self.postMessage({StopIteration: true, id: id});
        } else if (exn instanceof exports.OS.File.Error) {
          if (DEBUG) {
            LOG("Sending back OS.File error", exn, "id is", id);
          }
          // Instances of OS.File.Error know how to serialize themselves
          // (deserialization ensures that we end up with OS-specific
          // instances of |OS.File.Error|)
          self.postMessage({fail: exports.OS.File.Error.toMsg(exn), id:id});