Bug 1276390 - Use blocking polling in worker to handle subprocess IO. r=aswan
☠☠ backed out by 6e95ea056bf5 ☠ ☠
authorKris Maglione <maglione.k@gmail.com>
Wed, 27 Jul 2016 15:14:57 -0700
changeset 347209 48cff1c9b619bb224dd763332836522fd60c160f
parent 347208 b498c2f4fdd8ba44f0d064e29d60a3e61f5acfdd
child 347210 a5fc40f291f50a6e76d1fe25320d88b851f014cf
push id6389
push userraliiev@mozilla.com
push dateMon, 19 Sep 2016 13:38:22 +0000
treeherdermozilla-beta@01d67bfe6c81 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersaswan
bugs1276390
milestone50.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1276390 - Use blocking polling in worker to handle subprocess IO. r=aswan MozReview-Commit-ID: KXqgCLnO7dR
toolkit/modules/subprocess/subprocess_common.jsm
toolkit/modules/subprocess/subprocess_shared_win.js
toolkit/modules/subprocess/subprocess_unix.jsm
toolkit/modules/subprocess/subprocess_win.jsm
toolkit/modules/subprocess/subprocess_worker_common.js
toolkit/modules/subprocess/subprocess_worker_unix.js
toolkit/modules/subprocess/subprocess_worker_win.js
toolkit/modules/subprocess/test/xpcshell/test_subprocess.js
--- a/toolkit/modules/subprocess/subprocess_common.jsm
+++ b/toolkit/modules/subprocess/subprocess_common.jsm
@@ -10,16 +10,18 @@
 /* exported BaseProcess, PromiseWorker */
 
 var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
 
 Cu.import("resource://gre/modules/Services.jsm");
 Cu.import("resource://gre/modules/XPCOMUtils.jsm");
 Cu.importGlobalProperties(["TextDecoder"]);
 
+XPCOMUtils.defineLazyModuleGetter(this, "AsyncShutdown",
+                                  "resource://gre/modules/AsyncShutdown.jsm");
 XPCOMUtils.defineLazyModuleGetter(this, "setTimeout",
                                   "resource://gre/modules/Timer.jsm");
 
 Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared.js", this);
 
 var EXPORTED_SYMBOLS = ["BaseProcess", "PromiseWorker", "SubprocessConstants"];
 
 const BUFFER_SIZE = 4096;
@@ -33,21 +35,35 @@ let nextResponseId = 0;
  */
 class PromiseWorker extends ChromeWorker {
   constructor(url) {
     super(url);
 
     this.listeners = new Map();
     this.pendingResponses = new Map();
 
+    this.addListener("close", this.onClose.bind(this));
     this.addListener("failure", this.onFailure.bind(this));
     this.addListener("success", this.onSuccess.bind(this));
     this.addListener("debug", this.onDebug.bind(this));
 
     this.addEventListener("message", this.onmessage);
+
+    this.shutdown = this.shutdown.bind(this);
+    AsyncShutdown.webWorkersShutdown.addBlocker(
+      "Subprocess.jsm: Shut down IO worker",
+      this.shutdown);
+  }
+
+  onClose() {
+    AsyncShutdown.webWorkersShutdown.removeBlocker(this.shutdown);
+  }
+
+  shutdown() {
+    return this.call("shutdown", []);
   }
 
   /**
    * Adds a listener for the given message from the worker. Any message received
    * from the worker with a `data.msg` property matching the given `msg`
    * parameter are passed to the given listener.
    *
    * @param {string} msg
@@ -610,25 +626,29 @@ class BaseProcess {
       return new this(worker, processId, fds, pid);
     });
   }
 
   static get WORKER_URL() {
     throw new Error("Not implemented");
   }
 
+  static get WorkerClass() {
+    return PromiseWorker;
+  }
+
   /**
    * Gets the current subprocess worker, or spawns a new one if it does not
    * currently exist.
    *
    * @returns {PromiseWorker}
    */
   static getWorker() {
     if (!this._worker) {
-      this._worker = new PromiseWorker(this.WORKER_URL);
+      this._worker = new this.WorkerClass(this.WORKER_URL);
     }
     return this._worker;
   }
 
   /**
    * Kills the process.
    *
    * @param {integer} [timeout=300]
--- a/toolkit/modules/subprocess/subprocess_shared_win.js
+++ b/toolkit/modules/subprocess/subprocess_shared_win.js
@@ -17,16 +17,17 @@ var win32 = {
   // On Windows 64, winapi_abi is an alias for default_abi.
   WINAPI: ctypes.winapi_abi,
 
   VOID: ctypes.void_t,
 
   BYTE: ctypes.uint8_t,
   WORD: ctypes.uint16_t,
   DWORD: ctypes.uint32_t,
+  LONG: ctypes.long,
 
   UINT: ctypes.unsigned_int,
   UCHAR: ctypes.unsigned_char,
 
   BOOL: ctypes.bool,
 
   HANDLE: ctypes.voidptr_t,
   PVOID: ctypes.voidptr_t,
@@ -214,16 +215,25 @@ var libc = new Library("libc", LIBC_CHOI
     win32.BOOL, /* bInheritHandle */
     win32.DWORD, /* dwCreationFlags */
     win32.LPVOID, /* opt lpEnvironment */
     win32.LPCWSTR, /* opt lpCurrentDirectory */
     win32.STARTUPINFOW.ptr, /* lpStartupInfo */
     win32.PROCESS_INFORMATION.ptr, /* out lpProcessInformation */
   ],
 
+  CreateSemaphoreW: [
+    win32.WINAPI,
+    win32.HANDLE,
+    win32.SECURITY_ATTRIBUTES.ptr, /* opt lpSemaphoreAttributes */
+    win32.LONG, /* lInitialCount */
+    win32.LONG, /* lMaximumCount */
+    win32.LPCWSTR, /* opt lpName */
+  ],
+
   DeleteProcThreadAttributeList: [
     win32.WINAPI,
     win32.VOID,
     win32.LPPROC_THREAD_ATTRIBUTE_LIST, /* in/out lpAttributeList */
   ],
 
   DuplicateHandle: [
     win32.WINAPI,
@@ -294,16 +304,24 @@ var libc = new Library("libc", LIBC_CHOI
     win32.BOOL,
     win32.HANDLE, /* hFile */
     win32.LPVOID, /* out lpBuffer */
     win32.DWORD, /* nNumberOfBytesToRead */
     win32.LPDWORD, /* opt out lpNumberOfBytesRead */
     win32.OVERLAPPED.ptr, /* opt in/out lpOverlapped */
   ],
 
+  ReleaseSemaphore: [
+    win32.WINAPI,
+    win32.BOOL,
+    win32.HANDLE, /* hSemaphore */
+    win32.LONG, /* lReleaseCount */
+    win32.LONG.ptr, /* opt out lpPreviousCount */
+  ],
+
   TerminateProcess: [
     win32.WINAPI,
     win32.BOOL,
     win32.HANDLE, /* hProcess */
     win32.UINT, /* uExitCode */
   ],
 
   UpdateProcThreadAttribute: [
--- a/toolkit/modules/subprocess/subprocess_unix.jsm
+++ b/toolkit/modules/subprocess/subprocess_unix.jsm
@@ -4,35 +4,81 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 "use strict";
 
 /* eslint-disable mozilla/balanced-listeners */
 
 /* exported SubprocessImpl */
 
-/* globals BaseProcess */
+/* globals BaseProcess, PromiseWorker */
 
 var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
 
 var EXPORTED_SYMBOLS = ["SubprocessImpl"];
 
 Cu.import("resource://gre/modules/ctypes.jsm");
 Cu.import("resource://gre/modules/osfile.jsm");
 Cu.import("resource://gre/modules/Services.jsm");
 Cu.import("resource://gre/modules/Task.jsm");
 Cu.import("resource://gre/modules/subprocess/subprocess_common.jsm");
 
 Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared.js", this);
 Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared_unix.js", this);
 
+class UnixPromiseWorker extends PromiseWorker {
+  constructor(...args) {
+    super(...args);
+
+    let fds = ctypes.int.array(2)();
+    let res = libc.pipe(fds);
+    if (res == -1) {
+      throw new Error("Unable to create pipe");
+    }
+
+    this.signalFd = fds[1];
+
+    libc.fcntl(fds[0], LIBC.F_SETFL, LIBC.O_NONBLOCK);
+    libc.fcntl(fds[0], LIBC.F_SETFD, LIBC.FD_CLOEXEC);
+    libc.fcntl(fds[1], LIBC.F_SETFD, LIBC.FD_CLOEXEC);
+
+    this.call("init", [{signalFd: fds[0]}]);
+  }
+
+  closePipe() {
+    if (this.signalFd) {
+      libc.close(this.signalFd);
+      this.signalFd = null;
+    }
+  }
+
+  onClose() {
+    this.closePipe();
+    super.onClose();
+  }
+
+  signalWorker() {
+    libc.write(this.signalFd, new ArrayBuffer(1), 1);
+  }
+
+  postMessage(...args) {
+    this.signalWorker();
+    return super.postMessage(...args);
+  }
+}
+
+
 class Process extends BaseProcess {
   static get WORKER_URL() {
     return "resource://gre/modules/subprocess/subprocess_worker_unix.js";
   }
+
+  static get WorkerClass() {
+    return UnixPromiseWorker;
+  }
 }
 
 var SubprocessUnix = {
   Process,
 
   call(options) {
     return Process.create(options);
   },
--- a/toolkit/modules/subprocess/subprocess_win.jsm
+++ b/toolkit/modules/subprocess/subprocess_win.jsm
@@ -4,45 +4,69 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 "use strict";
 
 /* eslint-disable mozilla/balanced-listeners */
 
 /* exported SubprocessImpl */
 
-/* globals BaseProcess */
+/* globals BaseProcess, PromiseWorker */
 
 var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
 
 var EXPORTED_SYMBOLS = ["SubprocessImpl"];
 
 Cu.import("resource://gre/modules/ctypes.jsm");
 Cu.import("resource://gre/modules/osfile.jsm");
 Cu.import("resource://gre/modules/Services.jsm");
 Cu.import("resource://gre/modules/Task.jsm");
 Cu.import("resource://gre/modules/subprocess/subprocess_common.jsm");
 
 Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared.js", this);
 Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared_win.js", this);
 
+class WinPromiseWorker extends PromiseWorker {
+  constructor(...args) {
+    super(...args);
+
+    this.signalEvent = libc.CreateSemaphoreW(null, 0, 32, null);
+
+    this.call("init", [{
+      signalEvent: String(ctypes.cast(this.signalEvent, ctypes.uintptr_t).value),
+    }]);
+  }
+
+  signalWorker() {
+    libc.ReleaseSemaphore(this.signalEvent, 1, null);
+  }
+
+  postMessage(...args) {
+    this.signalWorker();
+    return super.postMessage(...args);
+  }
+}
+
 class Process extends BaseProcess {
   static get WORKER_URL() {
     return "resource://gre/modules/subprocess/subprocess_worker_win.js";
   }
+
+  static get WorkerClass() {
+    return WinPromiseWorker;
+  }
 }
 
 var SubprocessWin = {
   Process,
 
   call(options) {
     return Process.create(options);
   },
 
-
   * getEnvironment() {
     let env = libc.GetEnvironmentStringsW();
     try {
       for (let p = env, q = env; ; p = p.increment()) {
         if (p.contents == "\0") {
           if (String(p) == String(q)) {
             break;
           }
--- a/toolkit/modules/subprocess/subprocess_worker_common.js
+++ b/toolkit/modules/subprocess/subprocess_worker_common.js
@@ -83,16 +83,28 @@ class BaseProcess {
     // by the GC if it runs before our process is started.
     this.stringArrays.push(cstrings);
 
     return result;
   }
 }
 
 let requests = {
+  init(details) {
+    io.init(details);
+
+    return {data: {}};
+  },
+
+  shutdown() {
+    io.shutdown();
+
+    return {data: {}};
+  },
+
   close(pipeId, force = false) {
     let pipe = io.getPipe(pipeId);
 
     return pipe.close(force).then(() => ({data: {}}));
   },
 
   spawn(options) {
     let process = new Process(options);
@@ -151,16 +163,18 @@ let requests = {
   },
 
   waitForNoProcesses() {
     return Promise.all(Array.from(io.processes.values(), proc => proc.exitPromise));
   },
 };
 
 onmessage = event => {
+  io.messageCount--;
+
   let {msg, msgId, args} = event.data;
 
   new Promise(resolve => {
     resolve(requests[msg](...args));
   }).then(result => {
     let response = {
       msg: "success",
       msgId,
@@ -190,8 +204,14 @@ onmessage = event => {
 
     self.postMessage({
       msg: "failure",
       msgId,
       error: {},
     });
   });
 };
+
+onclose = event => {
+  io.shutdown();
+
+  self.postMessage({msg: "close"});
+};
--- a/toolkit/modules/subprocess/subprocess_worker_unix.js
+++ b/toolkit/modules/subprocess/subprocess_worker_unix.js
@@ -7,18 +7,17 @@
 
 /* exported Process */
 /* globals BaseProcess, BasePipe */
 
 importScripts("resource://gre/modules/subprocess/subprocess_shared.js",
               "resource://gre/modules/subprocess/subprocess_shared_unix.js",
               "resource://gre/modules/subprocess/subprocess_worker_common.js");
 
-const POLL_INTERVAL = 50;
-const POLL_TIMEOUT = 0;
+const POLL_TIMEOUT = 5000;
 
 let io;
 
 let nextPipeId = 0;
 
 class Pipe extends BasePipe {
   constructor(process, fd) {
     super();
@@ -243,16 +242,50 @@ class OutputPipe extends Pipe {
     }
 
     if (writes.length == 0) {
       io.updatePollFds();
     }
   }
 }
 
+class Signal {
+  constructor(fd) {
+    this.fd = fd;
+  }
+
+  cleanup() {
+    libc.close(this.fd);
+    this.fd = null;
+  }
+
+  get pollEvents() {
+    return LIBC.POLLIN;
+  }
+
+  /**
+   * Called when an error occurred while polling our file descriptor.
+   */
+  onError() {
+    io.shutdown();
+  }
+
+  /**
+   * Called when one of the IO operations matching the `pollEvents` mask may be
+   * performed without blocking.
+   */
+  onReady() {
+    let buffer = new ArrayBuffer(16);
+    let count = +libc.read(this.fd, buffer, buffer.byteLength);
+    if (count > 0) {
+      io.messageCount += count;
+    }
+  }
+}
+
 class Process extends BaseProcess {
   /**
    * Each Process object opens an additional pipe from the target object, which
    * will be automatically closed when the process exits, but otherwise
    * carries no data.
    *
    * This property contains a bit mask of poll() events which we wish to be
    * notified of on this descriptor. We're not expecting any input from this
@@ -444,17 +477,37 @@ class Process extends BaseProcess {
 io = {
   pollFds: null,
   pollHandlers: null,
 
   pipes: new Map(),
 
   processes: new Map(),
 
-  interval: null,
+  messageCount: 0,
+
+  running: true,
+
+  init(details) {
+    this.signal = new Signal(details.signalFd);
+    this.updatePollFds();
+
+    setTimeout(this.loop.bind(this), 0);
+  },
+
+  shutdown() {
+    if (this.running) {
+      this.running = false;
+
+      this.signal.cleanup();
+      this.signal = null;
+
+      self.close();
+    }
+  },
 
   getPipe(pipeId) {
     let pipe = this.pipes.get(pipeId);
 
     if (!pipe) {
       let error = new Error("File closed");
       error.errorCode = SubprocessConstants.ERROR_END_OF_FILE;
       throw error;
@@ -467,47 +520,49 @@ io = {
 
     if (!process) {
       throw new Error(`Invalid process ID: ${processId}`);
     }
     return process;
   },
 
   updatePollFds() {
-    let handlers = [...this.pipes.values(),
+    let handlers = [this.signal,
+                    ...this.pipes.values(),
                     ...this.processes.values()];
 
     handlers = handlers.filter(handler => handler.pollEvents);
 
     let pollfds = unix.pollfd.array(handlers.length)();
 
     for (let [i, handler] of handlers.entries()) {
       let pollfd = pollfds[i];
 
       pollfd.fd = handler.fd;
       pollfd.events = handler.pollEvents;
       pollfd.revents = 0;
     }
 
     this.pollFds = pollfds;
     this.pollHandlers = handlers;
+  },
 
-    if (pollfds.length && !this.interval) {
-      this.interval = setInterval(this.poll.bind(this), POLL_INTERVAL);
-    } else if (!pollfds.length && this.interval) {
-      clearInterval(this.interval);
-      this.interval = null;
+  loop() {
+    this.poll();
+    if (this.running) {
+      setTimeout(this.loop.bind(this), 0);
     }
   },
 
   poll() {
     let handlers = this.pollHandlers;
     let pollfds = this.pollFds;
 
-    let count = libc.poll(pollfds, pollfds.length, POLL_TIMEOUT);
+    let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT;
+    let count = libc.poll(pollfds, pollfds.length, timeout);
 
     for (let i = 0; count && i < pollfds.length; i++) {
       let pollfd = pollfds[i];
       if (pollfd.revents) {
         count--;
 
         let handler = handlers[i];
         try {
--- a/toolkit/modules/subprocess/subprocess_worker_win.js
+++ b/toolkit/modules/subprocess/subprocess_worker_win.js
@@ -7,18 +7,17 @@
 
 /* exported Process */
 /* globals BaseProcess, BasePipe, win32 */
 
 importScripts("resource://gre/modules/subprocess/subprocess_shared.js",
               "resource://gre/modules/subprocess/subprocess_shared_win.js",
               "resource://gre/modules/subprocess/subprocess_worker_common.js");
 
-const POLL_INTERVAL = 50;
-const POLL_TIMEOUT = 0;
+const POLL_TIMEOUT = 5000;
 
 // The exit code that we send when we forcibly terminate a process.
 const TERMINATE_EXIT_CODE = 0x7f;
 
 let io;
 
 let nextPipeId = 0;
 
@@ -293,16 +292,35 @@ class OutputPipe extends Pipe {
         this.writeNext();
       } else {
         io.updatePollEvents();
       }
     }
   }
 }
 
+class Signal {
+  constructor(event) {
+    this.event = event;
+  }
+
+  cleanup() {
+    libc.CloseHandle(this.event);
+    this.event = null;
+  }
+
+  onError() {
+    io.shutdown();
+  }
+
+  onReady() {
+    io.messageCount += 1;
+  }
+}
+
 class Process extends BaseProcess {
   constructor(...args) {
     super(...args);
 
     this.killed = false;
   }
 
   /**
@@ -538,17 +556,39 @@ class Process extends BaseProcess {
 io = {
   events: null,
   eventHandlers: null,
 
   pipes: new Map(),
 
   processes: new Map(),
 
-  interval: null,
+  messageCount: 0,
+
+  running: true,
+
+  init(details) {
+    let signalEvent = ctypes.cast(ctypes.uintptr_t(details.signalEvent),
+                                  win32.HANDLE);
+    this.signal = new Signal(signalEvent);
+    this.updatePollEvents();
+
+    setTimeout(this.loop.bind(this), 0);
+  },
+
+  shutdown() {
+    if (this.running) {
+      this.running = false;
+
+      this.signal.cleanup();
+      this.signal = null;
+
+      self.close();
+    }
+  },
 
   getPipe(pipeId) {
     let pipe = this.pipes.get(pipeId);
 
     if (!pipe) {
       let error = new Error("File closed");
       error.errorCode = SubprocessConstants.ERROR_END_OF_FILE;
       throw error;
@@ -561,41 +601,44 @@ io = {
 
     if (!process) {
       throw new Error(`Invalid process ID: ${processId}`);
     }
     return process;
   },
 
   updatePollEvents() {
-    let handlers = [...this.pipes.values(),
+    let handlers = [this.signal,
+                    ...this.pipes.values(),
                     ...this.processes.values()];
 
     handlers = handlers.filter(handler => handler.event);
 
     this.eventHandlers = handlers;
 
     let handles = handlers.map(handler => handler.event);
     this.events = win32.HANDLE.array()(handles);
+  },
 
-    if (handles.length && !this.interval) {
-      this.interval = setInterval(this.poll.bind(this), POLL_INTERVAL);
-    } else if (!handlers.length && this.interval) {
-      clearInterval(this.interval);
-      this.interval = null;
+  loop() {
+    this.poll();
+    if (this.running) {
+      setTimeout(this.loop.bind(this), 0);
     }
   },
 
+
   poll() {
-    for (;;) {
+    let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT;
+    for (;; timeout = 0) {
       let events = this.events;
       let handlers = this.eventHandlers;
 
       let result = libc.WaitForMultipleObjects(events.length, events,
-                                               false, POLL_TIMEOUT);
+                                               false, timeout);
 
       if (result < handlers.length) {
         try {
           handlers[result].onReady();
         } catch (e) {
           console.error(e);
           debug(`Worker error: ${e} :: ${e.stack}`);
           handlers[result].onError();
--- a/toolkit/modules/subprocess/test/xpcshell/test_subprocess.js
+++ b/toolkit/modules/subprocess/test/xpcshell/test_subprocess.js
@@ -1,16 +1,19 @@
 "use strict";
 
+Cu.import("resource://gre/modules/AppConstants.jsm");
 Cu.import("resource://gre/modules/Task.jsm");
 Cu.import("resource://gre/modules/Timer.jsm");
 
 
 const env = Cc["@mozilla.org/process/environment;1"].getService(Ci.nsIEnvironment);
 
+const MAX_ROUND_TRIP_TIME_MS = AppConstants.DEBUG ? 18 : 9;
+
 let PYTHON;
 let PYTHON_BIN;
 let PYTHON_DIR;
 
 const TEST_SCRIPT = do_get_file("data_test_script.py").path;
 
 let read = pipe => {
   return pipe.readUint32().then(count => {
@@ -173,16 +176,52 @@ add_task(function* test_subprocess_huge(
 
 
   let {exitCode} = yield proc.wait();
 
   equal(exitCode, 0, "Got expected exit code");
 });
 
 
+add_task(function* test_subprocess_round_trip_perf() {
+  let proc = yield Subprocess.call({
+    command: PYTHON,
+    arguments: ["-u", TEST_SCRIPT, "echo"],
+  });
+
+
+  const LINE = "I'm a leaf on the wind.\n";
+
+  let now = Date.now();
+  const COUNT = 1000;
+  for (let i = 0; i < COUNT; i++) {
+    let [output] = yield Promise.all([
+      read(proc.stdout),
+      proc.stdin.write(LINE),
+    ]);
+
+    // We don't want to log this for every iteration, but we still need
+    // to fail if it goes wrong.
+    if (output !== LINE) {
+      equal(output, LINE, "Got expected output");
+    }
+  }
+
+  let roundTripTime = (Date.now() - now) / COUNT;
+  ok(roundTripTime <= MAX_ROUND_TRIP_TIME_MS,
+     `Expected round trip time (${roundTripTime}ms) to be less than ${MAX_ROUND_TRIP_TIME_MS}ms`);
+
+  yield proc.stdin.close();
+
+  let {exitCode} = yield proc.wait();
+
+  equal(exitCode, 0, "Got expected exit code");
+});
+
+
 add_task(function* test_subprocess_stderr_default() {
   const LINE1 = "I'm a leaf on the wind.\n";
   const LINE2 = "Watch how I soar.\n";
 
   let proc = yield Subprocess.call({
     command: PYTHON,
     arguments: ["-u", TEST_SCRIPT, "print", LINE1, LINE2],
   });