Bug 797639 - Part 1: Bulk data support in the transport layer. r=jimb
authorJ. Ryan Stinnett <jryans@gmail.com>
Wed, 14 May 2014 14:30:02 -0500
changeset 183190 e97f6c0bbdd614b13f0007e19a86fe08454bbd9b
parent 183189 463401983c4c95ebe92c63b70de035ae50632d32
child 183191 f6290ce0b388c268e74fdfc4c802e43ebfb83681
push idunknown
push userunknown
push dateunknown
reviewersjimb
bugs797639
milestone32.0a1
Bug 797639 - Part 1: Bulk data support in the transport layer. r=jimb
toolkit/devtools/DevToolsUtils.js
toolkit/devtools/client/dbg-client.jsm
toolkit/devtools/moz.build
toolkit/devtools/server/main.js
toolkit/devtools/server/tests/unit/test_dbgsocket.js
toolkit/devtools/server/tests/unit/test_dbgsocket_connection_drop.js
toolkit/devtools/server/tests/unit/xpcshell.ini
toolkit/devtools/server/transport.js
toolkit/devtools/transport/moz.build
toolkit/devtools/transport/packets.js
toolkit/devtools/transport/stream-utils.js
toolkit/devtools/transport/tests/moz.build
toolkit/devtools/transport/tests/unit/head_dbg.js
toolkit/devtools/transport/tests/unit/test_dbgsocket.js
toolkit/devtools/transport/tests/unit/test_dbgsocket_connection_drop.js
toolkit/devtools/transport/tests/unit/test_delimited_read.js
toolkit/devtools/transport/tests/unit/test_packet.js
toolkit/devtools/transport/tests/unit/test_transport_bulk.js
toolkit/devtools/transport/tests/unit/testactors.js
toolkit/devtools/transport/tests/unit/xpcshell.ini
toolkit/devtools/transport/transport.js
--- a/toolkit/devtools/DevToolsUtils.js
+++ b/toolkit/devtools/DevToolsUtils.js
@@ -308,16 +308,29 @@ exports.dumpn = function dumpn(str) {
     dump("DBG-SERVER: " + str + "\n");
   }
 }
 
 // We want wantLogging to be writable. The exports object is frozen by the
 // loader, so define it on dumpn instead.
 exports.dumpn.wantLogging = false;
 
+/**
+ * A verbose logger for low-level tracing.
+ */
+exports.dumpv = function(msg) {
+  if (exports.dumpv.wantVerbose) {
+    exports.dumpn(msg);
+  }
+};
+
+// We want wantLogging to be writable. The exports object is frozen by the
+// loader, so define it on dumpn instead.
+exports.dumpv.wantVerbose = false;
+
 exports.dbg_assert = function dbg_assert(cond, e) {
   if (!cond) {
     return e;
   }
 }
 
 
 /**
--- a/toolkit/devtools/client/dbg-client.jsm
+++ b/toolkit/devtools/client/dbg-client.jsm
@@ -4,22 +4,24 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 "use strict";
 var Ci = Components.interfaces;
 var Cc = Components.classes;
 var Cu = Components.utils;
 var Cr = Components.results;
+var CC = Components.Constructor;
 // On B2G scope object misbehaves and we have to bind globals to `this`
 // in order to ensure theses variable to be visible in transport.js
 this.Ci = Ci;
 this.Cc = Cc;
 this.Cu = Cu;
 this.Cr = Cr;
+this.CC = CC;
 
 this.EXPORTED_SYMBOLS = ["DebuggerTransport",
                          "DebuggerClient",
                          "RootClient",
                          "debuggerSocketConnect",
                          "LongStringClient",
                          "EnvironmentClient",
                          "ObjectClient"];
@@ -48,28 +50,38 @@ Object.defineProperty(this, "WebConsoleC
   },
   configurable: true,
   enumerable: true
 });
 
 Components.utils.import("resource://gre/modules/devtools/DevToolsUtils.jsm");
 this.makeInfallible = DevToolsUtils.makeInfallible;
 
-let wantLogging = Services.prefs.getBoolPref("devtools.debugger.log");
+let LOG_PREF = "devtools.debugger.log";
+let VERBOSE_PREF = "devtools.debugger.log.verbose";
+let wantLogging = Services.prefs.getBoolPref(LOG_PREF);
+let wantVerbose =
+  Services.prefs.getPrefType(VERBOSE_PREF) !== Services.prefs.PREF_INVALID &&
+  Services.prefs.getBoolPref(VERBOSE_PREF);
 
-function dumpn(str)
-{
+function dumpn(str) {
   if (wantLogging) {
     dump("DBG-CLIENT: " + str + "\n");
   }
 }
 
+function dumpv(msg) {
+  if (wantVerbose) {
+    dumpn(msg);
+  }
+}
+
 let loader = Cc["@mozilla.org/moz/jssubscript-loader;1"]
   .getService(Ci.mozIJSSubScriptLoader);
-loader.loadSubScript("resource://gre/modules/devtools/server/transport.js", this);
+loader.loadSubScript("resource://gre/modules/devtools/transport/transport.js", this);
 
 /**
  * Add simple event notification to a prototype object. Any object that has
  * some use for event notifications or the observer pattern in general can be
  * augmented with the necessary facilities by passing its prototype to this
  * function.
  *
  * @param aProto object
--- a/toolkit/devtools/moz.build
+++ b/toolkit/devtools/moz.build
@@ -9,13 +9,14 @@ PARALLEL_DIRS += [
     'client',
     'gcli',
     'sourcemap',
     'webconsole',
     'apps',
     'styleinspector',
     'acorn',
     'pretty-fast',
-    'qrcode'
+    'qrcode',
+    'transport'
 ]
 
 MOCHITEST_CHROME_MANIFESTS += ['tests/mochitest/chrome.ini']
 XPCSHELL_TESTS_MANIFESTS += ['tests/unit/xpcshell.ini']
--- a/toolkit/devtools/server/main.js
+++ b/toolkit/devtools/server/main.js
@@ -8,19 +8,20 @@
 /**
  * Toolkit glue for the remote debugging protocol, loaded into the
  * debugging global.
  */
 let { Ci, Cc, CC, Cu, Cr } = require("chrome");
 let Debugger = require("Debugger");
 let Services = require("Services");
 let { ActorPool } = require("devtools/server/actors/common");
-let { DebuggerTransport, LocalDebuggerTransport, ChildDebuggerTransport } = require("devtools/server/transport");
+let { DebuggerTransport, LocalDebuggerTransport, ChildDebuggerTransport } =
+  require("devtools/toolkit/transport/transport");
 let DevToolsUtils = require("devtools/toolkit/DevToolsUtils");
-let { dumpn, dbg_assert } = DevToolsUtils;
+let { dumpn, dumpv, dbg_assert } = DevToolsUtils;
 let Services = require("Services");
 let EventEmitter = require("devtools/toolkit/event-emitter");
 
 // Until all Debugger server code is converted to SDK modules,
 // imports Components.* alias from chrome module.
 var { Ci, Cc, CC, Cu, Cr } = require("chrome");
 // On B2G, `this` != Global scope, so `Ci` won't be binded on `this`
 // (i.e. this.Ci is undefined) Then later, when using loadSubScript,
@@ -30,29 +31,36 @@ this.Cc = Cc;
 this.CC = CC;
 this.Cu = Cu;
 this.Cr = Cr;
 this.Debugger = Debugger;
 this.Services = Services;
 this.ActorPool = ActorPool;
 this.DevToolsUtils = DevToolsUtils;
 this.dumpn = dumpn;
+this.dumpv = dumpv;
 this.dbg_assert = dbg_assert;
 
 // Overload `Components` to prevent SDK loader exception on Components
 // object usage
 Object.defineProperty(this, "Components", {
   get: function () require("chrome").components
 });
 
 const DBG_STRINGS_URI = "chrome://global/locale/devtools/debugger.properties";
 
 const nsFile = CC("@mozilla.org/file/local;1", "nsIFile", "initWithPath");
 Cu.import("resource://gre/modules/XPCOMUtils.jsm");
-dumpn.wantLogging = Services.prefs.getBoolPref("devtools.debugger.log");
+
+const LOG_PREF = "devtools.debugger.log";
+const VERBOSE_PREF = "devtools.debugger.log.verbose";
+dumpn.wantLogging = Services.prefs.getBoolPref(LOG_PREF);
+dumpv.wantVerbose =
+  Services.prefs.getPrefType(VERBOSE_PREF) !== Services.prefs.PREF_INVALID &&
+  Services.prefs.getBoolPref(VERBOSE_PREF);
 
 Cu.import("resource://gre/modules/devtools/deprecated-sync-thenables.js");
 
 function loadSubScript(aURL)
 {
   try {
     let loader = Cc["@mozilla.org/moz/jssubscript-loader;1"]
       .getService(Ci.mozIJSSubScriptLoader);
--- a/toolkit/devtools/server/tests/unit/xpcshell.ini
+++ b/toolkit/devtools/server/tests/unit/xpcshell.ini
@@ -16,20 +16,16 @@ support-files =
   tracerlocations.js
 
 [test_nesting-01.js]
 [test_nesting-02.js]
 [test_nesting-03.js]
 [test_forwardingprefix.js]
 [test_getyoungestframe.js]
 [test_nsjsinspector.js]
-[test_dbgsocket.js]
-skip-if = toolkit == "gonk"
-reason = bug 821285
-[test_dbgsocket_connection_drop.js]
 [test_dbgactor.js]
 [test_dbgglobal.js]
 [test_dbgclient_debuggerstatement.js]
 [test_attach.js]
 [test_reattach-thread.js]
 [test_blackboxing-01.js]
 [test_blackboxing-02.js]
 [test_blackboxing-03.js]
copy from toolkit/devtools/moz.build
copy to toolkit/devtools/transport/moz.build
--- a/toolkit/devtools/moz.build
+++ b/toolkit/devtools/transport/moz.build
@@ -1,21 +1,15 @@
 # -*- Mode: python; c-basic-offset: 4; indent-tabs-mode: nil; tab-width: 40 -*-
 # vim: set filetype=python:
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
-PARALLEL_DIRS += [
-    'server',
-    'client',
-    'gcli',
-    'sourcemap',
-    'webconsole',
-    'apps',
-    'styleinspector',
-    'acorn',
-    'pretty-fast',
-    'qrcode'
+TEST_DIRS += ['tests']
+
+JS_MODULES_PATH = 'modules/devtools/transport'
+
+EXTRA_JS_MODULES += [
+    'packets.js',
+    'stream-utils.js',
+    'transport.js'
 ]
-
-MOCHITEST_CHROME_MANIFESTS += ['tests/mochitest/chrome.ini']
-XPCSHELL_TESTS_MANIFESTS += ['tests/unit/xpcshell.ini']
new file mode 100644
--- /dev/null
+++ b/toolkit/devtools/transport/packets.js
@@ -0,0 +1,404 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+/**
+ * Packets contain read / write functionality for the different packet types
+ * supported by the debugging protocol, so that a transport can focus on
+ * delivery and queue management without worrying too much about the specific
+ * packet types.
+ *
+ * They are intended to be "one use only", so a new packet should be
+ * instantiated for each incoming or outgoing packet.
+ *
+ * A complete Packet type should expose at least the following:
+ *   * read(stream, scriptableStream)
+ *     Called when the input stream has data to read
+ *   * write(stream)
+ *     Called when the output stream is ready to write
+ *   * get done()
+ *     Returns true once the packet is done being read / written
+ *   * destroy()
+ *     Called to clean up at the end of use
+ */
+
+const { Cc, Ci, Cu } = require("chrome");
+const { Promise: promise } =
+  Cu.import("resource://gre/modules/Promise.jsm", {});
+const Heritage = require("sdk/core/heritage");
+const DevToolsUtils = require("devtools/toolkit/DevToolsUtils");
+const { dumpn, dumpv } = DevToolsUtils;
+const StreamUtils = require("devtools/toolkit/transport/stream-utils");
+
+const unicodeConverter = Cc["@mozilla.org/intl/scriptableunicodeconverter"]
+                         .createInstance(Ci.nsIScriptableUnicodeConverter);
+unicodeConverter.charset = "UTF-8";
+
+// The transport's previous check ensured the header length did not exceed 20
+// characters.  Here, we opt for the somewhat smaller, but still large limit of
+// 1 TiB.
+const PACKET_LENGTH_MAX = Math.pow(2, 40);
+
+/**
+ * A generic Packet processing object (extended by two subtypes below).
+ */
+function Packet(transport) {
+  this._transport = transport;
+  this._length = 0;
+}
+
+/**
+ * Attempt to initialize a new Packet based on the incoming packet header we've
+ * received so far.  We try each of the types in succession, trying JSON packets
+ * first since they are much more common.
+ * @param header string
+ *        The packet header string to attempt parsing.
+ * @param transport DebuggerTransport
+ *        The transport instance that will own the packet.
+ * @return Packet
+ *         The parsed packet of the matching type, or null if no types matched.
+ */
+Packet.fromHeader = function(header, transport) {
+  return JSONPacket.fromHeader(header, transport) ||
+         BulkPacket.fromHeader(header, transport);
+};
+
+Packet.prototype = {
+
+  get length() {
+    return this._length;
+  },
+
+  set length(length) {
+    if (length > PACKET_LENGTH_MAX) {
+      throw Error("Packet length " + length + " exceeds the max length of " +
+                  PACKET_LENGTH_MAX);
+    }
+    this._length = length;
+  },
+
+  destroy: function() {
+    this._transport = null;
+  }
+
+};
+
+exports.Packet = Packet;
+
+/**
+ * With a JSON packet (the typical packet type sent via the transport), data is
+ * transferred as a JSON packet serialized into a string, with the string length
+ * prepended to the packet, followed by a colon ([length]:[packet]). The
+ * contents of the JSON packet are specified in the Remote Debugging Protocol
+ * specification.
+ * @param transport DebuggerTransport
+ *        The transport instance that will own the packet.
+ */
+function JSONPacket(transport) {
+  Packet.call(this, transport);
+  this._data = "";
+  this._done = false;
+}
+
+/**
+ * Attempt to initialize a new JSONPacket based on the incoming packet header
+ * we've received so far.
+ * @param header string
+ *        The packet header string to attempt parsing.
+ * @param transport DebuggerTransport
+ *        The transport instance that will own the packet.
+ * @return JSONPacket
+ *         The parsed packet, or null if it's not a match.
+ */
+JSONPacket.fromHeader = function(header, transport) {
+  let match = this.HEADER_PATTERN.exec(header);
+
+  if (!match) {
+    return null;
+  }
+
+  dumpv("Header matches JSON packet");
+  let packet = new JSONPacket(transport);
+  packet.length = +match[1];
+  return packet;
+};
+
+JSONPacket.HEADER_PATTERN = /^(\d+):$/;
+
+JSONPacket.prototype = Heritage.extend(Packet.prototype, {
+
+  /**
+   * Gets the object (not the serialized string) being read or written.
+   */
+  get object() { return this._object; },
+
+  /**
+   * Sets the object to be sent when write() is called.
+   */
+  set object(object) {
+    this._object = object;
+    let data = JSON.stringify(object);
+    this._data = unicodeConverter.ConvertFromUnicode(data);
+    this.length = this._data.length;
+  },
+
+  read: function(stream, scriptableStream) {
+    dumpv("Reading JSON packet");
+
+    // Read in more packet data.
+    this._readData(stream, scriptableStream);
+
+    if (!this.done) {
+      // Don't have a complete packet yet.
+      return;
+    }
+
+    let json = this._data;
+    try {
+      json = unicodeConverter.ConvertToUnicode(json);
+      this._object = JSON.parse(json);
+    } catch(e) {
+      let msg = "Error parsing incoming packet: " + json + " (" + e +
+                " - " + e.stack + ")";
+      if (Cu.reportError) {
+        Cu.reportError(msg);
+      }
+      dumpn(msg);
+      return;
+    }
+
+    this._transport._onJSONObjectReady(this._object);
+  },
+
+  _readData: function(stream, scriptableStream) {
+    if (dumpv.wantVerbose) {
+      dumpv("Reading JSON data: _l: " + this.length + " dL: " +
+            this._data.length + " sA: " + stream.available());
+    }
+    let bytesToRead = Math.min(this.length - this._data.length,
+                               stream.available());
+    this._data += scriptableStream.readBytes(bytesToRead);
+    this._done = this._data.length === this.length;
+  },
+
+  write: function(stream) {
+    dumpv("Writing JSON packet");
+
+    if (this._outgoing === undefined) {
+      // Format the serialized packet to a buffer
+      this._outgoing = this.length + ":" + this._data;
+    }
+
+    let written = stream.write(this._outgoing, this._outgoing.length);
+    this._outgoing = this._outgoing.slice(written);
+    this._done = !this._outgoing.length;
+  },
+
+  get done() { return this._done; },
+
+  toString: function() {
+    return JSON.stringify(this._object, null, 2);
+  }
+
+});
+
+exports.JSONPacket = JSONPacket;
+
+/**
+ * With a bulk packet, data is transferred by temporarily handing over the
+ * transport's input or output stream to the application layer for writing data
+ * directly.  This can be much faster for large data sets, and avoids various
+ * stages of copies and data duplication inherent in the JSON packet type.  The
+ * bulk packet looks like:
+ *
+ * bulk [actor] [type] [length]:[data]
+ *
+ * The interpretation of the data portion depends on the kind of actor and the
+ * packet's type.  See the Remote Debugging Protocol Stream Transport spec for
+ * more details.
+ * @param transport DebuggerTransport
+ *        The transport instance that will own the packet.
+ */
+function BulkPacket(transport) {
+  Packet.call(this, transport);
+  this._done = false;
+  this._readyForWriting = promise.defer();
+}
+
+/**
+ * Attempt to initialize a new BulkPacket based on the incoming packet header
+ * we've received so far.
+ * @param header string
+ *        The packet header string to attempt parsing.
+ * @param transport DebuggerTransport
+ *        The transport instance that will own the packet.
+ * @return BulkPacket
+ *         The parsed packet, or null if it's not a match.
+ */
+BulkPacket.fromHeader = function(header, transport) {
+  let match = this.HEADER_PATTERN.exec(header);
+
+  if (!match) {
+    return null;
+  }
+
+  dumpv("Header matches bulk packet");
+  let packet = new BulkPacket(transport);
+  packet.header = {
+    actor: match[1],
+    type: match[2],
+    length: +match[3]
+  };
+  return packet;
+};
+
+BulkPacket.HEADER_PATTERN = /^bulk ([^: ]+) ([^: ]+) (\d+):$/;
+
+BulkPacket.prototype = Heritage.extend(Packet.prototype, {
+
+  read: function(stream) {
+    dumpv("Reading bulk packet, handing off input stream");
+
+    // Temporarily pause monitoring of the input stream
+    this._transport.pauseIncoming();
+
+    let deferred = promise.defer();
+
+    this._transport._onBulkReadReady({
+      actor: this.actor,
+      type: this.type,
+      length: this.length,
+      copyTo: (output) => {
+        dumpv("CT length: " + this.length);
+        deferred.resolve(StreamUtils.copyStream(stream, output, this.length));
+        return deferred.promise;
+      },
+      stream: stream,
+      done: deferred
+    });
+
+    // Await the result of reading from the stream
+    deferred.promise.then(() => {
+      dumpv("onReadDone called, ending bulk mode");
+      this._done = true;
+      this._transport.resumeIncoming();
+    }, this._transport.close);
+
+    // Ensure this is only done once
+    this.read = () => {
+      throw new Error("Tried to read() a BulkPacket's stream multiple times.");
+    };
+  },
+
+  write: function(stream) {
+    dumpv("Writing bulk packet");
+
+    if (this._outgoingHeader === undefined) {
+      dumpv("Serializing bulk packet header");
+      // Format the serialized packet header to a buffer
+      this._outgoingHeader = "bulk " + this.actor + " " + this.type + " " +
+                             this.length + ":";
+    }
+
+    // Write the header, or whatever's left of it to write.
+    if (this._outgoingHeader.length) {
+      dumpv("Writing bulk packet header");
+      let written = stream.write(this._outgoingHeader,
+                                 this._outgoingHeader.length);
+      this._outgoingHeader = this._outgoingHeader.slice(written);
+      return;
+    }
+
+    dumpv("Handing off output stream");
+
+    // Temporarily pause the monitoring of the output stream
+    this._transport.pauseOutgoing();
+
+    let deferred = promise.defer();
+
+    this._readyForWriting.resolve({
+      copyFrom: (input) => {
+        dumpv("CF length: " + this.length);
+        deferred.resolve(StreamUtils.copyStream(input, stream, this.length));
+        return deferred.promise;
+      },
+      stream: stream,
+      done: deferred
+    });
+
+    // Await the result of writing to the stream
+    deferred.promise.then(() => {
+      dumpv("onWriteDone called, ending bulk mode");
+      this._done = true;
+      this._transport.resumeOutgoing();
+    }, this._transport.close);
+
+    // Ensure this is only done once
+    this.write = () => {
+      throw new Error("Tried to write() a BulkPacket's stream multiple times.");
+    };
+  },
+
+  get streamReadyForWriting() {
+    return this._readyForWriting.promise;
+  },
+
+  get header() {
+    return {
+      actor: this.actor,
+      type: this.type,
+      length: this.length
+    };
+  },
+
+  set header(header) {
+    this.actor = header.actor;
+    this.type = header.type;
+    this.length = header.length;
+  },
+
+  get done() { return this._done; },
+
+  toString: function() {
+    return "Bulk: " + JSON.stringify(this.header, null, 2);
+  }
+
+});
+
+exports.BulkPacket = BulkPacket;
+
+/**
+ * RawPacket is used to test the transport's error handling of malformed
+ * packets, by writing data directly onto the stream.
+ * @param transport DebuggerTransport
+ *        The transport instance that will own the packet.
+ * @param data string
+ *        The raw string to send out onto the stream.
+ */
+function RawPacket(transport, data) {
+  Packet.call(this, transport);
+  this._data = data;
+  this.length = data.length;
+  this._done = false;
+}
+
+RawPacket.prototype = Heritage.extend(Packet.prototype, {
+
+  read: function(stream) {
+    // This hasn't yet been needed for testing.
+    throw Error("Not implmented.");
+  },
+
+  write: function(stream) {
+    let written = stream.write(this._data, this._data.length);
+    this._data = this._data.slice(written);
+    this._done = !this._data.length;
+  },
+
+  get done() { return this._done; }
+
+});
+
+exports.RawPacket = RawPacket;
new file mode 100644
--- /dev/null
+++ b/toolkit/devtools/transport/stream-utils.js
@@ -0,0 +1,217 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+const { Ci, Cc, Cu, Cr, CC } = require("chrome");
+const { Promise: promise } =
+  Cu.import("resource://gre/modules/Promise.jsm", {});
+const Services = require("Services");
+const DevToolsUtils = require("devtools/toolkit/DevToolsUtils");
+const { dumpv } = DevToolsUtils;
+const IOUtil = Cc["@mozilla.org/io-util;1"].getService(Ci.nsIIOUtil);
+
+const ScriptableInputStream = CC("@mozilla.org/scriptableinputstream;1",
+                                 "nsIScriptableInputStream", "init");
+
+const BUFFER_SIZE = 0x8000;
+
+/**
+ * This helper function (and its companion object) are used by bulk senders and
+ * receivers to read and write data in and out of other streams.  Functions that
+ * make use of this tool are passed to callers when it is time to read or write
+ * bulk data.  It is highly recommended to use these copier functions instead of
+ * the stream directly because the copier enforces the agreed upon length.
+ * Since bulk mode reuses an existing stream, the sender and receiver must write
+ * and read exactly the agreed upon amount of data, or else the entire transport
+ * will be left in a invalid state.  Additionally, other methods of stream
+ * copying (such as NetUtil.asyncCopy) close the streams involved, which would
+ * terminate the debugging transport, and so it is avoided here.
+ *
+ * Overall, this *works*, but clearly the optimal solution would be able to just
+ * use the streams directly.  If it were possible to fully implement
+ * nsIInputStream / nsIOutputStream in JS, wrapper streams could be created to
+ * enforce the length and avoid closing, and consumers could use familiar stream
+ * utilities like NetUtil.asyncCopy.
+ *
+ * The function takes two async streams and copies a precise number of bytes
+ * from one to the other.  Copying begins immediately, but may complete at some
+ * future time depending on data size.  Use the returned promise to know when
+ * it's complete.
+ *
+ * @param input nsIAsyncInputStream
+ *        The stream to copy from.
+ * @param output nsIAsyncOutputStream
+ *        The stream to copy to.
+ * @param length Integer
+ *        The amount of data that needs to be copied.
+ * @return Promise
+ *         The promise is resolved when copying completes or rejected if any
+ *         (unexpected) errors occur.
+ */
+function copyStream(input, output, length) {
+  let copier = new StreamCopier(input, output, length);
+  return copier.copy();
+}
+
+function StreamCopier(input, output, length) {
+  this._id = StreamCopier._nextId++;
+  this.input = input;
+  // Save off the base output stream, since we know it's async as we've required
+  this.baseAsyncOutput = output;
+  if (IOUtil.outputStreamIsBuffered(output)) {
+    this.output = output;
+  } else {
+    this.output = Cc["@mozilla.org/network/buffered-output-stream;1"].
+                  createInstance(Ci.nsIBufferedOutputStream);
+    this.output.init(output, BUFFER_SIZE);
+  }
+  this._amountLeft = length;
+  this._deferred = promise.defer();
+
+  this._copy = this._copy.bind(this);
+  this._flush = this._flush.bind(this);
+  this._destroy = this._destroy.bind(this);
+  this._deferred.promise.then(this._destroy, this._destroy);
+
+  // Stream ready callback starts as |_copy|, but may switch to |_flush| at end
+  // if flushing would block the output stream.
+  this._streamReadyCallback = this._copy;
+}
+StreamCopier._nextId = 0;
+
+StreamCopier.prototype = {
+
+  get copied() { return this._deferred.promise; },
+
+  copy: function() {
+    try {
+      this._copy();
+    } catch(e) {
+      this._deferred.reject(e);
+    }
+    return this.copied;
+  },
+
+  _copy: function() {
+    let bytesAvailable = this.input.available();
+    let amountToCopy = Math.min(bytesAvailable, this._amountLeft);
+    this._debug("Trying to copy: " + amountToCopy);
+
+    let bytesCopied;
+    try {
+      bytesCopied = this.output.writeFrom(this.input, amountToCopy);
+    } catch(e if e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) {
+      this._debug("Base stream would block, will retry");
+      this._debug("Waiting for output stream");
+      this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
+      return;
+    }
+
+    this._amountLeft -= bytesCopied;
+    this._debug("Copied: " + bytesCopied +
+                ", Left: " + this._amountLeft);
+
+    if (this._amountLeft === 0) {
+      this._debug("Copy done!");
+      this._flush();
+      return;
+    }
+
+    this._debug("Waiting for input stream");
+    this.input.asyncWait(this, 0, 0, Services.tm.currentThread);
+  },
+
+  _flush: function() {
+    try {
+      this.output.flush();
+    } catch(e if e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK ||
+                 e.result == Cr.NS_ERROR_FAILURE) {
+      this._debug("Flush would block, will retry");
+      this._streamReadyCallback = this._flush;
+      this._debug("Waiting for output stream");
+      this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
+      return;
+    }
+    this._deferred.resolve();
+  },
+
+  _destroy: function() {
+    this._destroy = null;
+    this._copy = null;
+    this._flush = null;
+    this.input = null;
+    this.output = null;
+  },
+
+  // nsIInputStreamCallback
+  onInputStreamReady: function() {
+    this._streamReadyCallback();
+  },
+
+  // nsIOutputStreamCallback
+  onOutputStreamReady: function() {
+    this._streamReadyCallback();
+  },
+
+  _debug: function(msg) {
+    // Prefix logs with the copier ID, which makes logs much easier to
+    // understand when several copiers are running simultaneously
+    dumpv("Copier: " + this._id + " " + msg);
+  }
+
+};
+
+/**
+ * Read from a stream, one byte at a time, up to the next |delimiter|
+ * character, but stopping if we've read |count| without finding it.  Reading
+ * also terminates early if there are less than |count| bytes available on the
+ * stream.  In that case, we only read as many bytes as the stream currently has
+ * to offer.
+ * TODO: This implementation could be removed if bug 984651 is fixed, which
+ *       provides a native version of the same idea.
+ * @param stream nsIInputStream
+ *        The input stream to read from.
+ * @param delimiter string
+ *        The character we're trying to find.
+ * @param count integer
+ *        The max number of characters to read while searching.
+ * @return string
+ *         The data collected.  If the delimiter was found, this string will
+ *         end with it.
+ */
+function delimitedRead(stream, delimiter, count) {
+  dumpv("Starting delimited read for " + delimiter + " up to " +
+        count + " bytes");
+
+  let scriptableStream;
+  if (stream instanceof Ci.nsIScriptableInputStream) {
+    scriptableStream = stream;
+  } else {
+    scriptableStream = new ScriptableInputStream(stream);
+  }
+
+  let data = "";
+
+  // Don't exceed what's available on the stream
+  count = Math.min(count, stream.available());
+
+  if (count <= 0) {
+    return data;
+  }
+
+  let char;
+  while (char !== delimiter && count > 0) {
+    char = scriptableStream.readBytes(1);
+    count--;
+    data += char;
+  }
+
+  return data;
+}
+
+module.exports = {
+  copyStream: copyStream,
+  delimitedRead: delimitedRead
+};
copy from toolkit/devtools/moz.build
copy to toolkit/devtools/transport/tests/moz.build
--- a/toolkit/devtools/moz.build
+++ b/toolkit/devtools/transport/tests/moz.build
@@ -1,21 +1,7 @@
 # -*- Mode: python; c-basic-offset: 4; indent-tabs-mode: nil; tab-width: 40 -*-
 # vim: set filetype=python:
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
-PARALLEL_DIRS += [
-    'server',
-    'client',
-    'gcli',
-    'sourcemap',
-    'webconsole',
-    'apps',
-    'styleinspector',
-    'acorn',
-    'pretty-fast',
-    'qrcode'
-]
-
-MOCHITEST_CHROME_MANIFESTS += ['tests/mochitest/chrome.ini']
-XPCSHELL_TESTS_MANIFESTS += ['tests/unit/xpcshell.ini']
+XPCSHELL_TESTS_MANIFESTS += ['unit/xpcshell.ini']
new file mode 100644
--- /dev/null
+++ b/toolkit/devtools/transport/tests/unit/head_dbg.js
@@ -0,0 +1,292 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+"use strict";
+const Cc = Components.classes;
+const Ci = Components.interfaces;
+const Cu = Components.utils;
+const Cr = Components.results;
+const CC = Components.Constructor;
+
+const { devtools } =
+  Cu.import("resource://gre/modules/devtools/Loader.jsm", {});
+const { Promise: promise } =
+  Cu.import("resource://gre/modules/Promise.jsm", {});
+
+const Services = devtools.require("Services");
+const DevToolsUtils = devtools.require("devtools/toolkit/DevToolsUtils.js");
+
+// Always log packets when running tests. runxpcshelltests.py will throw
+// the output away anyway, unless you give it the --verbose flag.
+Services.prefs.setBoolPref("devtools.debugger.log", true);
+Services.prefs.setBoolPref("devtools.debugger.log.verbose", true);
+// Enable remote debugging for the relevant tests.
+Services.prefs.setBoolPref("devtools.debugger.remote-enabled", true);
+
+function tryImport(url) {
+  try {
+    Cu.import(url);
+  } catch (e) {
+    dump("Error importing " + url + "\n");
+    dump(DevToolsUtils.safeErrorString(e) + "\n");
+    throw e;
+  }
+}
+
+tryImport("resource://gre/modules/devtools/dbg-server.jsm");
+tryImport("resource://gre/modules/devtools/dbg-client.jsm");
+tryImport("resource://gre/modules/devtools/Loader.jsm");
+
+function testExceptionHook(ex) {
+  try {
+    do_report_unexpected_exception(ex);
+  } catch(ex) {
+    return {throw: ex}
+  }
+  return undefined;
+}
+
+// Convert an nsIScriptError 'aFlags' value into an appropriate string.
+function scriptErrorFlagsToKind(aFlags) {
+  var kind;
+  if (aFlags & Ci.nsIScriptError.warningFlag)
+    kind = "warning";
+  if (aFlags & Ci.nsIScriptError.exceptionFlag)
+    kind = "exception";
+  else
+    kind = "error";
+
+  if (aFlags & Ci.nsIScriptError.strictFlag)
+    kind = "strict " + kind;
+
+  return kind;
+}
+
+// Redeclare dbg_assert with a fatal behavior.
+function dbg_assert(cond, e) {
+  if (!cond) {
+    throw e;
+  }
+}
+
+// Register a console listener, so console messages don't just disappear
+// into the ether.
+let errorCount = 0;
+let listener = {
+  observe: function (aMessage) {
+    errorCount++;
+    try {
+      // If we've been given an nsIScriptError, then we can print out
+      // something nicely formatted, for tools like Emacs to pick up.
+      var scriptError = aMessage.QueryInterface(Ci.nsIScriptError);
+      dump(aMessage.sourceName + ":" + aMessage.lineNumber + ": " +
+           scriptErrorFlagsToKind(aMessage.flags) + ": " +
+           aMessage.errorMessage + "\n");
+      var string = aMessage.errorMessage;
+    } catch (x) {
+      // Be a little paranoid with message, as the whole goal here is to lose
+      // no information.
+      try {
+        var string = "" + aMessage.message;
+      } catch (x) {
+        var string = "<error converting error message to string>";
+      }
+    }
+
+    // Make sure we exit all nested event loops so that the test can finish.
+    while (DebuggerServer.xpcInspector.eventLoopNestLevel > 0) {
+      DebuggerServer.xpcInspector.exitNestedEventLoop();
+    }
+
+    // Throw in most cases, but ignore the "strict" messages
+    if (!(aMessage.flags & Ci.nsIScriptError.strictFlag)) {
+      do_throw("head_dbg.js got console message: " + string + "\n");
+    }
+  }
+};
+
+let consoleService = Cc["@mozilla.org/consoleservice;1"]
+                     .getService(Ci.nsIConsoleService);
+consoleService.registerListener(listener);
+
+function check_except(func) {
+  try {
+    func();
+  } catch (e) {
+    do_check_true(true);
+    return;
+  }
+  dump("Should have thrown an exception: " + func.toString());
+  do_check_true(false);
+}
+
+function testGlobal(aName) {
+  let systemPrincipal = Cc["@mozilla.org/systemprincipal;1"]
+    .createInstance(Ci.nsIPrincipal);
+
+  let sandbox = Cu.Sandbox(systemPrincipal);
+  sandbox.__name = aName;
+  return sandbox;
+}
+
+function addTestGlobal(aName)
+{
+  let global = testGlobal(aName);
+  DebuggerServer.addTestGlobal(global);
+  return global;
+}
+
+// List the DebuggerClient |aClient|'s tabs, look for one whose title is
+// |aTitle|, and apply |aCallback| to the packet's entry for that tab.
+function getTestTab(aClient, aTitle, aCallback) {
+  aClient.listTabs(function (aResponse) {
+    for (let tab of aResponse.tabs) {
+      if (tab.title === aTitle) {
+        aCallback(tab);
+        return;
+      }
+    }
+    aCallback(null);
+  });
+}
+
+// Attach to |aClient|'s tab whose title is |aTitle|; pass |aCallback| the
+// response packet and a TabClient instance referring to that tab.
+function attachTestTab(aClient, aTitle, aCallback) {
+  getTestTab(aClient, aTitle, function (aTab) {
+    aClient.attachTab(aTab.actor, aCallback);
+  });
+}
+
+// Attach to |aClient|'s tab whose title is |aTitle|, and then attach to
+// that tab's thread. Pass |aCallback| the thread attach response packet, a
+// TabClient referring to the tab, and a ThreadClient referring to the
+// thread.
+function attachTestThread(aClient, aTitle, aCallback) {
+  attachTestTab(aClient, aTitle, function (aResponse, aTabClient) {
+    function onAttach(aResponse, aThreadClient) {
+      aCallback(aResponse, aTabClient, aThreadClient);
+    }
+    aTabClient.attachThread({ useSourceMaps: true }, onAttach);
+  });
+}
+
+// Attach to |aClient|'s tab whose title is |aTitle|, attach to the tab's
+// thread, and then resume it. Pass |aCallback| the thread's response to
+// the 'resume' packet, a TabClient for the tab, and a ThreadClient for the
+// thread.
+function attachTestTabAndResume(aClient, aTitle, aCallback) {
+  attachTestThread(aClient, aTitle, function(aResponse, aTabClient, aThreadClient) {
+    aThreadClient.resume(function (aResponse) {
+      aCallback(aResponse, aTabClient, aThreadClient);
+    });
+  });
+}
+
+/**
+ * Initialize the testing debugger server.
+ */
+function initTestDebuggerServer() {
+  DebuggerServer.registerModule("devtools/server/actors/script");
+  DebuggerServer.registerModule("xpcshell-test/testactors");
+  // Allow incoming connections.
+  DebuggerServer.init(function () { return true; });
+}
+
+function finishClient(aClient) {
+  aClient.close(function() {
+    do_test_finished();
+  });
+}
+
+/**
+ * Takes a relative file path and returns the absolute file url for it.
+ */
+function getFileUrl(aName, aAllowMissing=false) {
+  let file = do_get_file(aName, aAllowMissing);
+  return Services.io.newFileURI(file).spec;
+}
+
+/**
+ * Returns the full path of the file with the specified name in a
+ * platform-independent and URL-like form.
+ */
+function getFilePath(aName, aAllowMissing=false) {
+  let file = do_get_file(aName, aAllowMissing);
+  let path = Services.io.newFileURI(file).spec;
+  let filePrePath = "file://";
+  if ("nsILocalFileWin" in Ci &&
+      file instanceof Ci.nsILocalFileWin) {
+    filePrePath += "/";
+  }
+  return path.slice(filePrePath.length);
+}
+
+Cu.import("resource://gre/modules/NetUtil.jsm");
+
+/**
+ * Wrapper around do_get_file to prefix files with the name of current test to
+ * avoid collisions when running in parallel.
+ */
+function getTestTempFile(fileName, allowMissing) {
+  let thisTest = _TEST_FILE.toString().replace(/\\/g, "/");
+  thisTest = thisTest.substring(thisTest.lastIndexOf("/") + 1);
+  thisTest = thisTest.replace(/\..*$/, "");
+  return do_get_file(fileName + "-" + thisTest, allowMissing);
+}
+
+function writeTestTempFile(aFileName, aContent) {
+  let file = getTestTempFile(aFileName, true);
+  let stream = Cc["@mozilla.org/network/file-output-stream;1"]
+    .createInstance(Ci.nsIFileOutputStream);
+  stream.init(file, -1, -1, 0);
+  try {
+    do {
+      let numWritten = stream.write(aContent, aContent.length);
+      aContent = aContent.slice(numWritten);
+    } while (aContent.length > 0);
+  } finally {
+    stream.close();
+  }
+}
+
+function try_open_listener() {
+  if (DebuggerServer._listener) {
+    return DebuggerServer._listener.port;
+  }
+  try {
+    // Pick a random one between 2000 and 65000.
+    let port = Math.floor(Math.random() * (65000 - 2000 + 1)) + 2000;
+    do_check_true(DebuggerServer.openListener(port));
+    return port;
+  } catch (e) {
+    return try_open_listener();
+  }
+}
+
+/*** Transport Factories ***/
+
+function socket_transport() {
+  let port = try_open_listener();
+  do_print("Debugger server port is " + port);
+  return debuggerSocketConnect("127.0.0.1", port);
+}
+
+function local_transport() {
+  return DebuggerServer.connectPipe();
+}
+
+/*** Sample Data ***/
+
+let gReallyLong;
+function really_long() {
+  if (gReallyLong) {
+    return gReallyLong;
+  }
+  let ret = "0123456789";
+  for (let i = 0; i < 18; i++) {
+    ret += ret;
+  }
+  gReallyLong = ret;
+  return ret;
+}
rename from toolkit/devtools/server/tests/unit/test_dbgsocket.js
rename to toolkit/devtools/transport/tests/unit/test_dbgsocket.js
--- a/toolkit/devtools/server/tests/unit/test_dbgsocket.js
+++ b/toolkit/devtools/transport/tests/unit/test_dbgsocket.js
@@ -79,28 +79,16 @@ function test_socket_shutdown()
 
     onClosed: function(aStatus) {
       do_print("test_socket_shutdown onClosed called at " + new Date().toTimeString());
       do_check_eq(aStatus, Cr.NS_ERROR_CONNECTION_REFUSED);
       run_next_test();
     }
   };
 
-  // Hack to get more logging for bug 775924.
-  transport.onDataAvailable = makeInfallible(function DT_onDataAvailable(aRequest, aContext,
-                                             aStream, aOffset, aCount) {
-    do_print("onDataAvailable. offset: "+aOffset+", count: "+aCount);
-    let buf = NetUtil.readInputStreamToString(aStream, aStream.available());
-    transport._incoming += buf;
-    do_print("Read form stream("+buf.length+"): "+buf);
-    while (transport._processIncoming()) {
-      do_print("Look ma, I'm looping!");
-    };
-  }, "DebuggerTransport.prototype.onDataAvailable");
-
   do_print("Initializing input stream at " + new Date().toTimeString());
   transport.ready();
 }
 
 function test_pipe_conn()
 {
   let transport = DebuggerServer.connectPipe();
   transport.hooks = {
@@ -121,42 +109,8 @@ function try_open_listener()
   try {
     do_check_true(DebuggerServer.openListener(port));
   } catch (e) {
     // In case the port is unavailable, pick a random one between 2000 and 65000.
     port = Math.floor(Math.random() * (65000 - 2000 + 1)) + 2000;
     try_open_listener();
   }
 }
-
-// Copied verbatim from dbg-transport.js.
-// Hack to get more logging for bug 775924.
-function makeInfallible(aHandler, aName) {
-  if (!aName)
-    aName = aHandler.name;
-
-  return function (/* arguments */) {
-    try {
-      return aHandler.apply(this, arguments);
-    } catch (ex) {
-      let msg = "Handler function ";
-      if (aName) {
-        msg += aName + " ";
-      }
-      msg += "threw an exception: " + DevToolsUtils.safeErrorString(ex);
-      if (ex.stack) {
-        msg += "\nCall stack:\n" + ex.stack;
-      }
-
-      do_print(msg + "\n");
-
-      if (Cu.reportError) {
-        /*
-         * Note that the xpcshell test harness registers an observer for
-         * console messages, so when we're running tests, this will cause
-         * the test to quit.
-         */
-        Cu.reportError(msg);
-      }
-      return undefined;
-    }
-  }
-}
rename from toolkit/devtools/server/tests/unit/test_dbgsocket_connection_drop.js
rename to toolkit/devtools/transport/tests/unit/test_dbgsocket_connection_drop.js
--- a/toolkit/devtools/server/tests/unit/test_dbgsocket_connection_drop.js
+++ b/toolkit/devtools/transport/tests/unit/test_dbgsocket_connection_drop.js
@@ -6,67 +6,77 @@
 /**
  * Bug 755412 - checks if the server drops the connection on an improperly
  * framed packet, i.e. when the length header is invalid.
  */
 
 Cu.import("resource://gre/modules/devtools/dbg-server.jsm");
 Cu.import("resource://gre/modules/devtools/dbg-client.jsm");
 
+const { RawPacket } = devtools.require("devtools/toolkit/transport/packets");
+
 let port = 2929;
 
-function run_test()
-{
+function run_test() {
   do_print("Starting test at " + new Date().toTimeString());
   initTestDebuggerServer();
 
   add_test(test_socket_conn_drops_after_invalid_header);
   add_test(test_socket_conn_drops_after_invalid_header_2);
+  add_test(test_socket_conn_drops_after_too_large_length);
   add_test(test_socket_conn_drops_after_too_long_header);
   run_next_test();
 }
 
 function test_socket_conn_drops_after_invalid_header() {
   return test_helper('fluff30:27:{"to":"root","type":"echo"}');
 }
 
 function test_socket_conn_drops_after_invalid_header_2() {
   return test_helper('27asd:{"to":"root","type":"echo"}');
 }
 
-function test_socket_conn_drops_after_too_long_header() {
-  return test_helper('4305724038957487634549823475894325');
+function test_socket_conn_drops_after_too_large_length() {
+  // Packet length is limited (semi-arbitrarily) to 1 TiB (2^40)
+  return test_helper('4305724038957487634549823475894325:');
 }
 
+function test_socket_conn_drops_after_too_long_header() {
+  // The packet header is currently limited to no more than 200 bytes
+  let rawPacket = '4305724038957487634549823475894325';
+  for (let i = 0; i < 8; i++) {
+    rawPacket += rawPacket;
+  }
+  return test_helper(rawPacket + ':');
+}
 
 function test_helper(payload) {
   try_open_listener();
 
   let transport = debuggerSocketConnect("127.0.0.1", port);
   transport.hooks = {
     onPacket: function(aPacket) {
       this.onPacket = function(aPacket) {
         do_throw(new Error("This connection should be dropped."));
         transport.close();
       }
 
       // Inject the payload directly into the stream.
-      transport._outgoing += payload;
+      transport._outgoing.push(new RawPacket(transport, payload));
       transport._flushOutgoing();
     },
     onClosed: function(aStatus) {
       do_check_true(true);
       run_next_test();
     },
   };
   transport.ready();
 }
 
-function try_open_listener()
-{
+function try_open_listener() {
   try {
     do_check_true(DebuggerServer.openListener(port));
   } catch (e) {
     // In case the port is unavailable, pick a random one between 2000 and 65000.
     port = Math.floor(Math.random() * (65000 - 2000 + 1)) + 2000;
     try_open_listener();
   }
 }
new file mode 100644
--- /dev/null
+++ b/toolkit/devtools/transport/tests/unit/test_delimited_read.js
@@ -0,0 +1,26 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+const StreamUtils = devtools.require("devtools/toolkit/transport/stream-utils");
+
+const StringInputStream = CC("@mozilla.org/io/string-input-stream;1",
+                             "nsIStringInputStream", "setData");
+
+function run_test() {
+  add_task(function() {
+    yield test_delimited_read("0123:", "0123:");
+    yield test_delimited_read("0123:4567:", "0123:");
+    yield test_delimited_read("012345678901:", "0123456789");
+    yield test_delimited_read("0123/0123", "0123/0123");
+  });
+
+  run_next_test();
+}
+
+/*** Tests ***/
+
+function test_delimited_read(input, expected) {
+  input = new StringInputStream(input, input.length);
+  let result = StreamUtils.delimitedRead(input, ":", 10);
+  do_check_eq(result, expected);
+}
new file mode 100644
--- /dev/null
+++ b/toolkit/devtools/transport/tests/unit/test_packet.js
@@ -0,0 +1,21 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+const { JSONPacket, BulkPacket } =
+  devtools.require("devtools/toolkit/transport/packets");
+
+function run_test() {
+  add_test(test_packet_done);
+  run_next_test();
+}
+
+// Ensure done can be checked without getting an error
+function test_packet_done() {
+  let json = new JSONPacket();
+  do_check_false(!!json.done);
+
+  let bulk = new BulkPacket();
+  do_check_false(!!bulk.done);
+
+  run_next_test();
+}
new file mode 100644
--- /dev/null
+++ b/toolkit/devtools/transport/tests/unit/test_transport_bulk.js
@@ -0,0 +1,143 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+let { DebuggerServer } =
+  Cu.import("resource://gre/modules/devtools/dbg-server.jsm", {});
+let { FileUtils } = Cu.import("resource://gre/modules/FileUtils.jsm", {});
+let { NetUtil } = Cu.import("resource://gre/modules/NetUtil.jsm", {});
+
+function run_test() {
+  initTestDebuggerServer();
+
+  add_task(function() {
+    yield test_bulk_transfer_transport(socket_transport);
+    yield test_bulk_transfer_transport(local_transport);
+    DebuggerServer.destroy();
+  });
+
+  run_next_test();
+}
+
+/*** Tests ***/
+
+/**
+ * This tests a one-way bulk transfer at the transport layer.
+ */
+function test_bulk_transfer_transport(transportFactory) {
+  do_print("Starting bulk transfer test at " + new Date().toTimeString());
+
+  let clientDeferred = promise.defer();
+  let serverDeferred = promise.defer();
+
+  // Ensure test files are not present from a failed run
+  cleanup_files();
+  let reallyLong = really_long();
+  writeTestTempFile("bulk-input", reallyLong);
+
+  do_check_eq(Object.keys(DebuggerServer._connections).length, 0);
+
+  let transport = transportFactory();
+
+  // Sending from client to server
+  function write_data({copyFrom}) {
+    NetUtil.asyncFetch(getTestTempFile("bulk-input"), function(input, status) {
+      copyFrom(input).then(() => {
+        input.close();
+      });
+    });
+  }
+
+  // Receiving on server from client
+  function on_bulk_packet({actor, type, length, copyTo}) {
+    do_check_eq(actor, "root");
+    do_check_eq(type, "file-stream");
+    do_check_eq(length, reallyLong.length);
+
+    let outputFile = getTestTempFile("bulk-output", true);
+    outputFile.create(Ci.nsIFile.NORMAL_FILE_TYPE, parseInt("666", 8));
+
+    let output = FileUtils.openSafeFileOutputStream(outputFile);
+
+    copyTo(output).then(() => {
+      FileUtils.closeSafeFileOutputStream(output);
+      return verify();
+    }).then(() => {
+      // It's now safe to close
+      transport.hooks.onClosed = () => {
+        clientDeferred.resolve();
+      };
+      transport.close();
+    });
+  }
+
+  // Client
+  transport.hooks = {
+    onPacket: function(aPacket) {
+      // We've received the initial start up packet
+      do_check_eq(aPacket.from, "root");
+
+      // Server
+      do_check_eq(Object.keys(DebuggerServer._connections).length, 1);
+      do_print(Object.keys(DebuggerServer._connections));
+      for (let connId in DebuggerServer._connections) {
+        DebuggerServer._connections[connId].onBulkPacket = on_bulk_packet;
+      }
+
+      DebuggerServer.on("connectionchange", (event, type) => {
+        if (type === "closed") {
+          serverDeferred.resolve();
+        }
+      });
+
+      transport.startBulkSend({
+         actor: "root",
+         type: "file-stream",
+         length: reallyLong.length
+      }).then(write_data);
+    },
+
+    onClosed: function() {
+      do_throw("Transport closed before we expected");
+    }
+  };
+
+  transport.ready();
+
+  return promise.all([clientDeferred.promise, serverDeferred.promise]);
+}
+
+/*** Test Utils ***/
+
+function verify() {
+  let reallyLong = really_long();
+
+  let inputFile = getTestTempFile("bulk-input");
+  let outputFile = getTestTempFile("bulk-output");
+
+  do_check_eq(inputFile.fileSize, reallyLong.length);
+  do_check_eq(outputFile.fileSize, reallyLong.length);
+
+  // Ensure output file contents actually match
+  let compareDeferred = promise.defer();
+  NetUtil.asyncFetch(getTestTempFile("bulk-output"), input => {
+    let outputData = NetUtil.readInputStreamToString(input, reallyLong.length);
+    // Avoid do_check_eq here so we don't log the contents
+    do_check_true(outputData === reallyLong);
+    input.close();
+    compareDeferred.resolve();
+  });
+
+  return compareDeferred.promise.then(cleanup_files);
+}
+
+function cleanup_files() {
+  let inputFile = getTestTempFile("bulk-input", true);
+  if (inputFile.exists()) {
+    inputFile.remove(false);
+  }
+
+  let outputFile = getTestTempFile("bulk-output", true);
+  if (outputFile.exists()) {
+    outputFile.remove(false);
+  }
+}
new file mode 100644
--- /dev/null
+++ b/toolkit/devtools/transport/tests/unit/testactors.js
@@ -0,0 +1,130 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+const { ActorPool, appendExtraActors, createExtraActors } =
+  require("devtools/server/actors/common");
+const { RootActor } = require("devtools/server/actors/root");
+const { ThreadActor } = require("devtools/server/actors/script");
+const { DebuggerServer } = require("devtools/server/main");
+
+var gTestGlobals = [];
+DebuggerServer.addTestGlobal = function(aGlobal) {
+  gTestGlobals.push(aGlobal);
+};
+
+// A mock tab list, for use by tests. This simply presents each global in
+// gTestGlobals as a tab, and the list is fixed: it never calls its
+// onListChanged handler.
+//
+// As implemented now, we consult gTestGlobals when we're constructed, not
+// when we're iterated over, so tests have to add their globals before the
+// root actor is created.
+function TestTabList(aConnection) {
+  this.conn = aConnection;
+
+  // An array of actors for each global added with
+  // DebuggerServer.addTestGlobal.
+  this._tabActors = [];
+
+  // A pool mapping those actors' names to the actors.
+  this._tabActorPool = new ActorPool(aConnection);
+
+  for (let global of gTestGlobals) {
+    let actor = new TestTabActor(aConnection, global);
+    actor.selected = false;
+    this._tabActors.push(actor);
+    this._tabActorPool.addActor(actor);
+  }
+  if (this._tabActors.length > 0) {
+    this._tabActors[0].selected = true;
+  }
+
+  aConnection.addActorPool(this._tabActorPool);
+}
+
+TestTabList.prototype = {
+  constructor: TestTabList,
+  getList: function () {
+    return promise.resolve([tabActor for (tabActor of this._tabActors)]);
+  }
+};
+
+function createRootActor(aConnection) {
+  let root = new RootActor(aConnection, {
+    tabList: new TestTabList(aConnection),
+    globalActorFactories: DebuggerServer.globalActorFactories
+  });
+  root.applicationType = "xpcshell-tests";
+  return root;
+}
+
+function TestTabActor(aConnection, aGlobal) {
+  this.conn = aConnection;
+  this._global = aGlobal;
+  this._threadActor = new ThreadActor(this, this._global);
+  this.conn.addActor(this._threadActor);
+  this._attached = false;
+  this._extraActors = {};
+}
+
+TestTabActor.prototype = {
+  constructor: TestTabActor,
+  actorPrefix: "TestTabActor",
+
+  get window() {
+    return { wrappedJSObject: this._global };
+  },
+
+  get url() {
+    return this._global.__name;
+  },
+
+  form: function() {
+    let response = { actor: this.actorID, title: this._global.__name };
+
+    // Walk over tab actors added by extensions and add them to a new ActorPool.
+    let actorPool = new ActorPool(this.conn);
+    this._createExtraActors(DebuggerServer.tabActorFactories, actorPool);
+    if (!actorPool.isEmpty()) {
+      this._tabActorPool = actorPool;
+      this.conn.addActorPool(this._tabActorPool);
+    }
+
+    this._appendExtraActors(response);
+
+    return response;
+  },
+
+  onAttach: function(aRequest) {
+    this._attached = true;
+
+    let response = { type: "tabAttached", threadActor: this._threadActor.actorID };
+    this._appendExtraActors(response);
+
+    return response;
+  },
+
+  onDetach: function(aRequest) {
+    if (!this._attached) {
+      return { "error":"wrongState" };
+    }
+    return { type: "detached" };
+  },
+
+  /* Support for DebuggerServer.addTabActor. */
+  _createExtraActors: createExtraActors,
+  _appendExtraActors: appendExtraActors
+};
+
+TestTabActor.prototype.requestTypes = {
+  "attach": TestTabActor.prototype.onAttach,
+  "detach": TestTabActor.prototype.onDetach
+};
+
+exports.register = function(handle) {
+  handle.setRootActor(createRootActor);
+};
+
+exports.unregister = function(handle) {
+  handle.setRootActor(null);
+};
new file mode 100644
--- /dev/null
+++ b/toolkit/devtools/transport/tests/unit/xpcshell.ini
@@ -0,0 +1,14 @@
+[DEFAULT]
+head = head_dbg.js
+tail =
+
+support-files =
+  testactors.js
+
+[test_dbgsocket.js]
+skip-if = toolkit == "gonk"
+reason = bug 821285
+[test_dbgsocket_connection_drop.js]
+[test_delimited_read.js]
+[test_packet.js]
+[test_transport_bulk.js]
rename from toolkit/devtools/server/transport.js
rename to toolkit/devtools/transport/transport.js
--- a/toolkit/devtools/server/transport.js
+++ b/toolkit/devtools/transport/transport.js
@@ -17,217 +17,479 @@
       const { devtools } = Cu.import("resource://gre/modules/devtools/Loader.jsm", {});
       factory(devtools.require, this);
     }
   }
 }).call(this, function (require, exports) {
 
 "use strict";
 
-const { Cc, Ci, Cr, Cu } = require("chrome");
+const { Cc, Ci, Cr, Cu, CC } = require("chrome");
+const { Promise: promise } =
+  Cu.import("resource://gre/modules/Promise.jsm", {});
 const Services = require("Services");
 const DevToolsUtils = require("devtools/toolkit/DevToolsUtils");
-const { dumpn } = DevToolsUtils;
+const { dumpn, dumpv } = DevToolsUtils;
+const StreamUtils = require("devtools/toolkit/transport/stream-utils");
+const { Packet, JSONPacket, BulkPacket } =
+  require("devtools/toolkit/transport/packets");
 
-Cu.import("resource://gre/modules/NetUtil.jsm");
+const Pipe = CC("@mozilla.org/pipe;1", "nsIPipe", "init");
+const ScriptableInputStream = CC("@mozilla.org/scriptableinputstream;1",
+                                 "nsIScriptableInputStream", "init");
+
+const PACKET_HEADER_MAX = 200;
 
 /**
  * An adapter that handles data transfers between the debugger client and
  * server. It can work with both nsIPipe and nsIServerSocket transports so
  * long as the properly created input and output streams are specified.
  * (However, for intra-process connections, LocalDebuggerTransport, below,
  * is more efficient than using an nsIPipe pair with DebuggerTransport.)
  *
- * @param input nsIInputStream
+ * @param input nsIAsyncInputStream
  *        The input stream.
  * @param output nsIAsyncOutputStream
  *        The output stream.
  *
  * Given a DebuggerTransport instance dt:
  * 1) Set dt.hooks to a packet handler object (described below).
  * 2) Call dt.ready() to begin watching for input packets.
- * 3) Call dt.send() to send packets as you please, and handle incoming
- *    packets passed to hook.onPacket.
+ * 3) Call dt.send() / dt.startBulkSend() to send packets.
  * 4) Call dt.close() to close the connection, and disengage from the event
  *    loop.
  *
- * A packet handler is an object with two methods:
+ * A packet handler is an object with the following methods:
  *
  * - onPacket(packet) - called when we have received a complete packet.
- *   |Packet| is the parsed form of the packet --- a JavaScript value, not
+ *   |packet| is the parsed form of the packet --- a JavaScript value, not
  *   a JSON-syntax string.
  *
- * - onClosed(status) - called when the connection is closed. |Status| is
- *   an nsresult, of the sort passed to nsIRequestObserver.
+ * - onBulkPacket(packet) - called when we have switched to bulk packet
+ *   receiving mode. |packet| is an object containing:
+ *   * actor:  Name of actor that will receive the packet
+ *   * type:   Name of actor's method that should be called on receipt
+ *   * length: Size of the data to be read
+ *   * stream: This input stream should only be used directly if you can ensure
+ *             that you will read exactly |length| bytes and will not close the
+ *             stream when reading is complete
+ *   * done:   If you use the stream directly (instead of |copyTo| below), you
+ *             must signal completion by resolving / rejecting this deferred.
+ *             If it's rejected, the transport will be closed.  If an Error is
+ *             supplied as a rejection value, it will be logged via |dumpn|.
+ *             If you do use |copyTo|, resolving is taken care of for you when
+ *             copying completes.
+ *   * copyTo: A helper function for getting your data out of the stream that
+ *             meets the stream handling requirements above, and has the
+ *             following signature:
+ *     @param  output nsIAsyncOutputStream
+ *             The stream to copy to.
+ *     @return Promise
+ *             The promise is resolved when copying completes or rejected if any
+ *             (unexpected) errors occur.
  *
- * Data is transferred as a JSON packet serialized into a string, with the
- * string length prepended to the packet, followed by a colon
- * ([length]:[packet]). The contents of the JSON packet are specified in
- * the Remote Debugging Protocol specification.
+ * - onClosed(reason) - called when the connection is closed. |reason| is
+ *   an optional nsresult or object, typically passed when the transport is
+ *   closed due to some error in a underlying stream.
+ *
+ * See ./packets.js and the Remote Debugging Protocol specification for more
+ * details on the format of these packets.
  */
 function DebuggerTransport(input, output) {
   this._input = input;
+  this._scriptableInput = new ScriptableInputStream(input);
   this._output = output;
 
-  this._converter = Cc["@mozilla.org/intl/scriptableunicodeconverter"]
-    .createInstance(Ci.nsIScriptableUnicodeConverter);
-  this._converter.charset = "UTF-8";
-
-  this._outgoing = "";
-  this._incoming = "";
+  // The current incoming (possibly partial) header, which will determine which
+  // type of Packet |_incoming| below will become.
+  this._incomingHeader = "";
+  // The current incoming Packet object
+  this._incoming = null;
+  // A queue of outgoing Packet objects
+  this._outgoing = [];
 
   this.hooks = null;
+  this.active = false;
+
+  this._incomingEnabled = true;
+  this._outgoingEnabled = true;
+
+  this.close = this.close.bind(this);
 }
 
 DebuggerTransport.prototype = {
   /**
-   * Transmit a packet.
+   * Transmit an object as a JSON packet.
    *
    * This method returns immediately, without waiting for the entire
    * packet to be transmitted, registering event handlers as needed to
    * transmit the entire packet. Packets are transmitted in the order
    * they are passed to this method.
    */
-  send: function(packet) {
-    let data = JSON.stringify(packet);
-    data = this._converter.ConvertFromUnicode(data);
-    data = data.length + ":" + data;
-    this._outgoing += data;
+  send: function(object) {
+    let packet = new JSONPacket(this);
+    packet.object = object;
+    this._outgoing.push(packet);
     this._flushOutgoing();
   },
 
   /**
+   * Transmit streaming data via a bulk packet.
+   *
+   * This method initiates the bulk send process by queuing up the header data.
+   * The caller receives eventual access to a stream for writing.
+   *
+   * N.B.: Do *not* attempt to close the stream handed to you, as it will
+   * continue to be used by this transport afterwards.  Most users should
+   * instead use the provided |copyFrom| function instead.
+   *
+   * @param header Object
+   *        This is modeled after the format of JSON packets above, but does not
+   *        actually contain the data, but is instead just a routing header:
+   *          * actor:  Name of actor that will receive the packet
+   *          * type:   Name of actor's method that should be called on receipt
+   *          * length: Size of the data to be sent
+   * @return Promise
+   *         The promise will be resolved when you are allowed to write to the
+   *         stream with an object containing:
+   *           * stream:   This output stream should only be used directly if
+   *                       you can ensure that you will write exactly |length|
+   *                       bytes and will not close the stream when writing is
+   *                       complete
+   *           * done:     If you use the stream directly (instead of |copyFrom|
+   *                       below), you must signal completion by resolving /
+   *                       rejecting this deferred.  If it's rejected, the
+   *                       transport will be closed.  If an Error is supplied as
+   *                       a rejection value, it will be logged via |dumpn|.  If
+   *                       you do use |copyFrom|, resolving is taken care of for
+   *                       you when copying completes.
+   *           * copyFrom: A helper function for getting your data onto the
+   *                       stream that meets the stream handling requirements
+   *                       above, and has the following signature:
+   *             @param  input nsIAsyncInputStream
+   *                     The stream to copy from.
+   *             @return Promise
+   *                     The promise is resolved when copying completes or
+   *                     rejected if any (unexpected) errors occur.
+   */
+  startBulkSend: function(header) {
+    let packet = new BulkPacket(this);
+    packet.header = header;
+    this._outgoing.push(packet);
+    this._flushOutgoing();
+    return packet.streamReadyForWriting;
+  },
+
+  /**
    * Close the transport.
+   * @param reason nsresult / object (optional)
+   *        The status code or error message that corresponds to the reason for
+   *        closing the transport (likely because a stream closed or failed).
    */
-  close: function() {
+  close: function(reason) {
+    this.active = false;
     this._input.close();
+    this._scriptableInput.close();
     this._output.close();
+    this._destroyIncoming();
+    this._destroyAllOutgoing();
+    if (this.hooks) {
+      this.hooks.onClosed(reason);
+      this.hooks = null;
+    }
+    if (reason) {
+      dumpn("Transport closed: " + DevToolsUtils.safeErrorString(reason));
+    } else {
+      dumpn("Transport closed.");
+    }
   },
 
   /**
-   * Flush the outgoing stream.
+   * The currently outgoing packet (at the top of the queue).
+   */
+  get _currentOutgoing() { return this._outgoing[0]; },
+
+  /**
+   * Flush data to the outgoing stream.  Waits until the output stream notifies
+   * us that it is ready to be written to (via onOutputStreamReady).
    */
   _flushOutgoing: function() {
+    if (!this._outgoingEnabled || this._outgoing.length === 0) {
+      return;
+    }
+
+    // If the top of the packet queue has nothing more to send, remove it.
+    if (this._currentOutgoing.done) {
+      this._finishCurrentOutgoing();
+    }
+
     if (this._outgoing.length > 0) {
       var threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
       this._output.asyncWait(this, 0, 0, threadManager.currentThread);
     }
   },
 
-  onOutputStreamReady:
-  DevToolsUtils.makeInfallible(function(stream) {
-    let written = 0;
-    try {
-      written = stream.write(this._outgoing, this._outgoing.length);
-    } catch(e if e.result == Cr.NS_BASE_STREAM_CLOSED) {
-      dumpn("Connection closed.");
-      this.close();
+  /**
+   * Pause this transport's attempts to write to the output stream.  This is
+   * used when we've temporarily handed off our output stream for writing bulk
+   * data.
+   */
+  pauseOutgoing: function() {
+    this._outgoingEnabled = false;
+  },
+
+  /**
+   * Resume this transport's attempts to write to the output stream.
+   */
+  resumeOutgoing: function() {
+    this._outgoingEnabled = true;
+    this._flushOutgoing();
+  },
+
+  // nsIOutputStreamCallback
+  /**
+   * This is called when the output stream is ready for more data to be written.
+   * The current outgoing packet will attempt to write some amount of data, but
+   * may not complete.
+   */
+  onOutputStreamReady: DevToolsUtils.makeInfallible(function(stream) {
+    if (this._outgoing.length === 0) {
       return;
     }
-    this._outgoing = this._outgoing.slice(written);
+
+    try {
+      this._currentOutgoing.write(stream);
+    } catch(e if e.result == Cr.NS_BASE_STREAM_CLOSED ||
+                 e.result == Cr.NS_ERROR_NET_RESET) {
+      this.close(e.result);
+      return;
+    }
+
     this._flushOutgoing();
   }, "DebuggerTransport.prototype.onOutputStreamReady"),
 
   /**
-   * Initialize the input stream for reading. Once this method has been
-   * called, we watch for packets on the input stream, and pass them to
-   * this.hook.onPacket.
+   * Remove the current outgoing packet from the queue upon completion.
+   */
+  _finishCurrentOutgoing: function() {
+    if (this._currentOutgoing) {
+      this._currentOutgoing.destroy();
+      this._outgoing.shift();
+    }
+  },
+
+  /**
+   * Clear the entire outgoing queue.
+   */
+  _destroyAllOutgoing: function() {
+    for (let packet of this._outgoing) {
+      packet.destroy();
+    }
+    this._outgoing = [];
+  },
+
+  /**
+   * Initialize the input stream for reading. Once this method has been called,
+   * we watch for packets on the input stream, and pass them to the appropriate
+   * handlers via this.hooks.
    */
   ready: function() {
-    let pump = Cc["@mozilla.org/network/input-stream-pump;1"]
-      .createInstance(Ci.nsIInputStreamPump);
-    pump.init(this._input, -1, -1, 0, 0, false);
-    pump.asyncRead(this, null);
+    this.active = true;
+    this._waitForIncoming();
+  },
+
+  /**
+   * Asks the input stream to notify us (via onInputStreamReady) when it is
+   * ready for reading.
+   */
+  _waitForIncoming: function() {
+    if (this._incomingEnabled) {
+      let threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
+      this._input.asyncWait(this, 0, 0, threadManager.currentThread);
+    }
   },
 
-  // nsIStreamListener
-  onStartRequest:
-  DevToolsUtils.makeInfallible(function(request, context) {},
-                 "DebuggerTransport.prototype.onStartRequest"),
-
-  onStopRequest:
-  DevToolsUtils.makeInfallible(function(request, context, status) {
-    this.close();
-    if (this.hooks) {
-      this.hooks.onClosed(status);
-      this.hooks = null;
-    }
-  }, "DebuggerTransport.prototype.onStopRequest"),
-
-  onDataAvailable:
-  DevToolsUtils.makeInfallible(function(request, context, stream,
-                                        offset, count) {
-    this._incoming += NetUtil.readInputStreamToString(stream,
-                                                      stream.available());
-    while (this._processIncoming()) {}
-  }, "DebuggerTransport.prototype.onDataAvailable"),
+  /**
+   * Pause this transport's attempts to read from the input stream.  This is
+   * used when we've temporarily handed off our input stream for reading bulk
+   * data.
+   */
+  pauseIncoming: function() {
+    this._incomingEnabled = false;
+  },
 
   /**
-   * Process incoming packets. Returns true if a packet has been received, either
-   * if it was properly parsed or not. Returns false if the incoming stream does
-   * not contain a full packet yet. After a proper packet is parsed, the dispatch
-   * handler DebuggerTransport.hooks.onPacket is called with the packet as a
-   * parameter.
+   * Resume this transport's attempts to read from the input stream.
+   */
+  resumeIncoming: function() {
+    this._incomingEnabled = true;
+    this._flushIncoming();
+    this._waitForIncoming();
+  },
+
+  // nsIInputStreamCallback
+  /**
+   * Called when the stream is either readable or closed.
    */
-  _processIncoming: function() {
-    // Well this is ugly.
-    let sep = this._incoming.indexOf(":");
-    if (sep < 0) {
-      // Incoming packet length is too big anyway - drop the connection.
-      if (this._incoming.length > 20) {
-        this.close();
-      }
+  onInputStreamReady:
+  DevToolsUtils.makeInfallible(function(stream) {
+    try {
+      while(stream.available() && this._incomingEnabled &&
+            this._processIncoming(stream, stream.available())) {}
+      this._waitForIncoming();
+    } catch(e if e.result == Cr.NS_BASE_STREAM_CLOSED ||
+                 e.result == Cr.NS_ERROR_CONNECTION_REFUSED ||
+                 e.result == Cr.NS_ERROR_OFFLINE) {
+      this.close(e.result);
+    }
+  }, "DebuggerTransport.prototype.onInputStreamReady"),
 
+  /**
+   * Process the incoming data.  Will create a new currently incoming Packet if
+   * needed.  Tells the incoming Packet to read as much data as it can, but
+   * reading may not complete.  The Packet signals that its data is ready for
+   * delivery by calling one of this transport's _on*Ready methods (see
+   * ./packets.js and the _on*Ready methods below).
+   * @return boolean
+   *         Whether incoming stream processing should continue for any
+   *         remaining data.
+   */
+  _processIncoming: function(stream, count) {
+    dumpv("Data available: " + count);
+
+    if (!count) {
+      dumpv("Nothing to read, skipping");
       return false;
     }
 
-    let count = this._incoming.substring(0, sep);
-    // Check for a positive number with no garbage afterwards.
-    if (!/^[0-9]+$/.exec(count)) {
+    try {
+      if (!this._incoming) {
+        dumpv("Creating a new packet from incoming");
+
+        if (!this._readHeader(stream)) {
+          return false; // Not enough data to read packet type
+        }
+
+        // Attempt to create a new Packet by trying to parse each possible
+        // header pattern.
+        this._incoming = Packet.fromHeader(this._incomingHeader, this);
+        if (!this._incoming) {
+          throw new Error("No packet types for header: " +
+                          this._incomingHeader);
+        }
+      }
+
+      if (!this._incoming.done) {
+        // We have an incomplete packet, keep reading it.
+        dumpv("Existing packet incomplete, keep reading");
+        this._incoming.read(stream, this._scriptableInput);
+      }
+    } catch(e) {
+      let msg = "Error reading incoming packet: (" + e + " - " + e.stack + ")";
+      dumpn(msg);
+
+      // Now in an invalid state, shut down the transport.
       this.close();
       return false;
     }
 
-    count = +count;
-    if (this._incoming.length - (sep + 1) < count) {
-      // Don't have a complete request yet.
-      return false;
+    if (!this._incoming.done) {
+      // Still not complete, we'll wait for more data.
+      dumpv("Packet not done, wait for more");
+      return true;
     }
 
-    // We have a complete request, pluck it out of the data and parse it.
-    this._incoming = this._incoming.substring(sep + 1);
-    let packet = this._incoming.substring(0, count);
-    this._incoming = this._incoming.substring(count);
+    // Ready for next packet
+    this._flushIncoming();
+    return true;
+  },
 
-    try {
-      packet = this._converter.ConvertToUnicode(packet);
-      var parsed = JSON.parse(packet);
-    } catch(e) {
-      let msg = "Error parsing incoming packet: " + packet + " (" + e + " - " + e.stack + ")";
-      if (Cu.reportError) {
-        Cu.reportError(msg);
+  /**
+   * Read as far as we can into the incoming data, attempting to build up a
+   * complete packet header (which terminates with ":").  We'll only read up to
+   * PACKET_HEADER_MAX characters.
+   * @return boolean
+   *         True if we now have a complete header.
+   */
+  _readHeader: function() {
+    let amountToRead = PACKET_HEADER_MAX - this._incomingHeader.length;
+    this._incomingHeader +=
+      StreamUtils.delimitedRead(this._scriptableInput, ":", amountToRead);
+    if (dumpv.wantVerbose) {
+      dumpv("Header read: " + this._incomingHeader);
+    }
+
+    if (this._incomingHeader.endsWith(":")) {
+      if (dumpv.wantVerbose) {
+        dumpv("Found packet header successfully: " + this._incomingHeader);
       }
-      dump(msg + "\n");
       return true;
     }
 
-    if (dumpn.wantLogging) {
-      dumpn("Got: " + JSON.stringify(parsed, null, 2));
+    if (this._incomingHeader.length >= PACKET_HEADER_MAX) {
+      throw new Error("Failed to parse packet header!");
+    }
+
+    // Not enough data yet.
+    return false;
+  },
+
+  /**
+   * If the incoming packet is done, log it as needed and clear the buffer.
+   */
+  _flushIncoming: function() {
+    if (!this._incoming.done) {
+      return;
     }
-    let self = this;
-    Services.tm.currentThread.dispatch(DevToolsUtils.makeInfallible(function() {
-      // Ensure the hooks are still around by the time this runs (they will go
-      // away when the transport is closed).
-      if (self.hooks) {
-        self.hooks.onPacket(parsed);
+    if (dumpn.wantLogging) {
+      dumpn("Got: " + this._incoming);
+    }
+    this._destroyIncoming();
+  },
+
+  /**
+   * Handler triggered by an incoming JSONPacket completing it's |read| method.
+   * Delivers the packet to this.hooks.onPacket.
+   */
+  _onJSONObjectReady: function(object) {
+    Services.tm.currentThread.dispatch(DevToolsUtils.makeInfallible(() => {
+      // Ensure the transport is still alive by the time this runs.
+      if (this.active) {
+        this.hooks.onPacket(object);
       }
     }, "DebuggerTransport instance's this.hooks.onPacket"), 0);
+  },
 
-    return true;
+  /**
+   * Handler triggered by an incoming BulkPacket entering the |read| phase for
+   * the stream portion of the packet.  Delivers info about the incoming
+   * streaming data to this.hooks.onBulkPacket.  See the main comment on the
+   * transport at the top of this file for more details.
+   */
+  _onBulkReadReady: function(...args) {
+    Services.tm.currentThread.dispatch(DevToolsUtils.makeInfallible(() => {
+      // Ensure the transport is still alive by the time this runs.
+      if (this.active) {
+        this.hooks.onBulkPacket(...args);
+      }
+    }, "DebuggerTransport instance's this.hooks.onBulkPacket"), 0);
+  },
+
+  /**
+   * Remove all handlers and references related to the current incoming packet,
+   * either because it is now complete or because the transport is closing.
+   */
+  _destroyIncoming: function() {
+    if (this._incoming) {
+      this._incoming.destroy();
+    }
+    this._incomingHeader = "";
+    this._incoming = null;
   }
+
 };
 
 exports.DebuggerTransport = DebuggerTransport;
 
 /**
  * An adapter that handles data transfers between the debugger client and
  * server when they both run in the same process. It presents the same API as
  * DebuggerTransport, but instead of transmitting serialized messages across a
@@ -243,16 +505,17 @@ function LocalDebuggerTransport(other) {
   this.hooks = null;
 
   /*
    * A packet number, shared between this and this.other. This isn't used
    * by the protocol at all, but it makes the packet traces a lot easier to
    * follow.
    */
   this._serial = this.other ? this.other._serial : { count: 0 };
+  this.close = this.close.bind(this);
 }
 
 LocalDebuggerTransport.prototype = {
   /**
    * Transmit a message by directly calling the onPacket handler of the other
    * endpoint.
    */
   send: function(packet) {
@@ -276,16 +539,86 @@ LocalDebuggerTransport.prototype = {
         if (other.hooks) {
           other.hooks.onPacket(packet);
         }
       }, "LocalDebuggerTransport instance's this.other.hooks.onPacket"), 0);
     }
   },
 
   /**
+   * Send a streaming bulk packet directly to the onBulkPacket handler of the
+   * other endpoint.
+   *
+   * This case is much simpler than the full DebuggerTransport, since there is
+   * no primary stream we have to worry about managing while we hand it off to
+   * others temporarily.  Instead, we can just make a single use pipe and be
+   * done with it.
+   */
+  startBulkSend: function({actor, type, length}) {
+    let serial = this._serial.count++;
+
+    dumpn("Sent bulk packet " + serial + " for actor " + actor);
+    if (!this.other) {
+      return;
+    }
+
+    let pipe = new Pipe(true, true, 0, 0, null);
+
+    Services.tm.currentThread.dispatch(DevToolsUtils.makeInfallible(() => {
+      dumpn("Received bulk packet " + serial);
+      if (!this.other.hooks) {
+        return;
+      }
+
+      // Receiver
+      let deferred = promise.defer();
+
+      this.other.hooks.onBulkPacket({
+        actor: actor,
+        type: type,
+        length: length,
+        copyTo: (output) => {
+          deferred.resolve(
+            StreamUtils.copyStream(pipe.inputStream, output, length));
+          return deferred.promise;
+        },
+        stream: pipe.inputStream,
+        done: deferred
+      });
+
+      // Await the result of reading from the stream
+      deferred.promise.then(() => pipe.inputStream.close(), this.close);
+    }, "LocalDebuggerTransport instance's this.other.hooks.onBulkPacket"), 0);
+
+    // Sender
+    let sendDeferred = promise.defer();
+
+    // The remote transport is not capable of resolving immediately here, so we
+    // shouldn't be able to either.
+    Services.tm.currentThread.dispatch(() => {
+      let copyDeferred = promise.defer();
+
+      sendDeferred.resolve({
+        copyFrom: (input) => {
+          copyDeferred.resolve(
+            StreamUtils.copyStream(input, pipe.outputStream, length));
+          return copyDeferred.promise;
+        },
+        stream: pipe.outputStream,
+        done: copyDeferred
+      });
+
+      // Await the result of writing to the stream
+      copyDeferred.promise.then(() => pipe.outputStream.close(), this.close);
+    }, 0);
+
+    return sendDeferred.promise;
+  },
+
+  /**
    * Close the transport.
    */
   close: function() {
     if (this.other) {
       // Remove the reference to the other endpoint before calling close(), to
       // avoid infinite recursion.
       let other = this.other;
       this.other = null;
@@ -365,14 +698,18 @@ ChildDebuggerTransport.prototype = {
   },
 
   receiveMessage: function ({data}) {
     this.hooks.onPacket(data);
   },
 
   send: function (packet) {
     this._sender.sendAsyncMessage(this._messageName, packet);
+  },
+
+  startBulkSend: function() {
+    throw new Error("Can't send bulk data to child processes.");
   }
 };
 
 exports.ChildDebuggerTransport = ChildDebuggerTransport;
 
 });