Bug 1168806: Configurable consumer thread for socket IPC classes, r=kmachulis
authorThomas Zimmermann <tdz@users.sourceforge.net>
Tue, 02 Jun 2015 10:01:57 +0200
changeset 277554 075e96a57701c252935f7b8d9ecb517da459073f
parent 277553 ef212da7547858d3bc2c9a27ffcf75bdd7968669
child 277555 fd7c97d41cf392d9e8229e0031c5742d8d586f2b
push id4932
push userjlund@mozilla.com
push dateMon, 10 Aug 2015 18:23:06 +0000
treeherdermozilla-beta@6dd5a4f5f745 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerskmachulis
bugs1168806
milestone41.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1168806: Configurable consumer thread for socket IPC classes, r=kmachulis The consumer thread handles socket creation, destruction, and data processing for socket IPC. Traditionally this has been done on the main thread. This patch extends the socket IPC classes to support arbitrary consumer threads. The thread is configured when establishing a connection, and performs all of the above operations until the socket is closed.
dom/bluetooth/bluedroid/BluetoothSocket.cpp
dom/bluetooth/bluedroid/BluetoothSocket.h
dom/bluetooth/bluez/BluetoothSocket.cpp
dom/bluetooth/bluez/BluetoothSocket.h
ipc/bluetooth/BluetoothDaemonConnection.cpp
ipc/bluetooth/BluetoothDaemonConnection.h
ipc/unixsocket/ConnectionOrientedSocket.cpp
ipc/unixsocket/ConnectionOrientedSocket.h
ipc/unixsocket/DataSocket.cpp
ipc/unixsocket/DataSocket.h
ipc/unixsocket/ListenSocket.cpp
ipc/unixsocket/ListenSocket.h
ipc/unixsocket/SocketBase.cpp
ipc/unixsocket/SocketBase.h
ipc/unixsocket/StreamSocket.cpp
ipc/unixsocket/StreamSocket.h
--- a/dom/bluetooth/bluedroid/BluetoothSocket.cpp
+++ b/dom/bluetooth/bluedroid/BluetoothSocket.cpp
@@ -69,26 +69,29 @@ public:
    */
   enum ConnectionStatus {
     SOCKET_IS_DISCONNECTED = 0,
     SOCKET_IS_LISTENING,
     SOCKET_IS_CONNECTING,
     SOCKET_IS_CONNECTED
   };
 
-  DroidSocketImpl(MessageLoop* aIOLoop, BluetoothSocket* aConsumer)
+  DroidSocketImpl(nsIThread* aConsumerThread,
+                  MessageLoop* aIOLoop,
+                  BluetoothSocket* aConsumer)
     : ipc::UnixFdWatcher(aIOLoop)
+    , DataSocketIO(aConsumerThread)
     , mConsumer(aConsumer)
     , mShuttingDownOnIOThread(false)
     , mConnectionStatus(SOCKET_IS_DISCONNECTED)
   { }
 
   ~DroidSocketImpl()
   {
-    MOZ_ASSERT(NS_IsMainThread());
+    MOZ_ASSERT(IsConsumerThread());
   }
 
   void Send(UnixSocketIOBuffer* aBuffer)
   {
     EnqueueData(aBuffer);
     AddWatchers(WRITE_WATCHER, false);
   }
 
@@ -137,35 +140,36 @@ public:
 
   SocketBase* GetSocketBase() override
   {
     return GetDataSocket();
   }
 
   bool IsShutdownOnMainThread() const override
   {
-    MOZ_ASSERT(NS_IsMainThread());
+    MOZ_ASSERT(IsConsumerThread());
+
     return mConsumer == nullptr;
   }
 
   bool IsShutdownOnIOThread() const override
   {
     return mShuttingDownOnIOThread;
   }
 
   void ShutdownOnMainThread() override
   {
-    MOZ_ASSERT(NS_IsMainThread());
+    MOZ_ASSERT(IsConsumerThread());
     MOZ_ASSERT(!IsShutdownOnMainThread());
     mConsumer = nullptr;
   }
 
   void ShutdownOnIOThread() override
   {
-    MOZ_ASSERT(!NS_IsMainThread());
+    MOZ_ASSERT(!IsConsumerThread());
     MOZ_ASSERT(!mShuttingDownOnIOThread);
 
     Close(); // will also remove fd from I/O loop
     mShuttingDownOnIOThread = true;
   }
 
 private:
   class ReceiveRunnable;
@@ -209,17 +213,17 @@ class SocketConnectTask final : public S
 public:
   SocketConnectTask(DroidSocketImpl* aDroidSocketImpl, int aFd)
   : SocketIOTask<DroidSocketImpl>(aDroidSocketImpl)
   , mFd(aFd)
   { }
 
   void Run() override
   {
-    MOZ_ASSERT(!NS_IsMainThread());
+    MOZ_ASSERT(!GetIO()->IsConsumerThread());
     MOZ_ASSERT(!IsCanceled());
 
     GetIO()->Connect(mFd);
   }
 
 private:
   int mFd;
 };
@@ -229,17 +233,17 @@ class SocketListenTask final : public So
 public:
   SocketListenTask(DroidSocketImpl* aDroidSocketImpl, int aFd)
   : SocketIOTask<DroidSocketImpl>(aDroidSocketImpl)
   , mFd(aFd)
   { }
 
   void Run() override
   {
-    MOZ_ASSERT(!NS_IsMainThread());
+    MOZ_ASSERT(!GetIO()->IsConsumerThread());
 
     if (!IsCanceled()) {
       GetIO()->Listen(mFd);
     }
   }
 
 private:
   int mFd;
@@ -249,17 +253,17 @@ class SocketConnectClientFdTask final
 : public SocketIOTask<DroidSocketImpl>
 {
   SocketConnectClientFdTask(DroidSocketImpl* aImpl)
   : SocketIOTask<DroidSocketImpl>(aImpl)
   { }
 
   void Run() override
   {
-    MOZ_ASSERT(!NS_IsMainThread());
+    MOZ_ASSERT(!GetIO()->IsConsumerThread());
 
     GetIO()->ConnectClientFd();
   }
 };
 
 void
 DroidSocketImpl::Connect(int aFd)
 {
@@ -309,18 +313,19 @@ DroidSocketImpl::Accept(int aFd)
   if (!(flags & O_NONBLOCK)) {
     int res = TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFL, flags | O_NONBLOCK));
     NS_ENSURE_TRUE_VOID(!res);
   }
 
   SetFd(aFd);
   mConnectionStatus = SOCKET_IS_CONNECTED;
 
-  NS_DispatchToMainThread(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
+  GetConsumerThread()->Dispatch(
+    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
+    NS_DISPATCH_NORMAL);
 
   AddWatchers(READ_WATCHER, true);
   if (HasPendingData()) {
     AddWatchers(WRITE_WATCHER, false);
   }
 }
 
 void
@@ -333,17 +338,17 @@ DroidSocketImpl::OnFileCanReadWithoutBlo
   } else {
     NS_NOTREACHED("invalid connection state for reading");
   }
 }
 
 void
 DroidSocketImpl::OnSocketCanReceiveWithoutBlocking(int aFd)
 {
-  MOZ_ASSERT(!NS_IsMainThread());
+  MOZ_ASSERT(!IsConsumerThread());
   MOZ_ASSERT(!mShuttingDownOnIOThread);
 
   ssize_t res = ReceiveData(aFd);
   if (res < 0) {
     /* I/O error */
     RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
   } else if (!res) {
     /* EOF or peer shutdown */
@@ -356,17 +361,17 @@ class AcceptTask final : public SocketIO
 public:
   AcceptTask(DroidSocketImpl* aDroidSocketImpl, int aFd)
   : SocketIOTask<DroidSocketImpl>(aDroidSocketImpl)
   , mFd(aFd)
   { }
 
   void Run() override
   {
-    MOZ_ASSERT(!NS_IsMainThread());
+    MOZ_ASSERT(!GetIO()->IsConsumerThread());
     MOZ_ASSERT(!IsCanceled());
 
     GetIO()->Accept(mFd);
   }
 
 private:
   int mFd;
 };
@@ -378,17 +383,17 @@ public:
   : mImpl(aImpl)
   {
     MOZ_ASSERT(mImpl);
   }
 
   void Accept(int aFd, const nsAString& aBdAddress,
               int aConnectionStatus) override
   {
-    MOZ_ASSERT(NS_IsMainThread());
+    MOZ_ASSERT(mImpl->IsConsumerThread());
 
     mozilla::ScopedClose fd(aFd); // Close received socket fd on error
 
     if (mImpl->IsShutdownOnMainThread()) {
       BT_LOGD("mConsumer is null, aborting receive!");
       return;
     }
 
@@ -399,17 +404,17 @@ public:
 
     mImpl->mConsumer->SetAddress(aBdAddress);
     mImpl->GetIOLoop()->PostTask(FROM_HERE,
                                  new AcceptTask(mImpl, fd.forget()));
   }
 
   void OnError(BluetoothStatus aStatus) override
   {
-    MOZ_ASSERT(NS_IsMainThread());
+    MOZ_ASSERT(mImpl->IsConsumerThread());
     BT_LOGR("BluetoothSocketInterface::Accept failed: %d", (int)aStatus);
 
     if (!mImpl->IsShutdownOnMainThread()) {
       // Instead of NotifyError(), call NotifyDisconnect() to trigger
       // BluetoothOppManager::OnSocketDisconnect() as
       // DroidSocketImpl::OnFileCanReadWithoutBlocking() in Firefox OS 2.0 in
       // order to keep the same behavior and reduce regression risk.
       mImpl->mConsumer->NotifyDisconnect();
@@ -425,17 +430,17 @@ class AcceptRunnable final : public Sock
 public:
   AcceptRunnable(DroidSocketImpl* aImpl, int aFd)
   : SocketIORunnable<DroidSocketImpl>(aImpl)
   , mFd(aFd)
   { }
 
   NS_IMETHOD Run() override
   {
-    MOZ_ASSERT(NS_IsMainThread());
+    MOZ_ASSERT(GetIO()->IsConsumerThread());
     MOZ_ASSERT(sBluetoothSocketInterface);
 
     BluetoothSocketResultHandler* res = new AcceptResultHandler(GetIO());
     GetIO()->mConsumer->SetCurrentResultHandler(res);
 
     sBluetoothSocketInterface->Accept(mFd, res);
 
     return NS_OK;
@@ -443,26 +448,26 @@ public:
 
 private:
   int mFd;
 };
 
 void
 DroidSocketImpl::OnSocketCanAcceptWithoutBlocking(int aFd)
 {
-  MOZ_ASSERT(!NS_IsMainThread());
+  MOZ_ASSERT(!IsConsumerThread());
   MOZ_ASSERT(!mShuttingDownOnIOThread);
 
   /* When a listening socket is ready for receiving data,
    * we can call |Accept| on it.
    */
 
   RemoveWatchers(READ_WATCHER);
-  nsRefPtr<AcceptRunnable> t = new AcceptRunnable(this, aFd);
-  NS_DispatchToMainThread(t);
+  GetConsumerThread()->Dispatch(new AcceptRunnable(this, aFd),
+                                NS_DISPATCH_NORMAL);
 }
 
 void
 DroidSocketImpl::OnFileCanWriteWithoutBlocking(int aFd)
 {
   if (mConnectionStatus == SOCKET_IS_CONNECTED) {
     OnSocketCanSendWithoutBlocking(aFd);
   } else if (mConnectionStatus == SOCKET_IS_CONNECTING) {
@@ -470,44 +475,45 @@ DroidSocketImpl::OnFileCanWriteWithoutBl
   } else {
     NS_NOTREACHED("invalid connection state for writing");
   }
 }
 
 void
 DroidSocketImpl::OnSocketCanSendWithoutBlocking(int aFd)
 {
-  MOZ_ASSERT(!NS_IsMainThread());
+  MOZ_ASSERT(!IsConsumerThread());
   MOZ_ASSERT(!mShuttingDownOnIOThread);
   MOZ_ASSERT(aFd >= 0);
 
   nsresult rv = SendPendingData(aFd);
   if (NS_FAILED(rv)) {
     return;
   }
 
   if (HasPendingData()) {
     AddWatchers(WRITE_WATCHER, false);
   }
 }
 
 void
 DroidSocketImpl::OnSocketCanConnectWithoutBlocking(int aFd)
 {
-  MOZ_ASSERT(!NS_IsMainThread());
+  MOZ_ASSERT(!IsConsumerThread());
   MOZ_ASSERT(!mShuttingDownOnIOThread);
 
   /* We follow Posix behaviour here: Connect operations are
    * complete once we can write to the connecting socket.
    */
 
   mConnectionStatus = SOCKET_IS_CONNECTED;
 
-  NS_DispatchToMainThread(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
+  GetConsumerThread()->Dispatch(
+    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
+    NS_DISPATCH_NORMAL);
 
   AddWatchers(READ_WATCHER, true);
   if (HasPendingData()) {
     AddWatchers(WRITE_WATCHER, false);
   }
 }
 
 // |DataSocketIO|
@@ -536,19 +542,19 @@ class DroidSocketImpl::ReceiveRunnable f
 public:
   ReceiveRunnable(DroidSocketImpl* aIO, UnixSocketBuffer* aBuffer)
     : SocketIORunnable<DroidSocketImpl>(aIO)
     , mBuffer(aBuffer)
   { }
 
   NS_IMETHOD Run() override
   {
-    MOZ_ASSERT(NS_IsMainThread());
+    DroidSocketImpl* io = SocketIORunnable<DroidSocketImpl>::GetIO();
 
-    DroidSocketImpl* io = SocketIORunnable<DroidSocketImpl>::GetIO();
+    MOZ_ASSERT(io->IsConsumerThread());
 
     if (NS_WARN_IF(io->IsShutdownOnMainThread())) {
       // Since we've already explicitly closed and the close
       // happened before this, this isn't really an error.
       return NS_OK;
     }
 
     BluetoothSocket* bluetoothSocket = io->GetBluetoothSocket();
@@ -561,17 +567,18 @@ public:
 
 private:
   nsAutoPtr<UnixSocketBuffer> mBuffer;
 };
 
 void
 DroidSocketImpl::ConsumeBuffer()
 {
-  NS_DispatchToMainThread(new ReceiveRunnable(this, mBuffer.forget()));
+  GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
+                                NS_DISPATCH_NORMAL);
 }
 
 void
 DroidSocketImpl::DiscardBuffer()
 {
   // Nothing to do.
 }
 
@@ -597,17 +604,17 @@ public:
   : mImpl(aImpl)
   {
     MOZ_ASSERT(mImpl);
   }
 
   void Connect(int aFd, const nsAString& aBdAddress,
                int aConnectionStatus) override
   {
-    MOZ_ASSERT(NS_IsMainThread());
+    MOZ_ASSERT(mImpl->IsConsumerThread());
 
     if (mImpl->IsShutdownOnMainThread()) {
       BT_LOGD("mConsumer is null, aborting send!");
       return;
     }
 
     if (aConnectionStatus != 0) {
       mImpl->mConsumer->NotifyError();
@@ -616,17 +623,17 @@ public:
 
     mImpl->mConsumer->SetAddress(aBdAddress);
     mImpl->GetIOLoop()->PostTask(FROM_HERE,
                                  new SocketConnectTask(mImpl, aFd));
   }
 
   void OnError(BluetoothStatus aStatus) override
   {
-    MOZ_ASSERT(NS_IsMainThread());
+    MOZ_ASSERT(mImpl->IsConsumerThread());
     BT_WARNING("Connect failed: %d", (int)aStatus);
 
     if (!mImpl->IsShutdownOnMainThread()) {
       // Instead of NotifyError(), call NotifyDisconnect() to trigger
       // BluetoothOppManager::OnSocketDisconnect() as
       // DroidSocketImpl::OnFileCanReadWithoutBlocking() in Firefox OS 2.0 in
       // order to keep the same behavior and reduce regression risk.
       mImpl->mConsumer->NotifyDisconnect();
@@ -638,24 +645,24 @@ private:
 };
 
 nsresult
 BluetoothSocket::Connect(const nsAString& aDeviceAddress,
                          const BluetoothUuid& aServiceUuid,
                          BluetoothSocketType aType,
                          int aChannel,
                          bool aAuth, bool aEncrypt,
+                         nsIThread* aConsumerThread,
                          MessageLoop* aIOLoop)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(!mImpl);
 
   SetConnectionStatus(SOCKET_CONNECTING);
 
-  mImpl = new DroidSocketImpl(aIOLoop, this);
+  mImpl = new DroidSocketImpl(aConsumerThread, aIOLoop, this);
 
   BluetoothSocketResultHandler* res = new ConnectSocketResultHandler(mImpl);
   SetCurrentResultHandler(res);
 
   sBluetoothSocketInterface->Connect(
     aDeviceAddress, aType,
     aServiceUuid.mUuid, aChannel,
     aEncrypt, aAuth, res);
@@ -665,61 +672,67 @@ BluetoothSocket::Connect(const nsAString
 
 nsresult
 BluetoothSocket::Connect(const nsAString& aDeviceAddress,
                          const BluetoothUuid& aServiceUuid,
                          BluetoothSocketType aType,
                          int aChannel,
                          bool aAuth, bool aEncrypt)
 {
+  nsIThread* consumerThread = nullptr;
+  nsresult rv = NS_GetCurrentThread(&consumerThread);
+  if (NS_FAILED(rv)) {
+    return rv;
+  }
+
   return Connect(aDeviceAddress, aServiceUuid, aType, aChannel, aAuth,
-                 aEncrypt, XRE_GetIOMessageLoop());
+                 aEncrypt, consumerThread, XRE_GetIOMessageLoop());
 }
 
 class ListenResultHandler final : public BluetoothSocketResultHandler
 {
 public:
   ListenResultHandler(DroidSocketImpl* aImpl)
   : mImpl(aImpl)
   {
     MOZ_ASSERT(mImpl);
   }
 
   void Listen(int aFd) override
   {
-    MOZ_ASSERT(NS_IsMainThread());
+    MOZ_ASSERT(mImpl->IsConsumerThread());
 
     mImpl->GetIOLoop()->PostTask(FROM_HERE, new SocketListenTask(mImpl, aFd));
   }
 
   void OnError(BluetoothStatus aStatus) override
   {
-    MOZ_ASSERT(NS_IsMainThread());
+    MOZ_ASSERT(mImpl->IsConsumerThread());
 
     BT_WARNING("Listen failed: %d", (int)aStatus);
   }
 
 private:
   DroidSocketImpl* mImpl;
 };
 
 nsresult
 BluetoothSocket::Listen(const nsAString& aServiceName,
                         const BluetoothUuid& aServiceUuid,
                         BluetoothSocketType aType,
                         int aChannel,
                         bool aAuth, bool aEncrypt,
+                        nsIThread* aConsumerThread,
                         MessageLoop* aIOLoop)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(!mImpl);
 
   SetConnectionStatus(SOCKET_LISTENING);
 
-  mImpl = new DroidSocketImpl(aIOLoop, this);
+  mImpl = new DroidSocketImpl(aConsumerThread, aIOLoop, this);
 
   BluetoothSocketResultHandler* res = new ListenResultHandler(mImpl);
   SetCurrentResultHandler(res);
 
   sBluetoothSocketInterface->Listen(
     aType,
     aServiceName, aServiceUuid.mUuid, aChannel,
     aEncrypt, aAuth, res);
@@ -729,54 +742,61 @@ BluetoothSocket::Listen(const nsAString&
 
 nsresult
 BluetoothSocket::Listen(const nsAString& aServiceName,
                         const BluetoothUuid& aServiceUuid,
                         BluetoothSocketType aType,
                         int aChannel,
                         bool aAuth, bool aEncrypt)
 {
+  nsIThread* consumerThread = nullptr;
+  nsresult rv = NS_GetCurrentThread(&consumerThread);
+  if (NS_FAILED(rv)) {
+    return rv;
+  }
+
   return Listen(aServiceName, aServiceUuid, aType, aChannel, aAuth, aEncrypt,
-                XRE_GetIOMessageLoop());
+                consumerThread, XRE_GetIOMessageLoop());
 }
 
 void
 BluetoothSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mObserver);
 
   mObserver->ReceiveSocketData(this, aBuffer);
 }
 
 // |DataSocket|
 
 void
 BluetoothSocket::SendSocketData(UnixSocketIOBuffer* aBuffer)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mImpl);
+  MOZ_ASSERT(mImpl->IsConsumerThread());
   MOZ_ASSERT(!mImpl->IsShutdownOnMainThread());
 
   mImpl->GetIOLoop()->PostTask(
     FROM_HERE,
     new SocketIOSendTask<DroidSocketImpl, UnixSocketIOBuffer>(mImpl, aBuffer));
 }
 
 // |SocketBase|
 
 void
 BluetoothSocket::Close()
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(sBluetoothSocketInterface);
+
   if (!mImpl) {
     return;
   }
 
+  MOZ_ASSERT(mImpl->IsConsumerThread());
+
   // Stop any watching |SocketMessageWatcher|
   if (mCurrentRes) {
     sBluetoothSocketInterface->Close(mCurrentRes);
   }
 
   // From this point on, we consider mImpl as being deleted.
   // We sever the relationship here so any future calls to listen or connect
   // will create a new implementation.
@@ -785,32 +805,29 @@ BluetoothSocket::Close()
   mImpl = nullptr;
 
   NotifyDisconnect();
 }
 
 void
 BluetoothSocket::OnConnectSuccess()
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mObserver);
 
   SetCurrentResultHandler(nullptr);
   mObserver->OnSocketConnectSuccess(this);
 }
 
 void
 BluetoothSocket::OnConnectError()
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mObserver);
 
   SetCurrentResultHandler(nullptr);
   mObserver->OnSocketConnectError(this);
 }
 
 void
 BluetoothSocket::OnDisconnect()
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mObserver);
   mObserver->OnSocketDisconnect(this);
 }
--- a/dom/bluetooth/bluedroid/BluetoothSocket.h
+++ b/dom/bluetooth/bluedroid/BluetoothSocket.h
@@ -6,16 +6,17 @@
 
 #ifndef mozilla_dom_bluetooth_BluetoothSocket_h
 #define mozilla_dom_bluetooth_BluetoothSocket_h
 
 #include "BluetoothCommon.h"
 #include "mozilla/ipc/DataSocket.h"
 
 class MessageLoop;
+class nsIThread;
 
 BEGIN_BLUETOOTH_NAMESPACE
 
 class BluetoothSocketObserver;
 class BluetoothSocketResultHandler;
 class DroidSocketImpl;
 
 class BluetoothSocket final : public mozilla::ipc::DataSocket
@@ -23,29 +24,31 @@ class BluetoothSocket final : public moz
 public:
   BluetoothSocket(BluetoothSocketObserver* aObserver);
 
   nsresult Connect(const nsAString& aDeviceAddress,
                    const BluetoothUuid& aServiceUuid,
                    BluetoothSocketType aType,
                    int aChannel,
                    bool aAuth, bool aEncrypt,
+                   nsIThread* aConsumerThread,
                    MessageLoop* aIOLoop);
 
   nsresult Connect(const nsAString& aDeviceAddress,
                    const BluetoothUuid& aServiceUuid,
                    BluetoothSocketType aType,
                    int aChannel,
                    bool aAuth, bool aEncrypt);
 
   nsresult Listen(const nsAString& aServiceName,
                   const BluetoothUuid& aServiceUuid,
                   BluetoothSocketType aType,
                   int aChannel,
                   bool aAuth, bool aEncrypt,
+                  nsIThread* aConsumerThread,
                   MessageLoop* aIOLoop);
 
   nsresult Listen(const nsAString& aServiceName,
                   const BluetoothUuid& aServiceUuid,
                   BluetoothSocketType aType,
                   int aChannel,
                   bool aAuth, bool aEncrypt);
 
--- a/dom/bluetooth/bluez/BluetoothSocket.cpp
+++ b/dom/bluetooth/bluez/BluetoothSocket.cpp
@@ -23,17 +23,18 @@ static const size_t MAX_READ_SIZE = 1 <<
 // BluetoothSocketIO
 //
 
 class BluetoothSocket::BluetoothSocketIO final
   : public UnixSocketWatcher
   , public DataSocketIO
 {
 public:
-  BluetoothSocketIO(MessageLoop* mIOLoop,
+  BluetoothSocketIO(nsIThread* aConsumerThread,
+                    MessageLoop* aIOLoop,
                     BluetoothSocket* aConsumer,
                     UnixSocketConnector* aConnector);
   ~BluetoothSocketIO();
 
   void GetSocketAddr(nsAString& aAddrStr) const;
 
   BluetoothSocket* GetBluetoothSocket();
   DataSocket* GetDataSocket();
@@ -127,33 +128,35 @@ private:
 
   /**
    * I/O buffer for received data
    */
   nsAutoPtr<UnixSocketRawData> mBuffer;
 };
 
 BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO(
-  MessageLoop* mIOLoop,
+  nsIThread* aConsumerThread,
+  MessageLoop* aIOLoop,
   BluetoothSocket* aConsumer,
   UnixSocketConnector* aConnector)
-  : UnixSocketWatcher(mIOLoop)
+  : UnixSocketWatcher(aIOLoop)
+  , DataSocketIO(aConsumerThread)
   , mConsumer(aConsumer)
   , mConnector(aConnector)
   , mShuttingDownOnIOThread(false)
   , mAddressLength(0)
   , mDelayedConnectTask(nullptr)
 {
   MOZ_ASSERT(mConsumer);
   MOZ_ASSERT(mConnector);
 }
 
 BluetoothSocket::BluetoothSocketIO::~BluetoothSocketIO()
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
   MOZ_ASSERT(IsShutdownOnMainThread());
 }
 
 void
 BluetoothSocket::BluetoothSocketIO::GetSocketAddr(nsAString& aAddrStr) const
 {
   if (!mConnector) {
     NS_WARNING("No connector to get socket address from!");
@@ -183,33 +186,33 @@ DataSocket*
 BluetoothSocket::BluetoothSocketIO::GetDataSocket()
 {
   return GetBluetoothSocket();
 }
 
 void
 BluetoothSocket::BluetoothSocketIO::SetDelayedConnectTask(CancelableTask* aTask)
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
 
   mDelayedConnectTask = aTask;
 }
 
 void
 BluetoothSocket::BluetoothSocketIO::ClearDelayedConnectTask()
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
 
   mDelayedConnectTask = nullptr;
 }
 
 void
 BluetoothSocket::BluetoothSocketIO::CancelDelayedConnectTask()
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
 
   if (!mDelayedConnectTask) {
     return;
   }
 
   mDelayedConnectTask->Cancel();
   ClearDelayedConnectTask();
 }
@@ -272,18 +275,19 @@ BluetoothSocket::BluetoothSocketIO::Send
 }
 
 void
 BluetoothSocket::BluetoothSocketIO::OnConnected()
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
 
-  NS_DispatchToMainThread(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
+  GetConsumerThread()->Dispatch(
+    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
+    NS_DISPATCH_NORMAL);
 
   AddWatchers(READ_WATCHER, true);
   if (HasPendingData()) {
     AddWatchers(WRITE_WATCHER, false);
   }
 }
 
 void
@@ -321,18 +325,19 @@ BluetoothSocket::BluetoothSocketIO::OnSo
   if (NS_WARN_IF(NS_FAILED(rv))) {
     FireSocketError();
     return;
   }
 
   Close();
   SetSocket(fd, SOCKET_IS_CONNECTED);
 
-  NS_DispatchToMainThread(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
+  GetConsumerThread()->Dispatch(
+    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
+    NS_DISPATCH_NORMAL);
 
   AddWatchers(READ_WATCHER, true);
   if (HasPendingData()) {
     AddWatchers(WRITE_WATCHER, false);
   }
 }
 
 void
@@ -371,18 +376,19 @@ void
 BluetoothSocket::BluetoothSocketIO::FireSocketError()
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
 
   // Clean up watchers, statuses, fds
   Close();
 
   // Tell the main thread we've errored
-  NS_DispatchToMainThread(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR));
+  GetConsumerThread()->Dispatch(
+    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
+    NS_DISPATCH_NORMAL);
 
 }
 
 // |DataSocketIO|
 
 nsresult
 BluetoothSocket::BluetoothSocketIO::QueryReceiveBuffer(
   UnixSocketIOBuffer** aBuffer)
@@ -407,19 +413,19 @@ class BluetoothSocket::BluetoothSocketIO
 public:
   ReceiveRunnable(BluetoothSocketIO* aIO, UnixSocketBuffer* aBuffer)
     : SocketIORunnable<BluetoothSocketIO>(aIO)
     , mBuffer(aBuffer)
   { }
 
   NS_IMETHOD Run() override
   {
-    MOZ_ASSERT(NS_IsMainThread());
+    BluetoothSocketIO* io = SocketIORunnable<BluetoothSocketIO>::GetIO();
 
-    BluetoothSocketIO* io = SocketIORunnable<BluetoothSocketIO>::GetIO();
+    MOZ_ASSERT(io->IsConsumerThread());
 
     if (NS_WARN_IF(io->IsShutdownOnMainThread())) {
       // Since we've already explicitly closed and the close
       // happened before this, this isn't really an error.
       return NS_OK;
     }
 
     BluetoothSocket* bluetoothSocket = io->GetBluetoothSocket();
@@ -432,17 +438,18 @@ public:
 
 private:
   nsAutoPtr<UnixSocketBuffer> mBuffer;
 };
 
 void
 BluetoothSocket::BluetoothSocketIO::ConsumeBuffer()
 {
-  NS_DispatchToMainThread(new ReceiveRunnable(this, mBuffer.forget()));
+  GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
+                                NS_DISPATCH_NORMAL);
 }
 
 void
 BluetoothSocket::BluetoothSocketIO::DiscardBuffer()
 {
   // Nothing to do.
 }
 
@@ -452,40 +459,40 @@ SocketBase*
 BluetoothSocket::BluetoothSocketIO::GetSocketBase()
 {
   return GetDataSocket();
 }
 
 bool
 BluetoothSocket::BluetoothSocketIO::IsShutdownOnMainThread() const
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
 
   return mConsumer == nullptr;
 }
 
 void
 BluetoothSocket::BluetoothSocketIO::ShutdownOnMainThread()
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
   MOZ_ASSERT(!IsShutdownOnMainThread());
 
   mConsumer = nullptr;
 }
 
 bool
 BluetoothSocket::BluetoothSocketIO::IsShutdownOnIOThread() const
 {
   return mShuttingDownOnIOThread;
 }
 
 void
 BluetoothSocket::BluetoothSocketIO::ShutdownOnIOThread()
 {
-  MOZ_ASSERT(!NS_IsMainThread());
+  MOZ_ASSERT(!IsConsumerThread());
   MOZ_ASSERT(!mShuttingDownOnIOThread);
 
   Close(); // will also remove fd from I/O loop
   mShuttingDownOnIOThread = true;
 }
 
 
 //
@@ -497,17 +504,17 @@ class BluetoothSocket::ListenTask final
 {
 public:
   ListenTask(BluetoothSocketIO* aIO)
     : SocketIOTask<BluetoothSocketIO>(aIO)
   { }
 
   void Run() override
   {
-    MOZ_ASSERT(!NS_IsMainThread());
+    MOZ_ASSERT(!GetIO()->IsConsumerThread());
 
     if (!IsCanceled()) {
       GetIO()->Listen();
     }
   }
 };
 
 class BluetoothSocket::ConnectTask final
@@ -515,34 +522,34 @@ class BluetoothSocket::ConnectTask final
 {
 public:
   ConnectTask(BluetoothSocketIO* aIO)
     : SocketIOTask<BluetoothSocketIO>(aIO)
   { }
 
   void Run() override
   {
-    MOZ_ASSERT(!NS_IsMainThread());
+    MOZ_ASSERT(!GetIO()->IsConsumerThread());
     MOZ_ASSERT(!IsCanceled());
 
     GetIO()->Connect();
   }
 };
 
 class BluetoothSocket::DelayedConnectTask final
   : public SocketIOTask<BluetoothSocketIO>
 {
 public:
   DelayedConnectTask(BluetoothSocketIO* aIO)
     : SocketIOTask<BluetoothSocketIO>(aIO)
   { }
 
   void Run() override
   {
-    MOZ_ASSERT(NS_IsMainThread());
+    MOZ_ASSERT(GetIO()->IsConsumerThread());
 
     if (IsCanceled()) {
       return;
     }
 
     BluetoothSocketIO* io = GetIO();
     if (io->IsShutdownOnMainThread()) {
       return;
@@ -571,17 +578,16 @@ BluetoothSocket::~BluetoothSocket()
 
 nsresult
 BluetoothSocket::Connect(const nsAString& aDeviceAddress,
                          const BluetoothUuid& aServiceUuid,
                          BluetoothSocketType aType,
                          int aChannel,
                          bool aAuth, bool aEncrypt)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(!aDeviceAddress.IsEmpty());
 
   nsAutoPtr<BluetoothUnixSocketConnector> connector(
     new BluetoothUnixSocketConnector(NS_ConvertUTF16toUTF8(aDeviceAddress),
                                      aType, aChannel, aAuth, aEncrypt));
 
   nsresult rv = Connect(connector);
   if (NS_FAILED(rv)) {
@@ -598,18 +604,16 @@ BluetoothSocket::Connect(const nsAString
 
 nsresult
 BluetoothSocket::Listen(const nsAString& aServiceName,
                         const BluetoothUuid& aServiceUuid,
                         BluetoothSocketType aType,
                         int aChannel,
                         bool aAuth, bool aEncrypt)
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   nsAutoPtr<BluetoothUnixSocketConnector> connector(
     new BluetoothUnixSocketConnector(NS_LITERAL_CSTRING(BLUETOOTH_ADDRESS_NONE),
                                      aType, aChannel, aAuth, aEncrypt));
 
   nsresult rv = Listen(connector);
   if (NS_FAILED(rv)) {
     nsAutoString addr;
     GetAddress(addr);
@@ -620,17 +624,16 @@ BluetoothSocket::Listen(const nsAString&
   connector.forget();
 
   return NS_OK;
 }
 
 void
 BluetoothSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mObserver);
 
   mObserver->ReceiveSocketData(this, aBuffer);
 }
 
 bool
 BluetoothSocket::SendSocketData(const nsACString& aStr)
 {
@@ -640,24 +643,25 @@ BluetoothSocket::SendSocketData(const ns
 
   SendSocketData(new UnixSocketRawData(aStr.BeginReading(), aStr.Length()));
 
   return true;
 }
 
 nsresult
 BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector,
-                         int aDelayMs, MessageLoop* aIOLoop)
+                         int aDelayMs,
+                         nsIThread* aConsumerThread, MessageLoop* aIOLoop)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(aConnector);
+  MOZ_ASSERT(aConsumerThread);
   MOZ_ASSERT(aIOLoop);
   MOZ_ASSERT(!mIO);
 
-  mIO = new BluetoothSocketIO(aIOLoop, this, aConnector);
+  mIO = new BluetoothSocketIO(aConsumerThread, aIOLoop, this, aConnector);
   SetConnectionStatus(SOCKET_CONNECTING);
 
   if (aDelayMs > 0) {
     DelayedConnectTask* connectTask = new DelayedConnectTask(mIO);
     mIO->SetDelayedConnectTask(connectTask);
     MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs);
   } else {
     aIOLoop->PostTask(FROM_HERE, new ConnectTask(mIO));
@@ -665,40 +669,52 @@ BluetoothSocket::Connect(BluetoothUnixSo
 
   return NS_OK;
 }
 
 nsresult
 BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector,
                          int aDelayMs)
 {
-  return Connect(aConnector, aDelayMs, XRE_GetIOMessageLoop());
+  nsIThread* consumerThread = nullptr;
+  nsresult rv = NS_GetCurrentThread(&consumerThread);
+  if (NS_FAILED(rv)) {
+    return rv;
+  }
+
+  return Connect(aConnector, aDelayMs, consumerThread, XRE_GetIOMessageLoop());
 }
 
 nsresult
 BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector,
-                        MessageLoop* aIOLoop)
+                        nsIThread* aConsumerThread, MessageLoop* aIOLoop)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(aConnector);
+  MOZ_ASSERT(aConsumerThread);
   MOZ_ASSERT(aIOLoop);
   MOZ_ASSERT(!mIO);
 
-  mIO = new BluetoothSocketIO(aIOLoop, this, aConnector);
+  mIO = new BluetoothSocketIO(aConsumerThread, aIOLoop, this, aConnector);
   SetConnectionStatus(SOCKET_LISTENING);
 
   aIOLoop->PostTask(FROM_HERE, new ListenTask(mIO));
 
   return NS_OK;
 }
 
 nsresult
 BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector)
 {
-  return Listen(aConnector, XRE_GetIOMessageLoop());
+  nsIThread* consumerThread = nullptr;
+  nsresult rv = NS_GetCurrentThread(&consumerThread);
+  if (NS_FAILED(rv)) {
+    return rv;
+  }
+
+  return Listen(aConnector, consumerThread, XRE_GetIOMessageLoop());
 }
 
 void
 BluetoothSocket::GetAddress(nsAString& aAddrStr)
 {
   aAddrStr.Truncate();
   if (!mIO || GetConnectionStatus() != SOCKET_CONNECTED) {
     NS_WARNING("No socket currently open!");
@@ -707,64 +723,62 @@ BluetoothSocket::GetAddress(nsAString& a
   mIO->GetSocketAddr(aAddrStr);
 }
 
 // |DataSocket|
 
 void
 BluetoothSocket::SendSocketData(UnixSocketIOBuffer* aBuffer)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mIO);
+  MOZ_ASSERT(mIO->IsConsumerThread());
   MOZ_ASSERT(!mIO->IsShutdownOnMainThread());
 
   mIO->GetIOLoop()->PostTask(
     FROM_HERE,
     new SocketIOSendTask<BluetoothSocketIO, UnixSocketIOBuffer>(mIO, aBuffer));
 }
 
 // |SocketBase|
 
 void
 BluetoothSocket::Close()
 {
-  MOZ_ASSERT(NS_IsMainThread());
   if (!mIO) {
     return;
   }
 
+  MOZ_ASSERT(mIO->IsConsumerThread());
+
   mIO->CancelDelayedConnectTask();
 
   // From this point on, we consider mIO as being deleted.
   // We sever the relationship here so any future calls to listen or connect
   // will create a new implementation.
   mIO->ShutdownOnMainThread();
   mIO->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO));
   mIO = nullptr;
 
   NotifyDisconnect();
 }
 
 void
 BluetoothSocket::OnConnectSuccess()
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mObserver);
   mObserver->OnSocketConnectSuccess(this);
 }
 
 void
 BluetoothSocket::OnConnectError()
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mObserver);
   mObserver->OnSocketConnectError(this);
 }
 
 void
 BluetoothSocket::OnDisconnect()
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mObserver);
   mObserver->OnSocketDisconnect(this);
 }
 
 END_BLUETOOTH_NAMESPACE
--- a/dom/bluetooth/bluez/BluetoothSocket.h
+++ b/dom/bluetooth/bluez/BluetoothSocket.h
@@ -61,21 +61,22 @@ public:
   bool SendSocketData(const nsACString& aMessage);
 
   /**
    * Starts a task on the socket that will try to connect to a socket in a
    * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
    * @param aDelayMs Time delay in milli-seconds.
+   * @param aConsumerThread The socket's consumer thread.
    * @param aIOLoop The socket's I/O thread.
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
-  nsresult Connect(BluetoothUnixSocketConnector* aConnector,
-                   int aDelayMs, MessageLoop* aIOLoop);
+  nsresult Connect(BluetoothUnixSocketConnector* aConnector, int aDelayMs,
+                   nsIThread* aConsumerThread, MessageLoop* aIOLoop);
 
   /**
    * Starts a task on the socket that will try to connect to a socket in a
    * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
    * @param aDelayMs Time delay in milli-seconds.
    * @return NS_OK on success, or an XPCOM error code otherwise.
@@ -83,21 +84,22 @@ public:
   nsresult Connect(BluetoothUnixSocketConnector* aConnector,
                    int aDelayMs = 0);
 
   /**
    * Starts a task on the socket that will try to accept a new connection in a
    * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
+   * @param aConsumerThread The socket's consumer thread.
    * @param aIOLoop The socket's I/O thread.
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
   nsresult Listen(BluetoothUnixSocketConnector* aConnector,
-                  MessageLoop* aIOLoop);
+                  nsIThread* aConsumerThread, MessageLoop* aIOLoop);
 
   /**
    * Starts a task on the socket that will try to accept a new connection in a
    * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
--- a/ipc/bluetooth/BluetoothDaemonConnection.cpp
+++ b/ipc/bluetooth/BluetoothDaemonConnection.cpp
@@ -201,18 +201,19 @@ BluetoothDaemonPDUConsumer::~BluetoothDa
 // BluetoothDaemonConnectionIO
 //
 
 class BluetoothDaemonConnectionIO final
   : public UnixSocketWatcher
   , public ConnectionOrientedSocketIO
 {
 public:
-  BluetoothDaemonConnectionIO(MessageLoop* aIOLoop, int aFd,
-                              ConnectionStatus aConnectionStatus,
+  BluetoothDaemonConnectionIO(nsIThread* aConsumerThread,
+                              MessageLoop* aIOLoop,
+                              int aFd, ConnectionStatus aConnectionStatus,
                               BluetoothDaemonConnection* aConnection,
                               BluetoothDaemonPDUConsumer* aConsumer);
 
   // Task callback methods
   //
 
   void Send(UnixSocketIOBuffer* aBuffer);
 
@@ -250,24 +251,27 @@ public:
 private:
   BluetoothDaemonConnection* mConnection;
   BluetoothDaemonPDUConsumer* mConsumer;
   nsAutoPtr<BluetoothDaemonPDU> mPDU;
   bool mShuttingDownOnIOThread;
 };
 
 BluetoothDaemonConnectionIO::BluetoothDaemonConnectionIO(
-  MessageLoop* aIOLoop, int aFd,
+  nsIThread* aConsumerThread,
+  MessageLoop* aIOLoop,
+  int aFd,
   ConnectionStatus aConnectionStatus,
   BluetoothDaemonConnection* aConnection,
   BluetoothDaemonPDUConsumer* aConsumer)
-: UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus)
-, mConnection(aConnection)
-, mConsumer(aConsumer)
-, mShuttingDownOnIOThread(false)
+  : UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus)
+  , ConnectionOrientedSocketIO(aConsumerThread)
+  , mConnection(aConnection)
+  , mConsumer(aConsumer)
+  , mShuttingDownOnIOThread(false)
 {
   MOZ_ASSERT(mConnection);
   MOZ_ASSERT(mConsumer);
 }
 
 void
 BluetoothDaemonConnectionIO::Send(UnixSocketIOBuffer* aBuffer)
 {
@@ -303,18 +307,19 @@ BluetoothDaemonConnectionIO::OnSocketCan
 }
 
 void
 BluetoothDaemonConnectionIO::OnConnected()
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
 
-  NS_DispatchToMainThread(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
+  GetConsumerThread()->Dispatch(
+    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
+    NS_DISPATCH_NORMAL);
 
   AddWatchers(READ_WATCHER, true);
   if (HasPendingData()) {
     AddWatchers(WRITE_WATCHER, false);
   }
 }
 
 void
@@ -323,18 +328,19 @@ BluetoothDaemonConnectionIO::OnError(con
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
 
   UnixFdWatcher::OnError(aFunction, aErrno);
 
   // Clean up watchers, status, fd
   Close();
 
   // Tell the main thread we've errored
-  NS_DispatchToMainThread(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR));
+  GetConsumerThread()->Dispatch(
+    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
+    NS_DISPATCH_NORMAL);
 }
 
 // |ConnectionOrientedSocketIO|
 
 nsresult
 BluetoothDaemonConnectionIO::Accept(int aFd,
                                     const struct sockaddr* aAddress,
                                     socklen_t aAddressLength)
@@ -394,40 +400,40 @@ SocketBase*
 BluetoothDaemonConnectionIO::GetSocketBase()
 {
   return mConnection;
 }
 
 bool
 BluetoothDaemonConnectionIO::IsShutdownOnMainThread() const
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
 
   return mConnection == nullptr;
 }
 
 bool
 BluetoothDaemonConnectionIO::IsShutdownOnIOThread() const
 {
   return mShuttingDownOnIOThread;
 }
 
 void
 BluetoothDaemonConnectionIO::ShutdownOnMainThread()
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
   MOZ_ASSERT(!IsShutdownOnMainThread());
 
   mConnection = nullptr;
 }
 
 void
 BluetoothDaemonConnectionIO::ShutdownOnIOThread()
 {
-  MOZ_ASSERT(!NS_IsMainThread());
+  MOZ_ASSERT(!IsConsumerThread());
   MOZ_ASSERT(!mShuttingDownOnIOThread);
 
   Close(); // will also remove fd from I/O loop
   mShuttingDownOnIOThread = true;
 }
 
 //
 // BluetoothDaemonConnection
@@ -447,87 +453,82 @@ BluetoothDaemonConnection::BluetoothDaem
 
 BluetoothDaemonConnection::~BluetoothDaemonConnection()
 { }
 
 // |ConnectionOrientedSocket|
 
 nsresult
 BluetoothDaemonConnection::PrepareAccept(UnixSocketConnector* aConnector,
+                                         nsIThread* aConsumerThread,
                                          MessageLoop* aIOLoop,
                                          ConnectionOrientedSocketIO*& aIO)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(!mIO);
 
   // |BluetoothDaemonConnection| now owns the connector, but doesn't
   // actually use it. So the connector is stored in an auto pointer
   // to be deleted at the end of the method.
   nsAutoPtr<UnixSocketConnector> connector(aConnector);
 
   SetConnectionStatus(SOCKET_CONNECTING);
 
   mIO = new BluetoothDaemonConnectionIO(
-    aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
+    aConsumerThread, aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
     this, mPDUConsumer);
   aIO = mIO;
 
   return NS_OK;
 }
 
 // |DataSocket|
 
 void
 BluetoothDaemonConnection::SendSocketData(UnixSocketIOBuffer* aBuffer)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mIO);
+  MOZ_ASSERT(mIO->IsConsumerThread());
 
   mIO->GetIOLoop()->PostTask(
     FROM_HERE,
     new SocketIOSendTask<BluetoothDaemonConnectionIO,
                          UnixSocketIOBuffer>(mIO, aBuffer));
 }
 
 // |SocketBase|
 
 void
 BluetoothDaemonConnection::Close()
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   if (!mIO) {
     CHROMIUM_LOG("Bluetooth daemon already disconnected!");
     return;
   }
 
+  MOZ_ASSERT(mIO->IsConsumerThread());
+
+  mIO->ShutdownOnMainThread();
   mIO->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO));
   mIO = nullptr;
 
   NotifyDisconnect();
 }
 
 void
 BluetoothDaemonConnection::OnConnectSuccess()
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   mConsumer->OnConnectSuccess(mIndex);
 }
 
 void
 BluetoothDaemonConnection::OnConnectError()
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   mConsumer->OnConnectError(mIndex);
 }
 
 void
 BluetoothDaemonConnection::OnDisconnect()
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   mConsumer->OnDisconnect(mIndex);
 }
 
 }
 }
--- a/ipc/bluetooth/BluetoothDaemonConnection.h
+++ b/ipc/bluetooth/BluetoothDaemonConnection.h
@@ -120,16 +120,17 @@ public:
                             BluetoothDaemonConnectionConsumer* aConsumer,
                             int aIndex);
   virtual ~BluetoothDaemonConnection();
 
   // Methods for |ConnectionOrientedSocket|
   //
 
   nsresult PrepareAccept(UnixSocketConnector* aConnector,
+                         nsIThread* aConsumerThread,
                          MessageLoop* aIOLoop,
                          ConnectionOrientedSocketIO*& aIO) override;
 
   // Methods for |DataSocket|
   //
 
   void SendSocketData(UnixSocketIOBuffer* aBuffer) override;
 
--- a/ipc/unixsocket/ConnectionOrientedSocket.cpp
+++ b/ipc/unixsocket/ConnectionOrientedSocket.cpp
@@ -4,16 +4,29 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "ConnectionOrientedSocket.h"
 
 namespace mozilla {
 namespace ipc {
 
+//
+// ConnectionOrientedSocketIO
+//
+
+ConnectionOrientedSocketIO::ConnectionOrientedSocketIO(
+  nsIThread* aConsumerThread)
+  : DataSocketIO(aConsumerThread)
+{ }
+
 ConnectionOrientedSocketIO::~ConnectionOrientedSocketIO()
 { }
 
+//
+// ConnectionOrientedSocket
+//
+
 ConnectionOrientedSocket::~ConnectionOrientedSocket()
 { }
 
 }
 }
--- a/ipc/unixsocket/ConnectionOrientedSocket.h
+++ b/ipc/unixsocket/ConnectionOrientedSocket.h
@@ -21,37 +21,40 @@ class UnixSocketConnector;
  * |ConnectionOrientedSocketIO| and |ConnectionOrientedSocket| define
  * interfaces for implementing stream sockets on I/O and main thread.
  * |ListenSocket| uses these classes to handle accepted sockets.
  */
 
 class ConnectionOrientedSocketIO : public DataSocketIO
 {
 public:
+  ConnectionOrientedSocketIO(nsIThread* aConsumerThread);
   virtual ~ConnectionOrientedSocketIO();
 
   virtual nsresult Accept(int aFd,
                           const struct sockaddr* aAddress,
                           socklen_t aAddressLength) = 0;
 };
 
 class ConnectionOrientedSocket : public DataSocket
 {
 public:
   /**
    * Prepares an instance of |ConnectionOrientedSocket| in DISCONNECTED
    * state for accepting a connection. Main-thread only.
    *
    * @param aConnector The new connector object, owned by the
    *                   connection-oriented socket.
+   * @param aConsumerThread The socket's consumer thread.
    * @param aIOLoop The socket's I/O thread.
    * @param[out] aIO, Returns an instance of |ConnectionOrientedSocketIO|.
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
   virtual nsresult PrepareAccept(UnixSocketConnector* aConnector,
+                                 nsIThread* aConsumerThread,
                                  MessageLoop* aIOLoop,
                                  ConnectionOrientedSocketIO*& aIO) = 0;
 
 protected:
   virtual ~ConnectionOrientedSocket();
 };
 
 }
--- a/ipc/unixsocket/DataSocket.cpp
+++ b/ipc/unixsocket/DataSocket.cpp
@@ -45,30 +45,33 @@ ssize_t
 DataSocketIO::ReceiveData(int aFd)
 {
   MOZ_ASSERT(aFd >= 0);
 
   UnixSocketIOBuffer* incoming;
   nsresult rv = QueryReceiveBuffer(&incoming);
   if (NS_FAILED(rv)) {
     /* an error occured */
-    NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this));
+    GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
+                                  NS_DISPATCH_NORMAL);
     return -1;
   }
 
   ssize_t res = incoming->Receive(aFd);
   if (res < 0) {
     /* an I/O error occured */
     DiscardBuffer();
-    NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this));
+    GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
+                                  NS_DISPATCH_NORMAL);
     return -1;
   } else if (!res) {
     /* EOF or peer shut down sending */
     DiscardBuffer();
-    NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this));
+    GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
+                                  NS_DISPATCH_NORMAL);
     return 0;
   }
 
 #ifdef MOZ_TASK_TRACER
   /* Make unix socket creation events to be the source events of TaskTracer,
    * and originate the rest correlation tasks from here.
    */
   AutoSourceEvent taskTracerEvent(SourceEventType::Unixsocket);
@@ -85,32 +88,34 @@ DataSocketIO::SendPendingData(int aFd)
   MOZ_ASSERT(aFd >= 0);
 
   while (HasPendingData()) {
     UnixSocketIOBuffer* outgoing = mOutgoingQ.ElementAt(0);
 
     ssize_t res = outgoing->Send(aFd);
     if (res < 0) {
       /* an I/O error occured */
-      NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this));
+      GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
+                                    NS_DISPATCH_NORMAL);
       return NS_ERROR_FAILURE;
     } else if (!res && outgoing->GetSize()) {
       /* I/O is currently blocked; try again later */
       return NS_OK;
     }
     if (!outgoing->GetSize()) {
       mOutgoingQ.RemoveElementAt(0);
       delete outgoing;
     }
   }
 
   return NS_OK;
 }
 
-DataSocketIO::DataSocketIO()
+DataSocketIO::DataSocketIO(nsIThread* aConsumerThread)
+  : SocketIOBase(aConsumerThread)
 { }
 
 //
 // DataSocket
 //
 
 DataSocket::~DataSocket()
 { }
--- a/ipc/unixsocket/DataSocket.h
+++ b/ipc/unixsocket/DataSocket.h
@@ -85,17 +85,17 @@ public:
   void EnqueueData(UnixSocketIOBuffer* aBuffer);
   bool HasPendingData() const;
 
   ssize_t ReceiveData(int aFd);
 
   nsresult SendPendingData(int aFd);
 
 protected:
-  DataSocketIO();
+  DataSocketIO(nsIThread* aConsumerThread);
 
 private:
   /**
    * Raw data queue. Must be pushed/popped from I/O thread only.
    */
   nsTArray<UnixSocketIOBuffer*> mOutgoingQ;
 };
 
@@ -115,20 +115,20 @@ public:
   : SocketIOTask<Tio>(aIO)
   , mData(aData)
   {
     MOZ_ASSERT(aData);
   }
 
   void Run() override
   {
-    MOZ_ASSERT(!NS_IsMainThread());
     MOZ_ASSERT(!SocketIOTask<Tio>::IsCanceled());
 
     Tio* io = SocketIOTask<Tio>::GetIO();
+    MOZ_ASSERT(!io->IsConsumerThread());
     MOZ_ASSERT(!io->IsShutdownOnIOThread());
 
     io->Send(mData);
   }
 
 private:
   Tdata* mData;
 };
--- a/ipc/unixsocket/ListenSocket.cpp
+++ b/ipc/unixsocket/ListenSocket.cpp
@@ -22,17 +22,18 @@ namespace ipc {
 
 class ListenSocketIO final
   : public UnixSocketWatcher
   , public SocketIOBase
 {
 public:
   class ListenTask;
 
-  ListenSocketIO(MessageLoop* mIOLoop,
+  ListenSocketIO(nsIThread* aConsumerThread,
+                 MessageLoop* aIOLoop,
                  ListenSocket* aListenSocket,
                  UnixSocketConnector* aConnector);
   ~ListenSocketIO();
 
   UnixSocketConnector* GetConnector() const;
 
   // Task callback methods
   //
@@ -89,34 +90,35 @@ private:
   /**
    * Address structure of the socket currently in use
    */
   struct sockaddr_storage mAddress;
 
   ConnectionOrientedSocketIO* mCOSocketIO;
 };
 
-ListenSocketIO::ListenSocketIO(MessageLoop* mIOLoop,
+ListenSocketIO::ListenSocketIO(nsIThread* aConsumerThread,
+                               MessageLoop* aIOLoop,
                                ListenSocket* aListenSocket,
                                UnixSocketConnector* aConnector)
-  : UnixSocketWatcher(mIOLoop)
-  , SocketIOBase()
+  : UnixSocketWatcher(aIOLoop)
+  , SocketIOBase(aConsumerThread)
   , mListenSocket(aListenSocket)
   , mConnector(aConnector)
   , mShuttingDownOnIOThread(false)
   , mAddressLength(0)
   , mCOSocketIO(nullptr)
 {
   MOZ_ASSERT(mListenSocket);
   MOZ_ASSERT(mConnector);
 }
 
 ListenSocketIO::~ListenSocketIO()
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
   MOZ_ASSERT(IsShutdownOnMainThread());
 }
 
 UnixSocketConnector*
 ListenSocketIO::GetConnector() const
 {
   return mConnector;
 }
@@ -161,18 +163,19 @@ void
 ListenSocketIO::OnListening()
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
 
   AddWatchers(READ_WATCHER, true);
 
   /* We signal a successful 'connection' to a local address for listening. */
-  NS_DispatchToMainThread(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
+  GetConsumerThread()->Dispatch(
+    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
+    NS_DISPATCH_NORMAL);
 }
 
 void
 ListenSocketIO::OnError(const char* aFunction, int aErrno)
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
 
   UnixFdWatcher::OnError(aFunction, aErrno);
@@ -183,18 +186,19 @@ void
 ListenSocketIO::FireSocketError()
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
 
   // Clean up watchers, statuses, fds
   Close();
 
   // Tell the main thread we've errored
-  NS_DispatchToMainThread(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR));
+  GetConsumerThread()->Dispatch(
+    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
+    NS_DISPATCH_NORMAL);
 }
 
 void
 ListenSocketIO::OnSocketCanAcceptWithoutBlocking()
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
   MOZ_ASSERT(mCOSocketIO);
@@ -225,40 +229,40 @@ SocketBase*
 ListenSocketIO::GetSocketBase()
 {
   return mListenSocket.get();
 }
 
 bool
 ListenSocketIO::IsShutdownOnMainThread() const
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
 
   return mListenSocket == nullptr;
 }
 
 bool
 ListenSocketIO::IsShutdownOnIOThread() const
 {
   return mShuttingDownOnIOThread;
 }
 
 void
 ListenSocketIO::ShutdownOnMainThread()
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
   MOZ_ASSERT(!IsShutdownOnMainThread());
 
   mListenSocket = nullptr;
 }
 
 void
 ListenSocketIO::ShutdownOnIOThread()
 {
-  MOZ_ASSERT(!NS_IsMainThread());
+  MOZ_ASSERT(!IsConsumerThread());
   MOZ_ASSERT(!mShuttingDownOnIOThread);
 
   Close(); // will also remove fd from I/O loop
   mShuttingDownOnIOThread = true;
 }
 
 //
 // Socket tasks
@@ -272,17 +276,17 @@ public:
   : SocketIOTask<ListenSocketIO>(aIO)
   , mCOSocketIO(aCOSocketIO)
   {
     MOZ_ASSERT(mCOSocketIO);
   }
 
   void Run() override
   {
-    MOZ_ASSERT(!NS_IsMainThread());
+    MOZ_ASSERT(!GetIO()->IsConsumerThread());
 
     if (!IsCanceled()) {
       GetIO()->Listen(mCOSocketIO);
     }
   }
 
 private:
   ConnectionOrientedSocketIO* mCOSocketIO;
@@ -302,60 +306,66 @@ ListenSocket::ListenSocket(ListenSocketC
 
 ListenSocket::~ListenSocket()
 {
   MOZ_ASSERT(!mIO);
 }
 
 nsresult
 ListenSocket::Listen(UnixSocketConnector* aConnector,
+                     nsIThread* aConsumerThread,
                      MessageLoop* aIOLoop,
                      ConnectionOrientedSocket* aCOSocket)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(!mIO);
 
-  mIO = new ListenSocketIO(aIOLoop, this, aConnector);
+  mIO = new ListenSocketIO(aConsumerThread, aIOLoop, this, aConnector);
 
   // Prepared I/O object, now start listening.
   nsresult rv = Listen(aCOSocket);
   if (NS_FAILED(rv)) {
     delete mIO;
     mIO = nullptr;
     return rv;
   }
 
   return NS_OK;
 }
 
 nsresult
 ListenSocket::Listen(UnixSocketConnector* aConnector,
                      ConnectionOrientedSocket* aCOSocket)
 {
-  return Listen(aConnector, XRE_GetIOMessageLoop(), aCOSocket);
+  nsIThread* consumerThread = nullptr;
+  nsresult rv = NS_GetCurrentThread(&consumerThread);
+  if (NS_FAILED(rv)) {
+    return rv;
+  }
+
+  return Listen(aConnector, consumerThread, XRE_GetIOMessageLoop(), aCOSocket);
 }
 
 nsresult
 ListenSocket::Listen(ConnectionOrientedSocket* aCOSocket)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(aCOSocket);
   MOZ_ASSERT(mIO);
 
   // We first prepare the connection-oriented socket with a
   // socket connector and a socket I/O class.
 
   nsAutoPtr<UnixSocketConnector> connector;
   nsresult rv = mIO->GetConnector()->Duplicate(*connector.StartAssignment());
   if (NS_FAILED(rv)) {
     return rv;
   }
 
   nsAutoPtr<ConnectionOrientedSocketIO> io;
-  rv = aCOSocket->PrepareAccept(connector, mIO->GetIOLoop(),
+  rv = aCOSocket->PrepareAccept(connector,
+                                mIO->GetConsumerThread(), mIO->GetIOLoop(),
                                 *io.StartAssignment());
   if (NS_FAILED(rv)) {
     return rv;
   }
   connector.forget(); // now owned by |io|
 
   // Then we start listening for connection requests.
 
@@ -367,50 +377,44 @@ ListenSocket::Listen(ConnectionOrientedS
   return NS_OK;
 }
 
 // |SocketBase|
 
 void
 ListenSocket::Close()
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   if (!mIO) {
     return;
   }
 
+  MOZ_ASSERT(mIO->IsConsumerThread());
+
   // From this point on, we consider mIO as being deleted. We sever
   // the relationship here so any future calls to listen or connect
   // will create a new implementation.
   mIO->ShutdownOnMainThread();
   mIO->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO));
   mIO = nullptr;
 
   NotifyDisconnect();
 }
 
 void
 ListenSocket::OnConnectSuccess()
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   mConsumer->OnConnectSuccess(mIndex);
 }
 
 void
 ListenSocket::OnConnectError()
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   mConsumer->OnConnectError(mIndex);
 }
 
 void
 ListenSocket::OnDisconnect()
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   mConsumer->OnDisconnect(mIndex);
 }
 
 } // namespace ipc
 } // namespace mozilla
--- a/ipc/unixsocket/ListenSocket.h
+++ b/ipc/unixsocket/ListenSocket.h
@@ -6,16 +6,17 @@
 
 #ifndef mozilla_ipc_listensocket_h
 #define mozilla_ipc_listensocket_h
 
 #include "nsString.h"
 #include "mozilla/ipc/SocketBase.h"
 
 class MessageLoop;
+class nsIThread;
 
 namespace mozilla {
 namespace ipc {
 
 class ConnectionOrientedSocket;
 class ListenSocketConsumer;
 class ListenSocketIO;
 class UnixSocketConnector;
@@ -31,22 +32,24 @@ public:
    */
   ListenSocket(ListenSocketConsumer* aConsumer, int aIndex);
 
   /**
    * Starts a task on the socket that will try to accept a new connection
    * in a non-blocking manner.
    *
    * @param aConnector Connector object for socket-type-specific functions
+   * @param aConsumerThread The socket's consumer thread.
    * @param aIOLoop The socket's I/O thread.
    * @param aCOSocket The connection-oriented socket for handling the
    *                  accepted connection.
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
   nsresult Listen(UnixSocketConnector* aConnector,
+                  nsIThread* aConsumerThread,
                   MessageLoop* aIOLoop,
                   ConnectionOrientedSocket* aCOSocket);
 
   /**
    * Starts a task on the socket that will try to accept a new connection
    * in a non-blocking manner.
    *
    * @param aConnector Connector object for socket-type-specific functions
--- a/ipc/unixsocket/SocketBase.cpp
+++ b/ipc/unixsocket/SocketBase.cpp
@@ -181,66 +181,54 @@ UnixSocketRawData::Send(int aFd)
 
 //
 // SocketBase
 //
 
 SocketConnectionStatus
 SocketBase::GetConnectionStatus() const
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   return mConnectionStatus;
 }
 
 int
 SocketBase::GetSuggestedConnectDelayMs() const
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   return mConnectDelayMs;
 }
 
 void
 SocketBase::NotifySuccess()
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   mConnectionStatus = SOCKET_CONNECTED;
   mConnectTimestamp = PR_IntervalNow();
   OnConnectSuccess();
 }
 
 void
 SocketBase::NotifyError()
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   mConnectionStatus = SOCKET_DISCONNECTED;
   mConnectDelayMs = CalculateConnectDelayMs();
   mConnectTimestamp = 0;
   OnConnectError();
 }
 
 void
 SocketBase::NotifyDisconnect()
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   mConnectionStatus = SOCKET_DISCONNECTED;
   mConnectDelayMs = CalculateConnectDelayMs();
   mConnectTimestamp = 0;
   OnDisconnect();
 }
 
 uint32_t
 SocketBase::CalculateConnectDelayMs() const
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   uint32_t connectDelayMs = mConnectDelayMs;
 
   if (mConnectTimestamp && (PR_IntervalNow()-mConnectTimestamp) > connectDelayMs) {
     // reset delay if connection has been opened for a while, or...
     connectDelayMs = 0;
   } else if (!connectDelayMs) {
     // ...start with a delay of ~1 sec, or...
     connectDelayMs = 1<<10;
@@ -267,38 +255,57 @@ SocketBase::SetConnectionStatus(SocketCo
 {
   mConnectionStatus = aConnectionStatus;
 }
 
 //
 // SocketIOBase
 //
 
-SocketIOBase::SocketIOBase()
-{ }
+SocketIOBase::SocketIOBase(nsIThread* aConsumerThread)
+  : mConsumerThread(aConsumerThread)
+{
+  MOZ_ASSERT(mConsumerThread);
+}
 
 SocketIOBase::~SocketIOBase()
 { }
 
+nsIThread*
+SocketIOBase::GetConsumerThread() const
+{
+  return mConsumerThread;
+}
+
+bool
+SocketIOBase::IsConsumerThread() const
+{
+  nsIThread* thread = nullptr;
+  if (NS_FAILED(NS_GetCurrentThread(&thread))) {
+    return false;
+  }
+  return thread == GetConsumerThread();
+}
+
 //
 // SocketIOEventRunnable
 //
 
 SocketIOEventRunnable::SocketIOEventRunnable(SocketIOBase* aIO,
                                              SocketEvent aEvent)
   : SocketIORunnable<SocketIOBase>(aIO)
   , mEvent(aEvent)
 { }
 
 NS_METHOD
 SocketIOEventRunnable::Run()
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO();
 
-  SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO();
+  MOZ_ASSERT(io->IsConsumerThread());
 
   if (NS_WARN_IF(io->IsShutdownOnMainThread())) {
     // Since we've already explicitly closed and the close
     // happened before this, this isn't really an error.
     return NS_OK;
   }
 
   SocketBase* socketBase = io->GetSocketBase();
@@ -322,19 +329,19 @@ SocketIOEventRunnable::Run()
 SocketIORequestClosingRunnable::SocketIORequestClosingRunnable(
   SocketIOBase* aIO)
   : SocketIORunnable<SocketIOBase>(aIO)
 { }
 
 NS_METHOD
 SocketIORequestClosingRunnable::Run()
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO();
 
-  SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO();
+  MOZ_ASSERT(io->IsConsumerThread());
 
   if (NS_WARN_IF(io->IsShutdownOnMainThread())) {
     // Since we've already explicitly closed and the close
     // happened before this, this isn't really an error.
     return NS_OK;
   }
 
   SocketBase* socketBase = io->GetSocketBase();
@@ -368,25 +375,25 @@ SocketIODeleteInstanceRunnable::Run()
 
 SocketIOShutdownTask::SocketIOShutdownTask(SocketIOBase* aIO)
   : SocketIOTask<SocketIOBase>(aIO)
 { }
 
 void
 SocketIOShutdownTask::Run()
 {
-  MOZ_ASSERT(!NS_IsMainThread());
+  SocketIOBase* io = SocketIOTask<SocketIOBase>::GetIO();
 
-  SocketIOBase* io = SocketIOTask<SocketIOBase>::GetIO();
+  MOZ_ASSERT(!io->IsConsumerThread());
+  MOZ_ASSERT(!io->IsShutdownOnIOThread());
 
   // At this point, there should be no new events on the I/O thread
   // after this one with the possible exception of an accept task,
   // which ShutdownOnIOThread will cancel for us. We are now fully
   // shut down, so we can send a message to the main thread to delete
   // |io| safely knowing that it's not reference any longer.
-  MOZ_ASSERT(!io->IsShutdownOnIOThread());
   io->ShutdownOnIOThread();
-
-  NS_DispatchToMainThread(new SocketIODeleteInstanceRunnable(io));
+  io->GetConsumerThread()->Dispatch(new SocketIODeleteInstanceRunnable(io),
+                                    NS_DISPATCH_NORMAL);
 }
 
 }
 }
--- a/ipc/unixsocket/SocketBase.h
+++ b/ipc/unixsocket/SocketBase.h
@@ -348,18 +348,34 @@ public:
   virtual void ShutdownOnIOThread() = 0;
 
   /**
    * Signals to the socket I/O classes that the socket class has been
    * shut down.
    */
   virtual void ShutdownOnMainThread() = 0;
 
+  /**
+   * Returns the consumer thread.
+   *
+   * @return A pointer to the consumer thread.
+   */
+  nsIThread* GetConsumerThread() const;
+
+  /**
+   * @return True if the current thread is thre consumer thread, or false
+   *         otherwise.
+   */
+  bool IsConsumerThread() const;
+
 protected:
-  SocketIOBase();
+  SocketIOBase(nsIThread* nsConsumerThread);
+
+private:
+  nsCOMPtr<nsIThread> mConsumerThread;
 };
 
 //
 // Socket I/O runnables
 //
 
 /* |SocketIORunnable| is a runnable for sending a message from
  * the I/O thread to the main thread.
--- a/ipc/unixsocket/StreamSocket.cpp
+++ b/ipc/unixsocket/StreamSocket.cpp
@@ -24,20 +24,22 @@ class StreamSocketIO final
   : public UnixSocketWatcher
   , public ConnectionOrientedSocketIO
 {
 public:
   class ConnectTask;
   class DelayedConnectTask;
   class ReceiveRunnable;
 
-  StreamSocketIO(MessageLoop* mIOLoop,
+  StreamSocketIO(nsIThread* aConsumerThread,
+                 MessageLoop* mIOLoop,
                  StreamSocket* aStreamSocket,
                  UnixSocketConnector* aConnector);
-  StreamSocketIO(MessageLoop* mIOLoop, int aFd,
+  StreamSocketIO(nsIThread* aConsumerThread,
+                 MessageLoop* mIOLoop, int aFd,
                  ConnectionStatus aConnectionStatus,
                  StreamSocket* aStreamSocket,
                  UnixSocketConnector* aConnector);
   ~StreamSocketIO();
 
   StreamSocket* GetStreamSocket();
   DataSocket* GetDataSocket();
 
@@ -128,48 +130,52 @@ private:
   CancelableTask* mDelayedConnectTask;
 
   /**
    * I/O buffer for received data
    */
   nsAutoPtr<UnixSocketRawData> mBuffer;
 };
 
-StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop,
+StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
+                               MessageLoop* aIOLoop,
                                StreamSocket* aStreamSocket,
                                UnixSocketConnector* aConnector)
-  : UnixSocketWatcher(mIOLoop)
+  : UnixSocketWatcher(aIOLoop)
+  , ConnectionOrientedSocketIO(aConsumerThread)
   , mStreamSocket(aStreamSocket)
   , mConnector(aConnector)
   , mShuttingDownOnIOThread(false)
   , mAddressLength(0)
   , mDelayedConnectTask(nullptr)
 {
   MOZ_ASSERT(mStreamSocket);
   MOZ_ASSERT(mConnector);
 }
 
-StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, int aFd,
-                               ConnectionStatus aConnectionStatus,
+StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
+                               MessageLoop* aIOLoop,
+                               int aFd, ConnectionStatus aConnectionStatus,
                                StreamSocket* aStreamSocket,
                                UnixSocketConnector* aConnector)
-  : UnixSocketWatcher(mIOLoop, aFd, aConnectionStatus)
+  : UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus)
+  , ConnectionOrientedSocketIO(aConsumerThread)
   , mStreamSocket(aStreamSocket)
   , mConnector(aConnector)
   , mShuttingDownOnIOThread(false)
   , mAddressLength(0)
   , mDelayedConnectTask(nullptr)
 {
   MOZ_ASSERT(mStreamSocket);
   MOZ_ASSERT(mConnector);
 }
 
 StreamSocketIO::~StreamSocketIO()
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
   MOZ_ASSERT(IsShutdownOnMainThread());
 }
 
 StreamSocket*
 StreamSocketIO::GetStreamSocket()
 {
   return mStreamSocket.get();
 }
@@ -178,33 +184,33 @@ DataSocket*
 StreamSocketIO::GetDataSocket()
 {
   return mStreamSocket.get();
 }
 
 void
 StreamSocketIO::SetDelayedConnectTask(CancelableTask* aTask)
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
 
   mDelayedConnectTask = aTask;
 }
 
 void
 StreamSocketIO::ClearDelayedConnectTask()
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
 
   mDelayedConnectTask = nullptr;
 }
 
 void
 StreamSocketIO::CancelDelayedConnectTask()
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
 
   if (!mDelayedConnectTask) {
     return;
   }
 
   mDelayedConnectTask->Cancel();
   ClearDelayedConnectTask();
 }
@@ -241,18 +247,19 @@ StreamSocketIO::Send(UnixSocketIOBuffer*
 }
 
 void
 StreamSocketIO::OnConnected()
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
 
-  NS_DispatchToMainThread(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
+  GetConsumerThread()->Dispatch(
+    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
+    NS_DISPATCH_NORMAL);
 
   AddWatchers(READ_WATCHER, true);
   if (HasPendingData()) {
     AddWatchers(WRITE_WATCHER, false);
   }
 }
 
 void
@@ -308,18 +315,19 @@ void
 StreamSocketIO::FireSocketError()
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
 
   // Clean up watchers, statuses, fds
   Close();
 
   // Tell the main thread we've errored
-  NS_DispatchToMainThread(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR));
+  GetConsumerThread()->Dispatch(
+    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
+    NS_DISPATCH_NORMAL);
 }
 
 // |ConnectionOrientedSocketIO|
 
 nsresult
 StreamSocketIO::Accept(int aFd,
                        const struct sockaddr* aAddress,
                        socklen_t aAddressLength)
@@ -329,18 +337,19 @@ StreamSocketIO::Accept(int aFd,
 
   SetSocket(aFd, SOCKET_IS_CONNECTED);
 
   // Address setup
   mAddressLength = aAddressLength;
   memcpy(&mAddress, aAddress, mAddressLength);
 
   // Signal success
-  NS_DispatchToMainThread(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
+  GetConsumerThread()->Dispatch(
+    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
+    NS_DISPATCH_NORMAL);
 
   AddWatchers(READ_WATCHER, true);
   if (HasPendingData()) {
     AddWatchers(WRITE_WATCHER, false);
   }
 
   return NS_OK;
 }
@@ -370,19 +379,19 @@ class StreamSocketIO::ReceiveRunnable fi
 public:
   ReceiveRunnable(StreamSocketIO* aIO, UnixSocketBuffer* aBuffer)
     : SocketIORunnable<StreamSocketIO>(aIO)
     , mBuffer(aBuffer)
   { }
 
   NS_IMETHOD Run() override
   {
-    MOZ_ASSERT(NS_IsMainThread());
+    StreamSocketIO* io = SocketIORunnable<StreamSocketIO>::GetIO();
 
-    StreamSocketIO* io = SocketIORunnable<StreamSocketIO>::GetIO();
+    MOZ_ASSERT(io->IsConsumerThread());
 
     if (NS_WARN_IF(io->IsShutdownOnMainThread())) {
       // Since we've already explicitly closed and the close
       // happened before this, this isn't really an error.
       return NS_OK;
     }
 
     StreamSocket* streamSocket = io->GetStreamSocket();
@@ -395,17 +404,18 @@ public:
 
 private:
   nsAutoPtr<UnixSocketBuffer> mBuffer;
 };
 
 void
 StreamSocketIO::ConsumeBuffer()
 {
-  NS_DispatchToMainThread(new ReceiveRunnable(this, mBuffer.forget()));
+  GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
+                                NS_DISPATCH_NORMAL);
 }
 
 void
 StreamSocketIO::DiscardBuffer()
 {
   // Nothing to do.
 }
 
@@ -415,40 +425,40 @@ SocketBase*
 StreamSocketIO::GetSocketBase()
 {
   return GetDataSocket();
 }
 
 bool
 StreamSocketIO::IsShutdownOnMainThread() const
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
 
   return mStreamSocket == nullptr;
 }
 
 bool
 StreamSocketIO::IsShutdownOnIOThread() const
 {
   return mShuttingDownOnIOThread;
 }
 
 void
 StreamSocketIO::ShutdownOnMainThread()
 {
-  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsConsumerThread());
   MOZ_ASSERT(!IsShutdownOnMainThread());
 
   mStreamSocket = nullptr;
 }
 
 void
 StreamSocketIO::ShutdownOnIOThread()
 {
-  MOZ_ASSERT(!NS_IsMainThread());
+  MOZ_ASSERT(!IsConsumerThread());
   MOZ_ASSERT(!mShuttingDownOnIOThread);
 
   Close(); // will also remove fd from I/O loop
   mShuttingDownOnIOThread = true;
 }
 
 //
 // Socket tasks
@@ -459,34 +469,34 @@ class StreamSocketIO::ConnectTask final
 {
 public:
   ConnectTask(StreamSocketIO* aIO)
   : SocketIOTask<StreamSocketIO>(aIO)
   { }
 
   void Run() override
   {
-    MOZ_ASSERT(!NS_IsMainThread());
+    MOZ_ASSERT(!GetIO()->IsConsumerThread());
     MOZ_ASSERT(!IsCanceled());
 
     GetIO()->Connect();
   }
 };
 
 class StreamSocketIO::DelayedConnectTask final
   : public SocketIOTask<StreamSocketIO>
 {
 public:
   DelayedConnectTask(StreamSocketIO* aIO)
     : SocketIOTask<StreamSocketIO>(aIO)
   { }
 
   void Run() override
   {
-    MOZ_ASSERT(NS_IsMainThread());
+    MOZ_ASSERT(GetIO()->IsConsumerThread());
 
     if (IsCanceled()) {
       return;
     }
 
     StreamSocketIO* io = GetIO();
     if (io->IsShutdownOnMainThread()) {
       return;
@@ -512,121 +522,119 @@ StreamSocket::StreamSocket(StreamSocketC
 StreamSocket::~StreamSocket()
 {
   MOZ_ASSERT(!mIO);
 }
 
 void
 StreamSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   mConsumer->ReceiveSocketData(mIndex, aBuffer);
 }
 
 nsresult
 StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs,
-                      MessageLoop* aIOLoop)
+                      nsIThread* aConsumerThread, MessageLoop* aIOLoop)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(!mIO);
 
-  mIO = new StreamSocketIO(aIOLoop, this, aConnector);
+  mIO = new StreamSocketIO(aConsumerThread, aIOLoop, this, aConnector);
   SetConnectionStatus(SOCKET_CONNECTING);
 
   if (aDelayMs > 0) {
     StreamSocketIO::DelayedConnectTask* connectTask =
       new StreamSocketIO::DelayedConnectTask(mIO);
     mIO->SetDelayedConnectTask(connectTask);
     MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs);
   } else {
     aIOLoop->PostTask(FROM_HERE, new StreamSocketIO::ConnectTask(mIO));
   }
+
   return NS_OK;
 }
 
 nsresult
 StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs)
 {
-  return Connect(aConnector, aDelayMs, XRE_GetIOMessageLoop());
+  nsIThread* consumerThread = nullptr;
+  nsresult rv = NS_GetCurrentThread(&consumerThread);
+  if (NS_FAILED(rv)) {
+    return rv;
+  }
+
+  return Connect(aConnector, aDelayMs, consumerThread, XRE_GetIOMessageLoop());
 }
 
 // |ConnectionOrientedSocket|
 
 nsresult
 StreamSocket::PrepareAccept(UnixSocketConnector* aConnector,
+                            nsIThread* aConsumerThread,
                             MessageLoop* aIOLoop,
                             ConnectionOrientedSocketIO*& aIO)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(!mIO);
   MOZ_ASSERT(aConnector);
 
   SetConnectionStatus(SOCKET_CONNECTING);
 
-  mIO = new StreamSocketIO(aIOLoop,
+  mIO = new StreamSocketIO(aConsumerThread, aIOLoop,
                            -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
                            this, aConnector);
   aIO = mIO;
 
   return NS_OK;
 }
 
 // |DataSocket|
 
 void
 StreamSocket::SendSocketData(UnixSocketIOBuffer* aBuffer)
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mIO);
+  MOZ_ASSERT(mIO->IsConsumerThread());
+  MOZ_ASSERT(!mIO->IsShutdownOnMainThread());
 
-  MOZ_ASSERT(!mIO->IsShutdownOnMainThread());
   mIO->GetIOLoop()->PostTask(
     FROM_HERE,
     new SocketIOSendTask<StreamSocketIO, UnixSocketIOBuffer>(mIO, aBuffer));
 }
 
 // |SocketBase|
 
 void
 StreamSocket::Close()
 {
-  MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mIO);
+  MOZ_ASSERT(mIO->IsConsumerThread());
 
   mIO->CancelDelayedConnectTask();
 
   // From this point on, we consider |mIO| as being deleted. We sever
   // the relationship here so any future calls to |Connect| will create
   // a new I/O object.
   mIO->ShutdownOnMainThread();
   mIO->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO));
   mIO = nullptr;
 
   NotifyDisconnect();
 }
 
 void
 StreamSocket::OnConnectSuccess()
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   mConsumer->OnConnectSuccess(mIndex);
 }
 
 void
 StreamSocket::OnConnectError()
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   mConsumer->OnConnectError(mIndex);
 }
 
 void
 StreamSocket::OnDisconnect()
 {
-  MOZ_ASSERT(NS_IsMainThread());
-
   mConsumer->OnDisconnect(mIndex);
 }
 
 } // namespace ipc
 } // namespace mozilla
--- a/ipc/unixsocket/StreamSocket.h
+++ b/ipc/unixsocket/StreamSocket.h
@@ -37,36 +37,38 @@ public:
   void ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer);
 
   /**
    * Starts a task on the socket that will try to connect to a socket in a
    * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
    * @param aDelayMs Time delay in milliseconds.
+   * @param aConsumerThread The socket's consumer thread.
    * @param aIOLoop The socket's I/O thread.
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
   nsresult Connect(UnixSocketConnector* aConnector, int aDelayMs,
-                   MessageLoop* aIOLoop);
+                   nsIThread* aConsumerThread, MessageLoop* aIOLoop);
 
   /**
    * Starts a task on the socket that will try to connect to a socket in a
    * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
    * @param aDelayMs Time delay in milliseconds.
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
   nsresult Connect(UnixSocketConnector* aConnector, int aDelayMs = 0);
 
   // Methods for |ConnectionOrientedSocket|
   //
 
   nsresult PrepareAccept(UnixSocketConnector* aConnector,
+                         nsIThread* aConsumerThread,
                          MessageLoop* aIOLoop,
                          ConnectionOrientedSocketIO*& aIO) override;
 
   // Methods for |DataSocket|
   //
 
   void SendSocketData(UnixSocketIOBuffer* aBuffer) override;