Bug 1276390 - Use blocking polling in worker to handle subprocess IO. r=aswan
authorKris Maglione <maglione.k@gmail.com>
Thu, 28 Jul 2016 16:27:25 -0700
changeset 347238 97b1b6e04bc92e1efa31b66603fe4f060d161aca
parent 347237 5224dd4c03792d416ad664c23248ee6c75523896
child 347239 18642055568111041c5b099b66209824b330b74e
push id6389
push userraliiev@mozilla.com
push dateMon, 19 Sep 2016 13:38:22 +0000
treeherdermozilla-beta@01d67bfe6c81 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersaswan
bugs1276390
milestone50.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1276390 - Use blocking polling in worker to handle subprocess IO. r=aswan MozReview-Commit-ID: KXqgCLnO7dR
toolkit/modules/AppConstants.jsm
toolkit/modules/subprocess/subprocess_common.jsm
toolkit/modules/subprocess/subprocess_shared_win.js
toolkit/modules/subprocess/subprocess_unix.jsm
toolkit/modules/subprocess/subprocess_win.jsm
toolkit/modules/subprocess/subprocess_worker_common.js
toolkit/modules/subprocess/subprocess_worker_unix.js
toolkit/modules/subprocess/subprocess_worker_win.js
toolkit/modules/subprocess/test/xpcshell/test_subprocess.js
--- a/toolkit/modules/AppConstants.jsm
+++ b/toolkit/modules/AppConstants.jsm
@@ -206,16 +206,23 @@ this.AppConstants = Object.freeze({
 
   DEBUG:
 #ifdef DEBUG
   true,
 #else
   false,
 #endif
 
+  ASAN:
+#ifdef MOZ_ASAN
+  true,
+#else
+  false,
+#endif
+
   MOZ_B2G_RIL:
 #ifdef MOZ_B2G_RIL
   true,
 #else
   false,
 #endif
 
   MOZ_GRAPHENE:
--- a/toolkit/modules/subprocess/subprocess_common.jsm
+++ b/toolkit/modules/subprocess/subprocess_common.jsm
@@ -10,16 +10,18 @@
 /* exported BaseProcess, PromiseWorker */
 
 var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
 
 Cu.import("resource://gre/modules/Services.jsm");
 Cu.import("resource://gre/modules/XPCOMUtils.jsm");
 Cu.importGlobalProperties(["TextDecoder"]);
 
+XPCOMUtils.defineLazyModuleGetter(this, "AsyncShutdown",
+                                  "resource://gre/modules/AsyncShutdown.jsm");
 XPCOMUtils.defineLazyModuleGetter(this, "setTimeout",
                                   "resource://gre/modules/Timer.jsm");
 
 Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared.js", this);
 
 var EXPORTED_SYMBOLS = ["BaseProcess", "PromiseWorker", "SubprocessConstants"];
 
 const BUFFER_SIZE = 4096;
@@ -33,21 +35,35 @@ let nextResponseId = 0;
  */
 class PromiseWorker extends ChromeWorker {
   constructor(url) {
     super(url);
 
     this.listeners = new Map();
     this.pendingResponses = new Map();
 
+    this.addListener("close", this.onClose.bind(this));
     this.addListener("failure", this.onFailure.bind(this));
     this.addListener("success", this.onSuccess.bind(this));
     this.addListener("debug", this.onDebug.bind(this));
 
     this.addEventListener("message", this.onmessage);
+
+    this.shutdown = this.shutdown.bind(this);
+    AsyncShutdown.webWorkersShutdown.addBlocker(
+      "Subprocess.jsm: Shut down IO worker",
+      this.shutdown);
+  }
+
+  onClose() {
+    AsyncShutdown.webWorkersShutdown.removeBlocker(this.shutdown);
+  }
+
+  shutdown() {
+    return this.call("shutdown", []);
   }
 
   /**
    * Adds a listener for the given message from the worker. Any message received
    * from the worker with a `data.msg` property matching the given `msg`
    * parameter are passed to the given listener.
    *
    * @param {string} msg
@@ -610,25 +626,29 @@ class BaseProcess {
       return new this(worker, processId, fds, pid);
     });
   }
 
   static get WORKER_URL() {
     throw new Error("Not implemented");
   }
 
+  static get WorkerClass() {
+    return PromiseWorker;
+  }
+
   /**
    * Gets the current subprocess worker, or spawns a new one if it does not
    * currently exist.
    *
    * @returns {PromiseWorker}
    */
   static getWorker() {
     if (!this._worker) {
-      this._worker = new PromiseWorker(this.WORKER_URL);
+      this._worker = new this.WorkerClass(this.WORKER_URL);
     }
     return this._worker;
   }
 
   /**
    * Kills the process.
    *
    * @param {integer} [timeout=300]
--- a/toolkit/modules/subprocess/subprocess_shared_win.js
+++ b/toolkit/modules/subprocess/subprocess_shared_win.js
@@ -17,16 +17,17 @@ var win32 = {
   // On Windows 64, winapi_abi is an alias for default_abi.
   WINAPI: ctypes.winapi_abi,
 
   VOID: ctypes.void_t,
 
   BYTE: ctypes.uint8_t,
   WORD: ctypes.uint16_t,
   DWORD: ctypes.uint32_t,
+  LONG: ctypes.long,
 
   UINT: ctypes.unsigned_int,
   UCHAR: ctypes.unsigned_char,
 
   BOOL: ctypes.bool,
 
   HANDLE: ctypes.voidptr_t,
   PVOID: ctypes.voidptr_t,
@@ -214,16 +215,25 @@ var libc = new Library("libc", LIBC_CHOI
     win32.BOOL, /* bInheritHandle */
     win32.DWORD, /* dwCreationFlags */
     win32.LPVOID, /* opt lpEnvironment */
     win32.LPCWSTR, /* opt lpCurrentDirectory */
     win32.STARTUPINFOW.ptr, /* lpStartupInfo */
     win32.PROCESS_INFORMATION.ptr, /* out lpProcessInformation */
   ],
 
+  CreateSemaphoreW: [
+    win32.WINAPI,
+    win32.HANDLE,
+    win32.SECURITY_ATTRIBUTES.ptr, /* opt lpSemaphoreAttributes */
+    win32.LONG, /* lInitialCount */
+    win32.LONG, /* lMaximumCount */
+    win32.LPCWSTR, /* opt lpName */
+  ],
+
   DeleteProcThreadAttributeList: [
     win32.WINAPI,
     win32.VOID,
     win32.LPPROC_THREAD_ATTRIBUTE_LIST, /* in/out lpAttributeList */
   ],
 
   DuplicateHandle: [
     win32.WINAPI,
@@ -294,16 +304,24 @@ var libc = new Library("libc", LIBC_CHOI
     win32.BOOL,
     win32.HANDLE, /* hFile */
     win32.LPVOID, /* out lpBuffer */
     win32.DWORD, /* nNumberOfBytesToRead */
     win32.LPDWORD, /* opt out lpNumberOfBytesRead */
     win32.OVERLAPPED.ptr, /* opt in/out lpOverlapped */
   ],
 
+  ReleaseSemaphore: [
+    win32.WINAPI,
+    win32.BOOL,
+    win32.HANDLE, /* hSemaphore */
+    win32.LONG, /* lReleaseCount */
+    win32.LONG.ptr, /* opt out lpPreviousCount */
+  ],
+
   TerminateProcess: [
     win32.WINAPI,
     win32.BOOL,
     win32.HANDLE, /* hProcess */
     win32.UINT, /* uExitCode */
   ],
 
   UpdateProcThreadAttribute: [
--- a/toolkit/modules/subprocess/subprocess_unix.jsm
+++ b/toolkit/modules/subprocess/subprocess_unix.jsm
@@ -4,35 +4,81 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 "use strict";
 
 /* eslint-disable mozilla/balanced-listeners */
 
 /* exported SubprocessImpl */
 
-/* globals BaseProcess */
+/* globals BaseProcess, PromiseWorker */
 
 var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
 
 var EXPORTED_SYMBOLS = ["SubprocessImpl"];
 
 Cu.import("resource://gre/modules/ctypes.jsm");
 Cu.import("resource://gre/modules/osfile.jsm");
 Cu.import("resource://gre/modules/Services.jsm");
 Cu.import("resource://gre/modules/Task.jsm");
 Cu.import("resource://gre/modules/subprocess/subprocess_common.jsm");
 
 Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared.js", this);
 Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared_unix.js", this);
 
+class UnixPromiseWorker extends PromiseWorker {
+  constructor(...args) {
+    super(...args);
+
+    let fds = ctypes.int.array(2)();
+    let res = libc.pipe(fds);
+    if (res == -1) {
+      throw new Error("Unable to create pipe");
+    }
+
+    this.signalFd = fds[1];
+
+    libc.fcntl(fds[0], LIBC.F_SETFL, LIBC.O_NONBLOCK);
+    libc.fcntl(fds[0], LIBC.F_SETFD, LIBC.FD_CLOEXEC);
+    libc.fcntl(fds[1], LIBC.F_SETFD, LIBC.FD_CLOEXEC);
+
+    this.call("init", [{signalFd: fds[0]}]);
+  }
+
+  closePipe() {
+    if (this.signalFd) {
+      libc.close(this.signalFd);
+      this.signalFd = null;
+    }
+  }
+
+  onClose() {
+    this.closePipe();
+    super.onClose();
+  }
+
+  signalWorker() {
+    libc.write(this.signalFd, new ArrayBuffer(1), 1);
+  }
+
+  postMessage(...args) {
+    this.signalWorker();
+    return super.postMessage(...args);
+  }
+}
+
+
 class Process extends BaseProcess {
   static get WORKER_URL() {
     return "resource://gre/modules/subprocess/subprocess_worker_unix.js";
   }
+
+  static get WorkerClass() {
+    return UnixPromiseWorker;
+  }
 }
 
 var SubprocessUnix = {
   Process,
 
   call(options) {
     return Process.create(options);
   },
--- a/toolkit/modules/subprocess/subprocess_win.jsm
+++ b/toolkit/modules/subprocess/subprocess_win.jsm
@@ -4,45 +4,69 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 "use strict";
 
 /* eslint-disable mozilla/balanced-listeners */
 
 /* exported SubprocessImpl */
 
-/* globals BaseProcess */
+/* globals BaseProcess, PromiseWorker */
 
 var {classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
 
 var EXPORTED_SYMBOLS = ["SubprocessImpl"];
 
 Cu.import("resource://gre/modules/ctypes.jsm");
 Cu.import("resource://gre/modules/osfile.jsm");
 Cu.import("resource://gre/modules/Services.jsm");
 Cu.import("resource://gre/modules/Task.jsm");
 Cu.import("resource://gre/modules/subprocess/subprocess_common.jsm");
 
 Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared.js", this);
 Services.scriptloader.loadSubScript("resource://gre/modules/subprocess/subprocess_shared_win.js", this);
 
+class WinPromiseWorker extends PromiseWorker {
+  constructor(...args) {
+    super(...args);
+
+    this.signalEvent = libc.CreateSemaphoreW(null, 0, 32, null);
+
+    this.call("init", [{
+      signalEvent: String(ctypes.cast(this.signalEvent, ctypes.uintptr_t).value),
+    }]);
+  }
+
+  signalWorker() {
+    libc.ReleaseSemaphore(this.signalEvent, 1, null);
+  }
+
+  postMessage(...args) {
+    this.signalWorker();
+    return super.postMessage(...args);
+  }
+}
+
 class Process extends BaseProcess {
   static get WORKER_URL() {
     return "resource://gre/modules/subprocess/subprocess_worker_win.js";
   }
+
+  static get WorkerClass() {
+    return WinPromiseWorker;
+  }
 }
 
 var SubprocessWin = {
   Process,
 
   call(options) {
     return Process.create(options);
   },
 
-
   * getEnvironment() {
     let env = libc.GetEnvironmentStringsW();
     try {
       for (let p = env, q = env; ; p = p.increment()) {
         if (p.contents == "\0") {
           if (String(p) == String(q)) {
             break;
           }
--- a/toolkit/modules/subprocess/subprocess_worker_common.js
+++ b/toolkit/modules/subprocess/subprocess_worker_common.js
@@ -83,16 +83,28 @@ class BaseProcess {
     // by the GC if it runs before our process is started.
     this.stringArrays.push(cstrings);
 
     return result;
   }
 }
 
 let requests = {
+  init(details) {
+    io.init(details);
+
+    return {data: {}};
+  },
+
+  shutdown() {
+    io.shutdown();
+
+    return {data: {}};
+  },
+
   close(pipeId, force = false) {
     let pipe = io.getPipe(pipeId);
 
     return pipe.close(force).then(() => ({data: {}}));
   },
 
   spawn(options) {
     let process = new Process(options);
@@ -151,16 +163,18 @@ let requests = {
   },
 
   waitForNoProcesses() {
     return Promise.all(Array.from(io.processes.values(), proc => proc.exitPromise));
   },
 };
 
 onmessage = event => {
+  io.messageCount--;
+
   let {msg, msgId, args} = event.data;
 
   new Promise(resolve => {
     resolve(requests[msg](...args));
   }).then(result => {
     let response = {
       msg: "success",
       msgId,
@@ -190,8 +204,14 @@ onmessage = event => {
 
     self.postMessage({
       msg: "failure",
       msgId,
       error: {},
     });
   });
 };
+
+onclose = event => {
+  io.shutdown();
+
+  self.postMessage({msg: "close"});
+};
--- a/toolkit/modules/subprocess/subprocess_worker_unix.js
+++ b/toolkit/modules/subprocess/subprocess_worker_unix.js
@@ -7,18 +7,17 @@
 
 /* exported Process */
 /* globals BaseProcess, BasePipe */
 
 importScripts("resource://gre/modules/subprocess/subprocess_shared.js",
               "resource://gre/modules/subprocess/subprocess_shared_unix.js",
               "resource://gre/modules/subprocess/subprocess_worker_common.js");
 
-const POLL_INTERVAL = 50;
-const POLL_TIMEOUT = 0;
+const POLL_TIMEOUT = 5000;
 
 let io;
 
 let nextPipeId = 0;
 
 class Pipe extends BasePipe {
   constructor(process, fd) {
     super();
@@ -243,16 +242,50 @@ class OutputPipe extends Pipe {
     }
 
     if (writes.length == 0) {
       io.updatePollFds();
     }
   }
 }
 
+class Signal {
+  constructor(fd) {
+    this.fd = fd;
+  }
+
+  cleanup() {
+    libc.close(this.fd);
+    this.fd = null;
+  }
+
+  get pollEvents() {
+    return LIBC.POLLIN;
+  }
+
+  /**
+   * Called when an error occurred while polling our file descriptor.
+   */
+  onError() {
+    io.shutdown();
+  }
+
+  /**
+   * Called when one of the IO operations matching the `pollEvents` mask may be
+   * performed without blocking.
+   */
+  onReady() {
+    let buffer = new ArrayBuffer(16);
+    let count = +libc.read(this.fd, buffer, buffer.byteLength);
+    if (count > 0) {
+      io.messageCount += count;
+    }
+  }
+}
+
 class Process extends BaseProcess {
   /**
    * Each Process object opens an additional pipe from the target object, which
    * will be automatically closed when the process exits, but otherwise
    * carries no data.
    *
    * This property contains a bit mask of poll() events which we wish to be
    * notified of on this descriptor. We're not expecting any input from this
@@ -444,17 +477,37 @@ class Process extends BaseProcess {
 io = {
   pollFds: null,
   pollHandlers: null,
 
   pipes: new Map(),
 
   processes: new Map(),
 
-  interval: null,
+  messageCount: 0,
+
+  running: true,
+
+  init(details) {
+    this.signal = new Signal(details.signalFd);
+    this.updatePollFds();
+
+    setTimeout(this.loop.bind(this), 0);
+  },
+
+  shutdown() {
+    if (this.running) {
+      this.running = false;
+
+      this.signal.cleanup();
+      this.signal = null;
+
+      self.close();
+    }
+  },
 
   getPipe(pipeId) {
     let pipe = this.pipes.get(pipeId);
 
     if (!pipe) {
       let error = new Error("File closed");
       error.errorCode = SubprocessConstants.ERROR_END_OF_FILE;
       throw error;
@@ -467,47 +520,49 @@ io = {
 
     if (!process) {
       throw new Error(`Invalid process ID: ${processId}`);
     }
     return process;
   },
 
   updatePollFds() {
-    let handlers = [...this.pipes.values(),
+    let handlers = [this.signal,
+                    ...this.pipes.values(),
                     ...this.processes.values()];
 
     handlers = handlers.filter(handler => handler.pollEvents);
 
     let pollfds = unix.pollfd.array(handlers.length)();
 
     for (let [i, handler] of handlers.entries()) {
       let pollfd = pollfds[i];
 
       pollfd.fd = handler.fd;
       pollfd.events = handler.pollEvents;
       pollfd.revents = 0;
     }
 
     this.pollFds = pollfds;
     this.pollHandlers = handlers;
+  },
 
-    if (pollfds.length && !this.interval) {
-      this.interval = setInterval(this.poll.bind(this), POLL_INTERVAL);
-    } else if (!pollfds.length && this.interval) {
-      clearInterval(this.interval);
-      this.interval = null;
+  loop() {
+    this.poll();
+    if (this.running) {
+      setTimeout(this.loop.bind(this), 0);
     }
   },
 
   poll() {
     let handlers = this.pollHandlers;
     let pollfds = this.pollFds;
 
-    let count = libc.poll(pollfds, pollfds.length, POLL_TIMEOUT);
+    let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT;
+    let count = libc.poll(pollfds, pollfds.length, timeout);
 
     for (let i = 0; count && i < pollfds.length; i++) {
       let pollfd = pollfds[i];
       if (pollfd.revents) {
         count--;
 
         let handler = handlers[i];
         try {
--- a/toolkit/modules/subprocess/subprocess_worker_win.js
+++ b/toolkit/modules/subprocess/subprocess_worker_win.js
@@ -7,18 +7,17 @@
 
 /* exported Process */
 /* globals BaseProcess, BasePipe, win32 */
 
 importScripts("resource://gre/modules/subprocess/subprocess_shared.js",
               "resource://gre/modules/subprocess/subprocess_shared_win.js",
               "resource://gre/modules/subprocess/subprocess_worker_common.js");
 
-const POLL_INTERVAL = 50;
-const POLL_TIMEOUT = 0;
+const POLL_TIMEOUT = 5000;
 
 // The exit code that we send when we forcibly terminate a process.
 const TERMINATE_EXIT_CODE = 0x7f;
 
 let io;
 
 let nextPipeId = 0;
 
@@ -293,16 +292,35 @@ class OutputPipe extends Pipe {
         this.writeNext();
       } else {
         io.updatePollEvents();
       }
     }
   }
 }
 
+class Signal {
+  constructor(event) {
+    this.event = event;
+  }
+
+  cleanup() {
+    libc.CloseHandle(this.event);
+    this.event = null;
+  }
+
+  onError() {
+    io.shutdown();
+  }
+
+  onReady() {
+    io.messageCount += 1;
+  }
+}
+
 class Process extends BaseProcess {
   constructor(...args) {
     super(...args);
 
     this.killed = false;
   }
 
   /**
@@ -538,17 +556,39 @@ class Process extends BaseProcess {
 io = {
   events: null,
   eventHandlers: null,
 
   pipes: new Map(),
 
   processes: new Map(),
 
-  interval: null,
+  messageCount: 0,
+
+  running: true,
+
+  init(details) {
+    let signalEvent = ctypes.cast(ctypes.uintptr_t(details.signalEvent),
+                                  win32.HANDLE);
+    this.signal = new Signal(signalEvent);
+    this.updatePollEvents();
+
+    setTimeout(this.loop.bind(this), 0);
+  },
+
+  shutdown() {
+    if (this.running) {
+      this.running = false;
+
+      this.signal.cleanup();
+      this.signal = null;
+
+      self.close();
+    }
+  },
 
   getPipe(pipeId) {
     let pipe = this.pipes.get(pipeId);
 
     if (!pipe) {
       let error = new Error("File closed");
       error.errorCode = SubprocessConstants.ERROR_END_OF_FILE;
       throw error;
@@ -561,41 +601,44 @@ io = {
 
     if (!process) {
       throw new Error(`Invalid process ID: ${processId}`);
     }
     return process;
   },
 
   updatePollEvents() {
-    let handlers = [...this.pipes.values(),
+    let handlers = [this.signal,
+                    ...this.pipes.values(),
                     ...this.processes.values()];
 
     handlers = handlers.filter(handler => handler.event);
 
     this.eventHandlers = handlers;
 
     let handles = handlers.map(handler => handler.event);
     this.events = win32.HANDLE.array()(handles);
+  },
 
-    if (handles.length && !this.interval) {
-      this.interval = setInterval(this.poll.bind(this), POLL_INTERVAL);
-    } else if (!handlers.length && this.interval) {
-      clearInterval(this.interval);
-      this.interval = null;
+  loop() {
+    this.poll();
+    if (this.running) {
+      setTimeout(this.loop.bind(this), 0);
     }
   },
 
+
   poll() {
-    for (;;) {
+    let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT;
+    for (;; timeout = 0) {
       let events = this.events;
       let handlers = this.eventHandlers;
 
       let result = libc.WaitForMultipleObjects(events.length, events,
-                                               false, POLL_TIMEOUT);
+                                               false, timeout);
 
       if (result < handlers.length) {
         try {
           handlers[result].onReady();
         } catch (e) {
           console.error(e);
           debug(`Worker error: ${e} :: ${e.stack}`);
           handlers[result].onError();
--- a/toolkit/modules/subprocess/test/xpcshell/test_subprocess.js
+++ b/toolkit/modules/subprocess/test/xpcshell/test_subprocess.js
@@ -1,16 +1,19 @@
 "use strict";
 
+Cu.import("resource://gre/modules/AppConstants.jsm");
 Cu.import("resource://gre/modules/Task.jsm");
 Cu.import("resource://gre/modules/Timer.jsm");
 
 
 const env = Cc["@mozilla.org/process/environment;1"].getService(Ci.nsIEnvironment);
 
+const MAX_ROUND_TRIP_TIME_MS = AppConstants.DEBUG || AppConstants.ASAN ? 18 : 9;
+
 let PYTHON;
 let PYTHON_BIN;
 let PYTHON_DIR;
 
 const TEST_SCRIPT = do_get_file("data_test_script.py").path;
 
 let read = pipe => {
   return pipe.readUint32().then(count => {
@@ -173,16 +176,52 @@ add_task(function* test_subprocess_huge(
 
 
   let {exitCode} = yield proc.wait();
 
   equal(exitCode, 0, "Got expected exit code");
 });
 
 
+add_task(function* test_subprocess_round_trip_perf() {
+  let proc = yield Subprocess.call({
+    command: PYTHON,
+    arguments: ["-u", TEST_SCRIPT, "echo"],
+  });
+
+
+  const LINE = "I'm a leaf on the wind.\n";
+
+  let now = Date.now();
+  const COUNT = 1000;
+  for (let i = 0; i < COUNT; i++) {
+    let [output] = yield Promise.all([
+      read(proc.stdout),
+      proc.stdin.write(LINE),
+    ]);
+
+    // We don't want to log this for every iteration, but we still need
+    // to fail if it goes wrong.
+    if (output !== LINE) {
+      equal(output, LINE, "Got expected output");
+    }
+  }
+
+  let roundTripTime = (Date.now() - now) / COUNT;
+  ok(roundTripTime <= MAX_ROUND_TRIP_TIME_MS,
+     `Expected round trip time (${roundTripTime}ms) to be less than ${MAX_ROUND_TRIP_TIME_MS}ms`);
+
+  yield proc.stdin.close();
+
+  let {exitCode} = yield proc.wait();
+
+  equal(exitCode, 0, "Got expected exit code");
+});
+
+
 add_task(function* test_subprocess_stderr_default() {
   const LINE1 = "I'm a leaf on the wind.\n";
   const LINE2 = "Watch how I soar.\n";
 
   let proc = yield Subprocess.call({
     command: PYTHON,
     arguments: ["-u", TEST_SCRIPT, "print", LINE1, LINE2],
   });
@@ -320,19 +359,20 @@ add_task(function* test_subprocess_lazy_
   proc.stdin.close();
 
   let len = yield readPromise;
   equal(len, LINE.length);
 
   yield closedPromise;
 
 
-  let {exitCode} = yield proc.wait();
-
-  equal(exitCode, 0, "Got expected exit code");
+  // Don't test for a successful exit here. The process may exit with a
+  // write error if we close the pipe after it's written the message
+  // size but before it's written the message.
+  yield proc.wait();
 });
 
 
 add_task(function* test_subprocess_force_close() {
   let proc = yield Subprocess.call({
     command: PYTHON,
     arguments: ["-u", TEST_SCRIPT, "echo"],
   });