Bug 1168806: Configurable I/O thread for socket IPC classes, r=kmachulis
authorThomas Zimmermann <tdz@users.sourceforge.net>
Tue, 02 Jun 2015 10:01:57 +0200
changeset 246782 ef212da7547858d3bc2c9a27ffcf75bdd7968669
parent 246781 ad5f4c277ea3ef045bb033b14d2bb67992c2c5ba
child 246783 075e96a57701c252935f7b8d9ecb517da459073f
push id28840
push userkwierso@gmail.com
push dateWed, 03 Jun 2015 01:34:22 +0000
treeherdermozilla-central@b0a507af2b4a [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerskmachulis
bugs1168806
milestone41.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1168806: Configurable I/O thread for socket IPC classes, r=kmachulis The I/O thread sends and receives data on a file descriptor. This has traditionally been performed on a single I/O thread. This patch extends the socket IPC classes to support arbitrary I/O threads. The thread is configured when a connection is established and used until the socket gets closed.
dom/bluetooth/bluedroid/BluetoothSocket.cpp
dom/bluetooth/bluedroid/BluetoothSocket.h
dom/bluetooth/bluez/BluetoothSocket.cpp
dom/bluetooth/bluez/BluetoothSocket.h
ipc/bluetooth/BluetoothDaemonConnection.cpp
ipc/bluetooth/BluetoothDaemonConnection.h
ipc/unixsocket/ConnectionOrientedSocket.h
ipc/unixsocket/ListenSocket.cpp
ipc/unixsocket/ListenSocket.h
ipc/unixsocket/StreamSocket.cpp
ipc/unixsocket/StreamSocket.h
--- a/dom/bluetooth/bluedroid/BluetoothSocket.cpp
+++ b/dom/bluetooth/bluedroid/BluetoothSocket.cpp
@@ -393,18 +393,18 @@ public:
     }
 
     if (aConnectionStatus != 0) {
       mImpl->mConsumer->NotifyError();
       return;
     }
 
     mImpl->mConsumer->SetAddress(aBdAddress);
-    XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
-                                     new AcceptTask(mImpl, fd.forget()));
+    mImpl->GetIOLoop()->PostTask(FROM_HERE,
+                                 new AcceptTask(mImpl, fd.forget()));
   }
 
   void OnError(BluetoothStatus aStatus) override
   {
     MOZ_ASSERT(NS_IsMainThread());
     BT_LOGR("BluetoothSocketInterface::Accept failed: %d", (int)aStatus);
 
     if (!mImpl->IsShutdownOnMainThread()) {
@@ -610,18 +610,18 @@ public:
     }
 
     if (aConnectionStatus != 0) {
       mImpl->mConsumer->NotifyError();
       return;
     }
 
     mImpl->mConsumer->SetAddress(aBdAddress);
-    XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
-                                     new SocketConnectTask(mImpl, aFd));
+    mImpl->GetIOLoop()->PostTask(FROM_HERE,
+                                 new SocketConnectTask(mImpl, aFd));
   }
 
   void OnError(BluetoothStatus aStatus) override
   {
     MOZ_ASSERT(NS_IsMainThread());
     BT_WARNING("Connect failed: %d", (int)aStatus);
 
     if (!mImpl->IsShutdownOnMainThread()) {
@@ -637,51 +637,62 @@ private:
   DroidSocketImpl* mImpl;
 };
 
 nsresult
 BluetoothSocket::Connect(const nsAString& aDeviceAddress,
                          const BluetoothUuid& aServiceUuid,
                          BluetoothSocketType aType,
                          int aChannel,
-                         bool aAuth, bool aEncrypt)
+                         bool aAuth, bool aEncrypt,
+                         MessageLoop* aIOLoop)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(!mImpl);
 
   SetConnectionStatus(SOCKET_CONNECTING);
 
-  mImpl = new DroidSocketImpl(XRE_GetIOMessageLoop(), this);
+  mImpl = new DroidSocketImpl(aIOLoop, this);
 
   BluetoothSocketResultHandler* res = new ConnectSocketResultHandler(mImpl);
   SetCurrentResultHandler(res);
 
   sBluetoothSocketInterface->Connect(
     aDeviceAddress, aType,
     aServiceUuid.mUuid, aChannel,
     aEncrypt, aAuth, res);
 
   return NS_OK;
 }
 
+nsresult
+BluetoothSocket::Connect(const nsAString& aDeviceAddress,
+                         const BluetoothUuid& aServiceUuid,
+                         BluetoothSocketType aType,
+                         int aChannel,
+                         bool aAuth, bool aEncrypt)
+{
+  return Connect(aDeviceAddress, aServiceUuid, aType, aChannel, aAuth,
+                 aEncrypt, XRE_GetIOMessageLoop());
+}
+
 class ListenResultHandler final : public BluetoothSocketResultHandler
 {
 public:
   ListenResultHandler(DroidSocketImpl* aImpl)
   : mImpl(aImpl)
   {
     MOZ_ASSERT(mImpl);
   }
 
   void Listen(int aFd) override
   {
     MOZ_ASSERT(NS_IsMainThread());
 
-    XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
-                                     new SocketListenTask(mImpl, aFd));
+    mImpl->GetIOLoop()->PostTask(FROM_HERE, new SocketListenTask(mImpl, aFd));
   }
 
   void OnError(BluetoothStatus aStatus) override
   {
     MOZ_ASSERT(NS_IsMainThread());
 
     BT_WARNING("Listen failed: %d", (int)aStatus);
   }
@@ -690,36 +701,48 @@ private:
   DroidSocketImpl* mImpl;
 };
 
 nsresult
 BluetoothSocket::Listen(const nsAString& aServiceName,
                         const BluetoothUuid& aServiceUuid,
                         BluetoothSocketType aType,
                         int aChannel,
-                        bool aAuth, bool aEncrypt)
+                        bool aAuth, bool aEncrypt,
+                        MessageLoop* aIOLoop)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(!mImpl);
 
   SetConnectionStatus(SOCKET_LISTENING);
 
-  mImpl = new DroidSocketImpl(XRE_GetIOMessageLoop(), this);
+  mImpl = new DroidSocketImpl(aIOLoop, this);
 
   BluetoothSocketResultHandler* res = new ListenResultHandler(mImpl);
   SetCurrentResultHandler(res);
 
   sBluetoothSocketInterface->Listen(
     aType,
     aServiceName, aServiceUuid.mUuid, aChannel,
     aEncrypt, aAuth, res);
 
   return NS_OK;
 }
 
+nsresult
+BluetoothSocket::Listen(const nsAString& aServiceName,
+                        const BluetoothUuid& aServiceUuid,
+                        BluetoothSocketType aType,
+                        int aChannel,
+                        bool aAuth, bool aEncrypt)
+{
+  return Listen(aServiceName, aServiceUuid, aType, aChannel, aAuth, aEncrypt,
+                XRE_GetIOMessageLoop());
+}
+
 void
 BluetoothSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mObserver);
 
   mObserver->ReceiveSocketData(this, aBuffer);
 }
@@ -728,17 +751,17 @@ BluetoothSocket::ReceiveSocketData(nsAut
 
 void
 BluetoothSocket::SendSocketData(UnixSocketIOBuffer* aBuffer)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mImpl);
   MOZ_ASSERT(!mImpl->IsShutdownOnMainThread());
 
-  XRE_GetIOMessageLoop()->PostTask(
+  mImpl->GetIOLoop()->PostTask(
     FROM_HERE,
     new SocketIOSendTask<DroidSocketImpl, UnixSocketIOBuffer>(mImpl, aBuffer));
 }
 
 // |SocketBase|
 
 void
 BluetoothSocket::Close()
@@ -753,19 +776,17 @@ BluetoothSocket::Close()
   if (mCurrentRes) {
     sBluetoothSocketInterface->Close(mCurrentRes);
   }
 
   // From this point on, we consider mImpl as being deleted.
   // We sever the relationship here so any future calls to listen or connect
   // will create a new implementation.
   mImpl->ShutdownOnMainThread();
-
-  XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mImpl));
-
+  mImpl->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mImpl));
   mImpl = nullptr;
 
   NotifyDisconnect();
 }
 
 void
 BluetoothSocket::OnConnectSuccess()
 {
--- a/dom/bluetooth/bluedroid/BluetoothSocket.h
+++ b/dom/bluetooth/bluedroid/BluetoothSocket.h
@@ -5,37 +5,53 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef mozilla_dom_bluetooth_BluetoothSocket_h
 #define mozilla_dom_bluetooth_BluetoothSocket_h
 
 #include "BluetoothCommon.h"
 #include "mozilla/ipc/DataSocket.h"
 
+class MessageLoop;
+
 BEGIN_BLUETOOTH_NAMESPACE
 
 class BluetoothSocketObserver;
 class BluetoothSocketResultHandler;
 class DroidSocketImpl;
 
 class BluetoothSocket final : public mozilla::ipc::DataSocket
 {
 public:
   BluetoothSocket(BluetoothSocketObserver* aObserver);
 
   nsresult Connect(const nsAString& aDeviceAddress,
                    const BluetoothUuid& aServiceUuid,
                    BluetoothSocketType aType,
                    int aChannel,
+                   bool aAuth, bool aEncrypt,
+                   MessageLoop* aIOLoop);
+
+  nsresult Connect(const nsAString& aDeviceAddress,
+                   const BluetoothUuid& aServiceUuid,
+                   BluetoothSocketType aType,
+                   int aChannel,
                    bool aAuth, bool aEncrypt);
 
   nsresult Listen(const nsAString& aServiceName,
                   const BluetoothUuid& aServiceUuid,
                   BluetoothSocketType aType,
                   int aChannel,
+                  bool aAuth, bool aEncrypt,
+                  MessageLoop* aIOLoop);
+
+  nsresult Listen(const nsAString& aServiceName,
+                  const BluetoothUuid& aServiceUuid,
+                  BluetoothSocketType aType,
+                  int aChannel,
                   bool aAuth, bool aEncrypt);
 
   /**
    * Method to be called whenever data is received. This is only called on the
    * main thread.
    *
    * @param aBuffer Data received from the socket.
    */
--- a/dom/bluetooth/bluez/BluetoothSocket.cpp
+++ b/dom/bluetooth/bluez/BluetoothSocket.cpp
@@ -544,17 +544,17 @@ public:
     }
 
     BluetoothSocketIO* io = GetIO();
     if (io->IsShutdownOnMainThread()) {
       return;
     }
 
     io->ClearDelayedConnectTask();
-    XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new ConnectTask(io));
+    io->GetIOLoop()->PostTask(FROM_HERE, new ConnectTask(io));
   }
 };
 
 //
 // BluetoothSocket
 //
 
 BluetoothSocket::BluetoothSocket(BluetoothSocketObserver* aObserver)
@@ -640,49 +640,65 @@ BluetoothSocket::SendSocketData(const ns
 
   SendSocketData(new UnixSocketRawData(aStr.BeginReading(), aStr.Length()));
 
   return true;
 }
 
 nsresult
 BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector,
-                         int aDelayMs)
+                         int aDelayMs, MessageLoop* aIOLoop)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(aConnector);
+  MOZ_ASSERT(aIOLoop);
   MOZ_ASSERT(!mIO);
 
-  MessageLoop* ioLoop = XRE_GetIOMessageLoop();
-  mIO = new BluetoothSocketIO(ioLoop, this, aConnector);
+  mIO = new BluetoothSocketIO(aIOLoop, this, aConnector);
   SetConnectionStatus(SOCKET_CONNECTING);
+
   if (aDelayMs > 0) {
     DelayedConnectTask* connectTask = new DelayedConnectTask(mIO);
     mIO->SetDelayedConnectTask(connectTask);
     MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs);
   } else {
-    ioLoop->PostTask(FROM_HERE, new ConnectTask(mIO));
+    aIOLoop->PostTask(FROM_HERE, new ConnectTask(mIO));
   }
+
+  return NS_OK;
+}
+
+nsresult
+BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector,
+                         int aDelayMs)
+{
+  return Connect(aConnector, aDelayMs, XRE_GetIOMessageLoop());
+}
+
+nsresult
+BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector,
+                        MessageLoop* aIOLoop)
+{
+  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(aConnector);
+  MOZ_ASSERT(aIOLoop);
+  MOZ_ASSERT(!mIO);
+
+  mIO = new BluetoothSocketIO(aIOLoop, this, aConnector);
+  SetConnectionStatus(SOCKET_LISTENING);
+
+  aIOLoop->PostTask(FROM_HERE, new ListenTask(mIO));
+
   return NS_OK;
 }
 
 nsresult
 BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector)
 {
-  MOZ_ASSERT(NS_IsMainThread());
-  MOZ_ASSERT(aConnector);
-  MOZ_ASSERT(!mIO);
-
-  MessageLoop* ioLoop = XRE_GetIOMessageLoop();
-
-  mIO = new BluetoothSocketIO(ioLoop, this, aConnector);
-  SetConnectionStatus(SOCKET_LISTENING);
-  ioLoop->PostTask(FROM_HERE, new ListenTask(mIO));
-
-  return NS_OK;
+  return Listen(aConnector, XRE_GetIOMessageLoop());
 }
 
 void
 BluetoothSocket::GetAddress(nsAString& aAddrStr)
 {
   aAddrStr.Truncate();
   if (!mIO || GetConnectionStatus() != SOCKET_CONNECTED) {
     NS_WARNING("No socket currently open!");
@@ -695,17 +711,17 @@ BluetoothSocket::GetAddress(nsAString& a
 
 void
 BluetoothSocket::SendSocketData(UnixSocketIOBuffer* aBuffer)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mIO);
   MOZ_ASSERT(!mIO->IsShutdownOnMainThread());
 
-  XRE_GetIOMessageLoop()->PostTask(
+  mIO->GetIOLoop()->PostTask(
     FROM_HERE,
     new SocketIOSendTask<BluetoothSocketIO, UnixSocketIOBuffer>(mIO, aBuffer));
 }
 
 // |SocketBase|
 
 void
 BluetoothSocket::Close()
@@ -716,19 +732,17 @@ BluetoothSocket::Close()
   }
 
   mIO->CancelDelayedConnectTask();
 
   // From this point on, we consider mIO as being deleted.
   // We sever the relationship here so any future calls to listen or connect
   // will create a new implementation.
   mIO->ShutdownOnMainThread();
-
-  XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO));
-
+  mIO->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO));
   mIO = nullptr;
 
   NotifyDisconnect();
 }
 
 void
 BluetoothSocket::OnConnectSuccess()
 {
--- a/dom/bluetooth/bluez/BluetoothSocket.h
+++ b/dom/bluetooth/bluez/BluetoothSocket.h
@@ -11,16 +11,18 @@
 #include <stdlib.h>
 #include "mozilla/ipc/DataSocket.h"
 #include "mozilla/ipc/UnixSocketWatcher.h"
 #include "mozilla/RefPtr.h"
 #include "nsAutoPtr.h"
 #include "nsString.h"
 #include "nsThreadUtils.h"
 
+class MessageLoop;
+
 BEGIN_BLUETOOTH_NAMESPACE
 
 class BluetoothSocketObserver;
 class BluetoothUnixSocketConnector;
 
 class BluetoothSocket final : public mozilla::ipc::DataSocket
 {
 public:
@@ -59,26 +61,49 @@ public:
   bool SendSocketData(const nsACString& aMessage);
 
   /**
    * Starts a task on the socket that will try to connect to a socket in a
    * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
    * @param aDelayMs Time delay in milli-seconds.
+   * @param aIOLoop The socket's I/O thread.
+   * @return NS_OK on success, or an XPCOM error code otherwise.
+   */
+  nsresult Connect(BluetoothUnixSocketConnector* aConnector,
+                   int aDelayMs, MessageLoop* aIOLoop);
+
+  /**
+   * Starts a task on the socket that will try to connect to a socket in a
+   * non-blocking manner.
+   *
+   * @param aConnector Connector object for socket type specific functions
+   * @param aDelayMs Time delay in milli-seconds.
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
   nsresult Connect(BluetoothUnixSocketConnector* aConnector,
                    int aDelayMs = 0);
 
   /**
    * Starts a task on the socket that will try to accept a new connection in a
    * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
+   * @param aIOLoop The socket's I/O thread.
+   * @return NS_OK on success, or an XPCOM error code otherwise.
+   */
+  nsresult Listen(BluetoothUnixSocketConnector* aConnector,
+                  MessageLoop* aIOLoop);
+
+  /**
+   * Starts a task on the socket that will try to accept a new connection in a
+   * non-blocking manner.
+   *
+   * @param aConnector Connector object for socket type specific functions
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
   nsresult Listen(BluetoothUnixSocketConnector* aConnector);
 
   /**
    * Get the current socket address.
    *
    * @param[out] aDeviceAddress Returns the address string.
--- a/ipc/bluetooth/BluetoothDaemonConnection.cpp
+++ b/ipc/bluetooth/BluetoothDaemonConnection.cpp
@@ -432,60 +432,61 @@ BluetoothDaemonConnectionIO::ShutdownOnI
 //
 // BluetoothDaemonConnection
 //
 
 BluetoothDaemonConnection::BluetoothDaemonConnection(
   BluetoothDaemonPDUConsumer* aPDUConsumer,
   BluetoothDaemonConnectionConsumer* aConsumer,
   int aIndex)
-  : mPDUConsumer(aPDUConsumer)
+  : mIO(nullptr)
+  , mPDUConsumer(aPDUConsumer)
   , mConsumer(aConsumer)
   , mIndex(aIndex)
-  , mIO(nullptr)
 {
   MOZ_ASSERT(mConsumer);
 }
 
 BluetoothDaemonConnection::~BluetoothDaemonConnection()
 { }
 
 // |ConnectionOrientedSocket|
 
 nsresult
 BluetoothDaemonConnection::PrepareAccept(UnixSocketConnector* aConnector,
+                                         MessageLoop* aIOLoop,
                                          ConnectionOrientedSocketIO*& aIO)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(!mIO);
 
   // |BluetoothDaemonConnection| now owns the connector, but doesn't
   // actually use it. So the connector is stored in an auto pointer
   // to be deleted at the end of the method.
   nsAutoPtr<UnixSocketConnector> connector(aConnector);
 
   SetConnectionStatus(SOCKET_CONNECTING);
 
   mIO = new BluetoothDaemonConnectionIO(
-    XRE_GetIOMessageLoop(), -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
+    aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
     this, mPDUConsumer);
   aIO = mIO;
 
   return NS_OK;
 }
 
 // |DataSocket|
 
 void
 BluetoothDaemonConnection::SendSocketData(UnixSocketIOBuffer* aBuffer)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mIO);
 
-  XRE_GetIOMessageLoop()->PostTask(
+  mIO->GetIOLoop()->PostTask(
     FROM_HERE,
     new SocketIOSendTask<BluetoothDaemonConnectionIO,
                          UnixSocketIOBuffer>(mIO, aBuffer));
 }
 
 // |SocketBase|
 
 void
@@ -493,18 +494,17 @@ BluetoothDaemonConnection::Close()
 {
   MOZ_ASSERT(NS_IsMainThread());
 
   if (!mIO) {
     CHROMIUM_LOG("Bluetooth daemon already disconnected!");
     return;
   }
 
-  XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO));
-
+  mIO->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO));
   mIO = nullptr;
 
   NotifyDisconnect();
 }
 
 void
 BluetoothDaemonConnection::OnConnectSuccess()
 {
--- a/ipc/bluetooth/BluetoothDaemonConnection.h
+++ b/ipc/bluetooth/BluetoothDaemonConnection.h
@@ -7,16 +7,18 @@
 #ifndef mozilla_ipc_bluetooth_BluetoothDaemonConnection_h
 #define mozilla_ipc_bluetooth_BluetoothDaemonConnection_h
 
 #include "mozilla/Attributes.h"
 #include "mozilla/FileUtils.h"
 #include "mozilla/ipc/ConnectionOrientedSocket.h"
 #include "nsAutoPtr.h"
 
+class MessageLoop;
+
 namespace mozilla {
 namespace ipc {
 
 class BluetoothDaemonConnectionConsumer;
 class BluetoothDaemonConnectionIO;
 class BluetoothDaemonPDUConsumer;
 
 /*
@@ -118,34 +120,35 @@ public:
                             BluetoothDaemonConnectionConsumer* aConsumer,
                             int aIndex);
   virtual ~BluetoothDaemonConnection();
 
   // Methods for |ConnectionOrientedSocket|
   //
 
   nsresult PrepareAccept(UnixSocketConnector* aConnector,
+                         MessageLoop* aIOLoop,
                          ConnectionOrientedSocketIO*& aIO) override;
 
   // Methods for |DataSocket|
   //
 
   void SendSocketData(UnixSocketIOBuffer* aBuffer) override;
 
   // Methods for |SocketBase|
   //
 
   void Close() override;
   void OnConnectSuccess() override;
   void OnConnectError() override;
   void OnDisconnect() override;
 
 private:
+  BluetoothDaemonConnectionIO* mIO;
   BluetoothDaemonPDUConsumer* mPDUConsumer;
   BluetoothDaemonConnectionConsumer* mConsumer;
   int mIndex;
-  BluetoothDaemonConnectionIO* mIO;
 };
 
 }
 }
 
 #endif
--- a/ipc/unixsocket/ConnectionOrientedSocket.h
+++ b/ipc/unixsocket/ConnectionOrientedSocket.h
@@ -5,16 +5,18 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef mozilla_ipc_connectionorientedsocket_h
 #define mozilla_ipc_connectionorientedsocket_h
 
 #include <sys/socket.h>
 #include "DataSocket.h"
 
+class MessageLoop;
+
 namespace mozilla {
 namespace ipc {
 
 class UnixSocketConnector;
 
 /*
  * |ConnectionOrientedSocketIO| and |ConnectionOrientedSocket| define
  * interfaces for implementing stream sockets on I/O and main thread.
@@ -35,20 +37,22 @@ class ConnectionOrientedSocket : public 
 {
 public:
   /**
    * Prepares an instance of |ConnectionOrientedSocket| in DISCONNECTED
    * state for accepting a connection. Main-thread only.
    *
    * @param aConnector The new connector object, owned by the
    *                   connection-oriented socket.
+   * @param aIOLoop The socket's I/O thread.
    * @param[out] aIO, Returns an instance of |ConnectionOrientedSocketIO|.
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
   virtual nsresult PrepareAccept(UnixSocketConnector* aConnector,
+                                 MessageLoop* aIOLoop,
                                  ConnectionOrientedSocketIO*& aIO) = 0;
 
 protected:
   virtual ~ConnectionOrientedSocket();
 };
 
 }
 }
--- a/ipc/unixsocket/ListenSocket.cpp
+++ b/ipc/unixsocket/ListenSocket.cpp
@@ -288,76 +288,85 @@ private:
   ConnectionOrientedSocketIO* mCOSocketIO;
 };
 
 //
 // UnixSocketConsumer
 //
 
 ListenSocket::ListenSocket(ListenSocketConsumer* aConsumer, int aIndex)
-  : mConsumer(aConsumer)
+  : mIO(nullptr)
+  , mConsumer(aConsumer)
   , mIndex(aIndex)
-  , mIO(nullptr)
 {
   MOZ_ASSERT(mConsumer);
 }
 
 ListenSocket::~ListenSocket()
 {
   MOZ_ASSERT(!mIO);
 }
 
 nsresult
 ListenSocket::Listen(UnixSocketConnector* aConnector,
+                     MessageLoop* aIOLoop,
                      ConnectionOrientedSocket* aCOSocket)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(!mIO);
 
-  mIO = new ListenSocketIO(XRE_GetIOMessageLoop(), this, aConnector);
+  mIO = new ListenSocketIO(aIOLoop, this, aConnector);
 
   // Prepared I/O object, now start listening.
   nsresult rv = Listen(aCOSocket);
   if (NS_FAILED(rv)) {
     delete mIO;
     mIO = nullptr;
     return rv;
   }
 
   return NS_OK;
 }
 
 nsresult
+ListenSocket::Listen(UnixSocketConnector* aConnector,
+                     ConnectionOrientedSocket* aCOSocket)
+{
+  return Listen(aConnector, XRE_GetIOMessageLoop(), aCOSocket);
+}
+
+nsresult
 ListenSocket::Listen(ConnectionOrientedSocket* aCOSocket)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(aCOSocket);
   MOZ_ASSERT(mIO);
 
   // We first prepare the connection-oriented socket with a
   // socket connector and a socket I/O class.
 
   nsAutoPtr<UnixSocketConnector> connector;
   nsresult rv = mIO->GetConnector()->Duplicate(*connector.StartAssignment());
   if (NS_FAILED(rv)) {
     return rv;
   }
 
   nsAutoPtr<ConnectionOrientedSocketIO> io;
-  rv = aCOSocket->PrepareAccept(connector, *io.StartAssignment());
+  rv = aCOSocket->PrepareAccept(connector, mIO->GetIOLoop(),
+                                *io.StartAssignment());
   if (NS_FAILED(rv)) {
     return rv;
   }
   connector.forget(); // now owned by |io|
 
   // Then we start listening for connection requests.
 
   SetConnectionStatus(SOCKET_LISTENING);
 
-  XRE_GetIOMessageLoop()->PostTask(
+  mIO->GetIOLoop()->PostTask(
     FROM_HERE, new ListenSocketIO::ListenTask(mIO, io.forget()));
 
   return NS_OK;
 }
 
 // |SocketBase|
 
 void
@@ -368,19 +377,17 @@ ListenSocket::Close()
   if (!mIO) {
     return;
   }
 
   // From this point on, we consider mIO as being deleted. We sever
   // the relationship here so any future calls to listen or connect
   // will create a new implementation.
   mIO->ShutdownOnMainThread();
-
-  XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO));
-
+  mIO->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO));
   mIO = nullptr;
 
   NotifyDisconnect();
 }
 
 void
 ListenSocket::OnConnectSuccess()
 {
--- a/ipc/unixsocket/ListenSocket.h
+++ b/ipc/unixsocket/ListenSocket.h
@@ -5,31 +5,50 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef mozilla_ipc_listensocket_h
 #define mozilla_ipc_listensocket_h
 
 #include "nsString.h"
 #include "mozilla/ipc/SocketBase.h"
 
+class MessageLoop;
+
 namespace mozilla {
 namespace ipc {
 
 class ConnectionOrientedSocket;
 class ListenSocketConsumer;
 class ListenSocketIO;
 class UnixSocketConnector;
 
 class ListenSocket final : public SocketBase
 {
-protected:
-  virtual ~ListenSocket();
+public:
+  /**
+   * Constructs an instance of |ListenSocket|.
+   *
+   * @param aConsumer The consumer for the socket.
+   * @param aIndex An arbitrary index.
+   */
+  ListenSocket(ListenSocketConsumer* aConsumer, int aIndex);
 
-public:
-  ListenSocket(ListenSocketConsumer* aConsumer, int aIndex);
+  /**
+   * Starts a task on the socket that will try to accept a new connection
+   * in a non-blocking manner.
+   *
+   * @param aConnector Connector object for socket-type-specific functions
+   * @param aIOLoop The socket's I/O thread.
+   * @param aCOSocket The connection-oriented socket for handling the
+   *                  accepted connection.
+   * @return NS_OK on success, or an XPCOM error code otherwise.
+   */
+  nsresult Listen(UnixSocketConnector* aConnector,
+                  MessageLoop* aIOLoop,
+                  ConnectionOrientedSocket* aCOSocket);
 
   /**
    * Starts a task on the socket that will try to accept a new connection
    * in a non-blocking manner.
    *
    * @param aConnector Connector object for socket-type-specific functions
    * @param aCOSocket The connection-oriented socket for handling the
    *                  accepted connection.
@@ -52,18 +71,21 @@ public:
   // Methods for |SocketBase|
   //
 
   void Close() override;
   void OnConnectSuccess() override;
   void OnConnectError() override;
   void OnDisconnect() override;
 
+protected:
+  virtual ~ListenSocket();
+
 private:
+  ListenSocketIO* mIO;
   ListenSocketConsumer* mConsumer;
   int mIndex;
-  ListenSocketIO* mIO;
 };
 
 } // namespace ipc
 } // namepsace mozilla
 
 #endif // mozilla_ipc_listensocket_h
--- a/ipc/unixsocket/StreamSocket.cpp
+++ b/ipc/unixsocket/StreamSocket.cpp
@@ -471,45 +471,45 @@ public:
   }
 };
 
 class StreamSocketIO::DelayedConnectTask final
   : public SocketIOTask<StreamSocketIO>
 {
 public:
   DelayedConnectTask(StreamSocketIO* aIO)
-  : SocketIOTask<StreamSocketIO>(aIO)
+    : SocketIOTask<StreamSocketIO>(aIO)
   { }
 
   void Run() override
   {
     MOZ_ASSERT(NS_IsMainThread());
 
     if (IsCanceled()) {
       return;
     }
 
     StreamSocketIO* io = GetIO();
     if (io->IsShutdownOnMainThread()) {
       return;
     }
 
     io->ClearDelayedConnectTask();
-    XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new ConnectTask(io));
+    io->GetIOLoop()->PostTask(FROM_HERE, new ConnectTask(io));
   }
 };
 
 //
 // StreamSocket
 //
 
 StreamSocket::StreamSocket(StreamSocketConsumer* aConsumer, int aIndex)
-  : mConsumer(aConsumer)
+  : mIO(nullptr)
+  , mConsumer(aConsumer)
   , mIndex(aIndex)
-  , mIO(nullptr)
 {
   MOZ_ASSERT(mConsumer);
 }
 
 StreamSocket::~StreamSocket()
 {
   MOZ_ASSERT(!mIO);
 }
@@ -518,67 +518,73 @@ void
 StreamSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
 {
   MOZ_ASSERT(NS_IsMainThread());
 
   mConsumer->ReceiveSocketData(mIndex, aBuffer);
 }
 
 nsresult
-StreamSocket::Connect(UnixSocketConnector* aConnector,
-                      int aDelayMs)
+StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs,
+                      MessageLoop* aIOLoop)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(!mIO);
 
-  MessageLoop* ioLoop = XRE_GetIOMessageLoop();
-  mIO = new StreamSocketIO(ioLoop, this, aConnector);
+  mIO = new StreamSocketIO(aIOLoop, this, aConnector);
   SetConnectionStatus(SOCKET_CONNECTING);
 
   if (aDelayMs > 0) {
     StreamSocketIO::DelayedConnectTask* connectTask =
       new StreamSocketIO::DelayedConnectTask(mIO);
     mIO->SetDelayedConnectTask(connectTask);
     MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs);
   } else {
-    ioLoop->PostTask(FROM_HERE, new StreamSocketIO::ConnectTask(mIO));
+    aIOLoop->PostTask(FROM_HERE, new StreamSocketIO::ConnectTask(mIO));
   }
   return NS_OK;
 }
 
+nsresult
+StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs)
+{
+  return Connect(aConnector, aDelayMs, XRE_GetIOMessageLoop());
+}
+
 // |ConnectionOrientedSocket|
 
 nsresult
 StreamSocket::PrepareAccept(UnixSocketConnector* aConnector,
+                            MessageLoop* aIOLoop,
                             ConnectionOrientedSocketIO*& aIO)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(!mIO);
   MOZ_ASSERT(aConnector);
 
   SetConnectionStatus(SOCKET_CONNECTING);
 
-  mIO = new StreamSocketIO(XRE_GetIOMessageLoop(),
+  mIO = new StreamSocketIO(aIOLoop,
                            -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
                            this, aConnector);
   aIO = mIO;
 
   return NS_OK;
 }
 
 // |DataSocket|
 
 void
 StreamSocket::SendSocketData(UnixSocketIOBuffer* aBuffer)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mIO);
 
   MOZ_ASSERT(!mIO->IsShutdownOnMainThread());
-  XRE_GetIOMessageLoop()->PostTask(
+  mIO->GetIOLoop()->PostTask(
     FROM_HERE,
     new SocketIOSendTask<StreamSocketIO, UnixSocketIOBuffer>(mIO, aBuffer));
 }
 
 // |SocketBase|
 
 void
 StreamSocket::Close()
@@ -587,19 +593,17 @@ StreamSocket::Close()
   MOZ_ASSERT(mIO);
 
   mIO->CancelDelayedConnectTask();
 
   // From this point on, we consider |mIO| as being deleted. We sever
   // the relationship here so any future calls to |Connect| will create
   // a new I/O object.
   mIO->ShutdownOnMainThread();
-
-  XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO));
-
+  mIO->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO));
   mIO = nullptr;
 
   NotifyDisconnect();
 }
 
 void
 StreamSocket::OnConnectSuccess()
 {
--- a/ipc/unixsocket/StreamSocket.h
+++ b/ipc/unixsocket/StreamSocket.h
@@ -4,49 +4,70 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef mozilla_ipc_streamsocket_h
 #define mozilla_ipc_streamsocket_h
 
 #include "ConnectionOrientedSocket.h"
 
+class MessageLoop;
+
 namespace mozilla {
 namespace ipc {
 
 class StreamSocketConsumer;
 class StreamSocketIO;
 class UnixSocketConnector;
 
 class StreamSocket final : public ConnectionOrientedSocket
 {
 public:
+  /**
+   * Constructs an instance of |StreamSocket|.
+   *
+   * @param aConsumer The consumer for the socket.
+   * @param aIndex An arbitrary index.
+   */
   StreamSocket(StreamSocketConsumer* aConsumer, int aIndex);
 
   /**
    * Method to be called whenever data is received. Main-thread only.
    *
    * @param aBuffer Data received from the socket.
    */
   void ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer);
 
   /**
    * Starts a task on the socket that will try to connect to a socket in a
    * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
    * @param aDelayMs Time delay in milliseconds.
+   * @param aIOLoop The socket's I/O thread.
+   * @return NS_OK on success, or an XPCOM error code otherwise.
+   */
+  nsresult Connect(UnixSocketConnector* aConnector, int aDelayMs,
+                   MessageLoop* aIOLoop);
+
+  /**
+   * Starts a task on the socket that will try to connect to a socket in a
+   * non-blocking manner.
+   *
+   * @param aConnector Connector object for socket type specific functions
+   * @param aDelayMs Time delay in milliseconds.
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
   nsresult Connect(UnixSocketConnector* aConnector, int aDelayMs = 0);
 
   // Methods for |ConnectionOrientedSocket|
   //
 
   nsresult PrepareAccept(UnixSocketConnector* aConnector,
+                         MessageLoop* aIOLoop,
                          ConnectionOrientedSocketIO*& aIO) override;
 
   // Methods for |DataSocket|
   //
 
   void SendSocketData(UnixSocketIOBuffer* aBuffer) override;
 
   // Methods for |SocketBase|
@@ -56,17 +77,17 @@ public:
   void OnConnectSuccess() override;
   void OnConnectError() override;
   void OnDisconnect() override;
 
 protected:
   virtual ~StreamSocket();
 
 private:
+  StreamSocketIO* mIO;
   StreamSocketConsumer* mConsumer;
   int mIndex;
-  StreamSocketIO* mIO;
 };
 
 } // namespace ipc
 } // namepsace mozilla
 
 #endif // mozilla_ipc_streamsocket_h