Bug 1172479: Replace |nsIThread| by |MessageLoop| in socket I/O code, r=kmachulis
authorThomas Zimmermann <tdz@users.sourceforge.net>
Tue, 09 Jun 2015 09:50:10 +0200
changeset 247798 b6c31e8a24243b6a200da54542b9e24eeac10641
parent 247797 28c5bdba9276d5a5293aad8577f762fcaaa94870
child 247799 7ee2981f67ced700a8ab2884f50a109153c1b724
push id28884
push usercbook@mozilla.com
push dateWed, 10 Jun 2015 13:14:58 +0000
treeherdermozilla-central@49ec7e24b35b [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerskmachulis
bugs1172479
milestone41.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1172479: Replace |nsIThread| by |MessageLoop| in socket I/O code, r=kmachulis Dispatching events via |nsIThread| doesn't work with worker threads. This patch replaces all uses of |nsIThread| in the socket code by equivalent uses of |MessageLoop|.
dom/bluetooth/bluedroid/BluetoothSocket.cpp
dom/bluetooth/bluedroid/BluetoothSocket.h
dom/bluetooth/bluez/BluetoothSocket.cpp
dom/bluetooth/bluez/BluetoothSocket.h
ipc/bluetooth/BluetoothDaemonConnection.cpp
ipc/bluetooth/BluetoothDaemonConnection.h
ipc/unixsocket/ConnectionOrientedSocket.cpp
ipc/unixsocket/ConnectionOrientedSocket.h
ipc/unixsocket/DataSocket.cpp
ipc/unixsocket/DataSocket.h
ipc/unixsocket/ListenSocket.cpp
ipc/unixsocket/ListenSocket.h
ipc/unixsocket/SocketBase.cpp
ipc/unixsocket/SocketBase.h
ipc/unixsocket/StreamSocket.cpp
ipc/unixsocket/StreamSocket.h
--- a/dom/bluetooth/bluedroid/BluetoothSocket.cpp
+++ b/dom/bluetooth/bluedroid/BluetoothSocket.cpp
@@ -1,27 +1,23 @@
 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "BluetoothSocket.h"
-
 #include <fcntl.h>
 #include <sys/socket.h>
-
-#include "base/message_loop.h"
 #include "BluetoothSocketObserver.h"
 #include "BluetoothInterface.h"
 #include "BluetoothUtils.h"
 #include "mozilla/ipc/UnixSocketWatcher.h"
 #include "mozilla/FileUtils.h"
 #include "mozilla/RefPtr.h"
-#include "nsThreadUtils.h"
 #include "nsXULAppAPI.h"
 
 using namespace mozilla::ipc;
 USING_BLUETOOTH_NAMESPACE
 
 static const size_t MAX_READ_SIZE = 1 << 16;
 static BluetoothSocketInterface* sBluetoothSocketInterface;
 
@@ -69,21 +65,21 @@ public:
    */
   enum ConnectionStatus {
     SOCKET_IS_DISCONNECTED = 0,
     SOCKET_IS_LISTENING,
     SOCKET_IS_CONNECTING,
     SOCKET_IS_CONNECTED
   };
 
-  DroidSocketImpl(nsIThread* aConsumerThread,
+  DroidSocketImpl(MessageLoop* aConsumerLoop,
                   MessageLoop* aIOLoop,
                   BluetoothSocket* aConsumer)
     : ipc::UnixFdWatcher(aIOLoop)
-    , DataSocketIO(aConsumerThread)
+    , DataSocketIO(aConsumerLoop)
     , mConsumer(aConsumer)
     , mShuttingDownOnIOThread(false)
     , mConnectionStatus(SOCKET_IS_DISCONNECTED)
   { }
 
   ~DroidSocketImpl()
   {
     MOZ_ASSERT(IsConsumerThread());
@@ -168,17 +164,17 @@ public:
     MOZ_ASSERT(!IsConsumerThread());
     MOZ_ASSERT(!mShuttingDownOnIOThread);
 
     Close(); // will also remove fd from I/O loop
     mShuttingDownOnIOThread = true;
   }
 
 private:
-  class ReceiveRunnable;
+  class ReceiveTask;
 
   /**
    * libevent triggered functions that reads data from socket when available and
    * guarenteed non-blocking. Only to be called on IO thread.
    *
    * @param aFd [in] File descriptor to read from
    */
   virtual void OnFileCanReadWithoutBlocking(int aFd);
@@ -314,19 +310,18 @@ DroidSocketImpl::Accept(int aFd)
   if (!(flags & O_NONBLOCK)) {
     int res = TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFL, flags | O_NONBLOCK));
     NS_ENSURE_TRUE_VOID(!res);
   }
 
   SetFd(aFd);
   mConnectionStatus = SOCKET_IS_CONNECTED;
 
-  GetConsumerThread()->Dispatch(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
-    NS_DISPATCH_NORMAL);
+  GetConsumerThread()->PostTask(
+    FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
 
   AddWatchers(READ_WATCHER, true);
   if (HasPendingData()) {
     AddWatchers(WRITE_WATCHER, false);
   }
 }
 
 void
@@ -421,35 +416,33 @@ public:
       mImpl->mConsumer->NotifyDisconnect();
     }
   }
 
 private:
   DroidSocketImpl* mImpl;
 };
 
-class AcceptRunnable final : public SocketIORunnable<DroidSocketImpl>
+class InvokeAcceptTask final : public SocketTask<DroidSocketImpl>
 {
 public:
-  AcceptRunnable(DroidSocketImpl* aImpl, int aFd)
-  : SocketIORunnable<DroidSocketImpl>(aImpl)
-  , mFd(aFd)
+  InvokeAcceptTask(DroidSocketImpl* aImpl, int aFd)
+    : SocketTask<DroidSocketImpl>(aImpl)
+    , mFd(aFd)
   { }
 
-  NS_IMETHOD Run() override
+  void Run() override
   {
     MOZ_ASSERT(GetIO()->IsConsumerThread());
     MOZ_ASSERT(sBluetoothSocketInterface);
 
     BluetoothSocketResultHandler* res = new AcceptResultHandler(GetIO());
     GetIO()->mConsumer->SetCurrentResultHandler(res);
 
     sBluetoothSocketInterface->Accept(mFd, res);
-
-    return NS_OK;
   }
 
 private:
   int mFd;
 };
 
 void
 DroidSocketImpl::OnSocketCanAcceptWithoutBlocking(int aFd)
@@ -457,18 +450,17 @@ DroidSocketImpl::OnSocketCanAcceptWithou
   MOZ_ASSERT(!IsConsumerThread());
   MOZ_ASSERT(!mShuttingDownOnIOThread);
 
   /* When a listening socket is ready for receiving data,
    * we can call |Accept| on it.
    */
 
   RemoveWatchers(READ_WATCHER);
-  GetConsumerThread()->Dispatch(new AcceptRunnable(this, aFd),
-                                NS_DISPATCH_NORMAL);
+  GetConsumerThread()->PostTask(FROM_HERE, new InvokeAcceptTask(this, aFd));
 }
 
 void
 DroidSocketImpl::OnFileCanWriteWithoutBlocking(int aFd)
 {
   if (mConnectionStatus == SOCKET_IS_CONNECTED) {
     OnSocketCanSendWithoutBlocking(aFd);
   } else if (mConnectionStatus == SOCKET_IS_CONNECTING) {
@@ -502,19 +494,18 @@ DroidSocketImpl::OnSocketCanConnectWitho
   MOZ_ASSERT(!mShuttingDownOnIOThread);
 
   /* We follow Posix behaviour here: Connect operations are
    * complete once we can write to the connecting socket.
    */
 
   mConnectionStatus = SOCKET_IS_CONNECTED;
 
-  GetConsumerThread()->Dispatch(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
-    NS_DISPATCH_NORMAL);
+  GetConsumerThread()->PostTask(
+    FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
 
   AddWatchers(READ_WATCHER, true);
   if (HasPendingData()) {
     AddWatchers(WRITE_WATCHER, false);
   }
 }
 
 // |DataSocketIO|
@@ -532,54 +523,51 @@ DroidSocketImpl::QueryReceiveBuffer(
 
   return NS_OK;
 }
 
 /**
  * |ReceiveRunnable| transfers data received on the I/O thread
  * to an instance of |BluetoothSocket| on the consumer thread.
  */
-class DroidSocketImpl::ReceiveRunnable final
-  : public SocketIORunnable<DroidSocketImpl>
+class DroidSocketImpl::ReceiveTask final : public SocketTask<DroidSocketImpl>
 {
 public:
-  ReceiveRunnable(DroidSocketImpl* aIO, UnixSocketBuffer* aBuffer)
-    : SocketIORunnable<DroidSocketImpl>(aIO)
+  ReceiveTask(DroidSocketImpl* aIO, UnixSocketBuffer* aBuffer)
+    : SocketTask<DroidSocketImpl>(aIO)
     , mBuffer(aBuffer)
   { }
 
-  NS_IMETHOD Run() override
+  void Run() override
   {
-    DroidSocketImpl* io = SocketIORunnable<DroidSocketImpl>::GetIO();
+    DroidSocketImpl* io = SocketTask<DroidSocketImpl>::GetIO();
 
     MOZ_ASSERT(io->IsConsumerThread());
 
     if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
       // Since we've already explicitly closed and the close
       // happened before this, this isn't really an error.
-      return NS_OK;
+      return;
     }
 
     BluetoothSocket* bluetoothSocket = io->GetBluetoothSocket();
     MOZ_ASSERT(bluetoothSocket);
 
     bluetoothSocket->ReceiveSocketData(mBuffer);
-
-    return NS_OK;
   }
 
 private:
   nsAutoPtr<UnixSocketBuffer> mBuffer;
 };
 
 void
 DroidSocketImpl::ConsumeBuffer()
 {
-  GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
-                                NS_DISPATCH_NORMAL);
+  GetConsumerThread()->PostTask(FROM_HERE,
+                                new ReceiveTask(this, mBuffer.forget()));
 }
 
 void
 DroidSocketImpl::DiscardBuffer()
 {
   // Nothing to do.
 }
 
@@ -646,24 +634,24 @@ private:
 };
 
 nsresult
 BluetoothSocket::Connect(const nsAString& aDeviceAddress,
                          const BluetoothUuid& aServiceUuid,
                          BluetoothSocketType aType,
                          int aChannel,
                          bool aAuth, bool aEncrypt,
-                         nsIThread* aConsumerThread,
+                         MessageLoop* aConsumerLoop,
                          MessageLoop* aIOLoop)
 {
   MOZ_ASSERT(!mImpl);
 
   SetConnectionStatus(SOCKET_CONNECTING);
 
-  mImpl = new DroidSocketImpl(aConsumerThread, aIOLoop, this);
+  mImpl = new DroidSocketImpl(aConsumerLoop, aIOLoop, this);
 
   BluetoothSocketResultHandler* res = new ConnectSocketResultHandler(mImpl);
   SetCurrentResultHandler(res);
 
   sBluetoothSocketInterface->Connect(
     aDeviceAddress, aType,
     aServiceUuid.mUuid, aChannel,
     aEncrypt, aAuth, res);
@@ -673,24 +661,18 @@ BluetoothSocket::Connect(const nsAString
 
 nsresult
 BluetoothSocket::Connect(const nsAString& aDeviceAddress,
                          const BluetoothUuid& aServiceUuid,
                          BluetoothSocketType aType,
                          int aChannel,
                          bool aAuth, bool aEncrypt)
 {
-  nsIThread* consumerThread = nullptr;
-  nsresult rv = NS_GetCurrentThread(&consumerThread);
-  if (NS_FAILED(rv)) {
-    return rv;
-  }
-
   return Connect(aDeviceAddress, aServiceUuid, aType, aChannel, aAuth,
-                 aEncrypt, consumerThread, XRE_GetIOMessageLoop());
+                 aEncrypt, MessageLoop::current(), XRE_GetIOMessageLoop());
 }
 
 class ListenResultHandler final : public BluetoothSocketResultHandler
 {
 public:
   ListenResultHandler(DroidSocketImpl* aImpl)
   : mImpl(aImpl)
   {
@@ -716,24 +698,24 @@ private:
 };
 
 nsresult
 BluetoothSocket::Listen(const nsAString& aServiceName,
                         const BluetoothUuid& aServiceUuid,
                         BluetoothSocketType aType,
                         int aChannel,
                         bool aAuth, bool aEncrypt,
-                        nsIThread* aConsumerThread,
+                        MessageLoop* aConsumerLoop,
                         MessageLoop* aIOLoop)
 {
   MOZ_ASSERT(!mImpl);
 
   SetConnectionStatus(SOCKET_LISTENING);
 
-  mImpl = new DroidSocketImpl(aConsumerThread, aIOLoop, this);
+  mImpl = new DroidSocketImpl(aConsumerLoop, aIOLoop, this);
 
   BluetoothSocketResultHandler* res = new ListenResultHandler(mImpl);
   SetCurrentResultHandler(res);
 
   sBluetoothSocketInterface->Listen(
     aType,
     aServiceName, aServiceUuid.mUuid, aChannel,
     aEncrypt, aAuth, res);
@@ -743,24 +725,18 @@ BluetoothSocket::Listen(const nsAString&
 
 nsresult
 BluetoothSocket::Listen(const nsAString& aServiceName,
                         const BluetoothUuid& aServiceUuid,
                         BluetoothSocketType aType,
                         int aChannel,
                         bool aAuth, bool aEncrypt)
 {
-  nsIThread* consumerThread = nullptr;
-  nsresult rv = NS_GetCurrentThread(&consumerThread);
-  if (NS_FAILED(rv)) {
-    return rv;
-  }
-
   return Listen(aServiceName, aServiceUuid, aType, aChannel, aAuth, aEncrypt,
-                consumerThread, XRE_GetIOMessageLoop());
+                MessageLoop::current(), XRE_GetIOMessageLoop());
 }
 
 void
 BluetoothSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
 {
   MOZ_ASSERT(mObserver);
 
   mObserver->ReceiveSocketData(this, aBuffer);
--- a/dom/bluetooth/bluedroid/BluetoothSocket.h
+++ b/dom/bluetooth/bluedroid/BluetoothSocket.h
@@ -6,17 +6,16 @@
 
 #ifndef mozilla_dom_bluetooth_BluetoothSocket_h
 #define mozilla_dom_bluetooth_BluetoothSocket_h
 
 #include "BluetoothCommon.h"
 #include "mozilla/ipc/DataSocket.h"
 
 class MessageLoop;
-class nsIThread;
 
 BEGIN_BLUETOOTH_NAMESPACE
 
 class BluetoothSocketObserver;
 class BluetoothSocketResultHandler;
 class DroidSocketImpl;
 
 class BluetoothSocket final : public mozilla::ipc::DataSocket
@@ -24,31 +23,31 @@ class BluetoothSocket final : public moz
 public:
   BluetoothSocket(BluetoothSocketObserver* aObserver);
 
   nsresult Connect(const nsAString& aDeviceAddress,
                    const BluetoothUuid& aServiceUuid,
                    BluetoothSocketType aType,
                    int aChannel,
                    bool aAuth, bool aEncrypt,
-                   nsIThread* aConsumerThread,
+                   MessageLoop* aConsumerLoop,
                    MessageLoop* aIOLoop);
 
   nsresult Connect(const nsAString& aDeviceAddress,
                    const BluetoothUuid& aServiceUuid,
                    BluetoothSocketType aType,
                    int aChannel,
                    bool aAuth, bool aEncrypt);
 
   nsresult Listen(const nsAString& aServiceName,
                   const BluetoothUuid& aServiceUuid,
                   BluetoothSocketType aType,
                   int aChannel,
                   bool aAuth, bool aEncrypt,
-                  nsIThread* aConsumerThread,
+                  MessageLoop* aConsumerLoop,
                   MessageLoop* aIOLoop);
 
   nsresult Listen(const nsAString& aServiceName,
                   const BluetoothUuid& aServiceUuid,
                   BluetoothSocketType aType,
                   int aChannel,
                   bool aAuth, bool aEncrypt);
 
--- a/dom/bluetooth/bluez/BluetoothSocket.cpp
+++ b/dom/bluetooth/bluez/BluetoothSocket.cpp
@@ -3,19 +3,17 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "BluetoothSocket.h"
 #include <fcntl.h>
 #include "BluetoothSocketObserver.h"
 #include "BluetoothUnixSocketConnector.h"
-#include "mozilla/unused.h"
-#include "nsTArray.h"
-#include "nsThreadUtils.h"
+#include "mozilla/RefPtr.h"
 #include "nsXULAppAPI.h"
 
 using namespace mozilla::ipc;
 
 BEGIN_BLUETOOTH_NAMESPACE
 
 static const size_t MAX_READ_SIZE = 1 << 16;
 
@@ -23,17 +21,17 @@ static const size_t MAX_READ_SIZE = 1 <<
 // BluetoothSocketIO
 //
 
 class BluetoothSocket::BluetoothSocketIO final
   : public UnixSocketWatcher
   , public DataSocketIO
 {
 public:
-  BluetoothSocketIO(nsIThread* aConsumerThread,
+  BluetoothSocketIO(MessageLoop* aConsumerLoop,
                     MessageLoop* aIOLoop,
                     BluetoothSocket* aConsumer,
                     UnixSocketConnector* aConnector);
   ~BluetoothSocketIO();
 
   void GetSocketAddr(nsAString& aAddrStr) const;
 
   BluetoothSocket* GetBluetoothSocket();
@@ -85,17 +83,17 @@ public:
 
   bool IsShutdownOnConsumerThread() const override;
   bool IsShutdownOnIOThread() const override;
 
   void ShutdownOnConsumerThread() override;
   void ShutdownOnIOThread() override;
 
 private:
-  class ReceiveRunnable;
+  class ReceiveTask;
 
   void FireSocketError();
 
   /**
    * Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
    * directly from consumer thread. All non-consumer-thread accesses should
    * happen with mIO as container.
    */
@@ -129,22 +127,22 @@ private:
 
   /**
    * I/O buffer for received data
    */
   nsAutoPtr<UnixSocketRawData> mBuffer;
 };
 
 BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO(
-  nsIThread* aConsumerThread,
+  MessageLoop* aConsumerLoop,
   MessageLoop* aIOLoop,
   BluetoothSocket* aConsumer,
   UnixSocketConnector* aConnector)
   : UnixSocketWatcher(aIOLoop)
-  , DataSocketIO(aConsumerThread)
+  , DataSocketIO(aConsumerLoop)
   , mConsumer(aConsumer)
   , mConnector(aConnector)
   , mShuttingDownOnIOThread(false)
   , mAddressLength(0)
   , mDelayedConnectTask(nullptr)
 {
   MOZ_ASSERT(mConsumer);
   MOZ_ASSERT(mConnector);
@@ -276,19 +274,18 @@ BluetoothSocket::BluetoothSocketIO::Send
 }
 
 void
 BluetoothSocket::BluetoothSocketIO::OnConnected()
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
 
-  GetConsumerThread()->Dispatch(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
-    NS_DISPATCH_NORMAL);
+  GetConsumerThread()->PostTask(
+    FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
 
   AddWatchers(READ_WATCHER, true);
   if (HasPendingData()) {
     AddWatchers(WRITE_WATCHER, false);
   }
 }
 
 void
@@ -326,19 +323,18 @@ BluetoothSocket::BluetoothSocketIO::OnSo
   if (NS_WARN_IF(NS_FAILED(rv))) {
     FireSocketError();
     return;
   }
 
   Close();
   SetSocket(fd, SOCKET_IS_CONNECTED);
 
-  GetConsumerThread()->Dispatch(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
-    NS_DISPATCH_NORMAL);
+  GetConsumerThread()->PostTask(
+    FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
 
   AddWatchers(READ_WATCHER, true);
   if (HasPendingData()) {
     AddWatchers(WRITE_WATCHER, false);
   }
 }
 
 void
@@ -377,20 +373,18 @@ void
 BluetoothSocket::BluetoothSocketIO::FireSocketError()
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
 
   // Clean up watchers, statuses, fds
   Close();
 
   // Tell the consumer thread we've errored
-  GetConsumerThread()->Dispatch(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
-    NS_DISPATCH_NORMAL);
-
+  GetConsumerThread()->PostTask(
+    FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_ERROR));
 }
 
 // |DataSocketIO|
 
 nsresult
 BluetoothSocket::BluetoothSocketIO::QueryReceiveBuffer(
   UnixSocketIOBuffer** aBuffer)
 {
@@ -400,57 +394,55 @@ BluetoothSocket::BluetoothSocketIO::Quer
     mBuffer = new UnixSocketRawData(MAX_READ_SIZE);
   }
   *aBuffer = mBuffer.get();
 
   return NS_OK;
 }
 
 /**
- * |ReceiveRunnable| transfers data received on the I/O thread
+ * |ReceiveTask| transfers data received on the I/O thread
  * to an instance of |BluetoothSocket| on the consumer thread.
  */
-class BluetoothSocket::BluetoothSocketIO::ReceiveRunnable final
-  : public SocketIORunnable<BluetoothSocketIO>
+class BluetoothSocket::BluetoothSocketIO::ReceiveTask final
+  : public SocketTask<BluetoothSocketIO>
 {
 public:
-  ReceiveRunnable(BluetoothSocketIO* aIO, UnixSocketBuffer* aBuffer)
-    : SocketIORunnable<BluetoothSocketIO>(aIO)
+  ReceiveTask(BluetoothSocketIO* aIO, UnixSocketBuffer* aBuffer)
+    : SocketTask<BluetoothSocketIO>(aIO)
     , mBuffer(aBuffer)
   { }
 
-  NS_IMETHOD Run() override
+  void Run() override
   {
-    BluetoothSocketIO* io = SocketIORunnable<BluetoothSocketIO>::GetIO();
+    BluetoothSocketIO* io = SocketTask<BluetoothSocketIO>::GetIO();
 
     MOZ_ASSERT(io->IsConsumerThread());
 
     if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
       // Since we've already explicitly closed and the close
       // happened before this, this isn't really an error.
-      return NS_OK;
+      return;
     }
 
     BluetoothSocket* bluetoothSocket = io->GetBluetoothSocket();
     MOZ_ASSERT(bluetoothSocket);
 
     bluetoothSocket->ReceiveSocketData(mBuffer);
-
-    return NS_OK;
   }
 
 private:
   nsAutoPtr<UnixSocketBuffer> mBuffer;
 };
 
 void
 BluetoothSocket::BluetoothSocketIO::ConsumeBuffer()
 {
-  GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
-                                NS_DISPATCH_NORMAL);
+  GetConsumerThread()->PostTask(FROM_HERE,
+                                new ReceiveTask(this, mBuffer.forget()));
 }
 
 void
 BluetoothSocket::BluetoothSocketIO::DiscardBuffer()
 {
   // Nothing to do.
 }
 
@@ -645,24 +637,24 @@ BluetoothSocket::SendSocketData(const ns
   SendSocketData(new UnixSocketRawData(aStr.BeginReading(), aStr.Length()));
 
   return true;
 }
 
 nsresult
 BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector,
                          int aDelayMs,
-                         nsIThread* aConsumerThread, MessageLoop* aIOLoop)
+                         MessageLoop* aConsumerLoop, MessageLoop* aIOLoop)
 {
   MOZ_ASSERT(aConnector);
-  MOZ_ASSERT(aConsumerThread);
+  MOZ_ASSERT(aConsumerLoop);
   MOZ_ASSERT(aIOLoop);
   MOZ_ASSERT(!mIO);
 
-  mIO = new BluetoothSocketIO(aConsumerThread, aIOLoop, this, aConnector);
+  mIO = new BluetoothSocketIO(aConsumerLoop, aIOLoop, this, aConnector);
   SetConnectionStatus(SOCKET_CONNECTING);
 
   if (aDelayMs > 0) {
     DelayedConnectTask* connectTask = new DelayedConnectTask(mIO);
     mIO->SetDelayedConnectTask(connectTask);
     MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs);
   } else {
     aIOLoop->PostTask(FROM_HERE, new ConnectTask(mIO));
@@ -670,52 +662,41 @@ BluetoothSocket::Connect(BluetoothUnixSo
 
   return NS_OK;
 }
 
 nsresult
 BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector,
                          int aDelayMs)
 {
-  nsIThread* consumerThread = nullptr;
-  nsresult rv = NS_GetCurrentThread(&consumerThread);
-  if (NS_FAILED(rv)) {
-    return rv;
-  }
-
-  return Connect(aConnector, aDelayMs, consumerThread, XRE_GetIOMessageLoop());
+  return Connect(aConnector, aDelayMs, MessageLoop::current(),
+                 XRE_GetIOMessageLoop());
 }
 
 nsresult
 BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector,
-                        nsIThread* aConsumerThread, MessageLoop* aIOLoop)
+                        MessageLoop* aConsumerLoop, MessageLoop* aIOLoop)
 {
   MOZ_ASSERT(aConnector);
-  MOZ_ASSERT(aConsumerThread);
+  MOZ_ASSERT(aConsumerLoop);
   MOZ_ASSERT(aIOLoop);
   MOZ_ASSERT(!mIO);
 
-  mIO = new BluetoothSocketIO(aConsumerThread, aIOLoop, this, aConnector);
+  mIO = new BluetoothSocketIO(aConsumerLoop, aIOLoop, this, aConnector);
   SetConnectionStatus(SOCKET_LISTENING);
 
   aIOLoop->PostTask(FROM_HERE, new ListenTask(mIO));
 
   return NS_OK;
 }
 
 nsresult
 BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector)
 {
-  nsIThread* consumerThread = nullptr;
-  nsresult rv = NS_GetCurrentThread(&consumerThread);
-  if (NS_FAILED(rv)) {
-    return rv;
-  }
-
-  return Listen(aConnector, consumerThread, XRE_GetIOMessageLoop());
+  return Listen(aConnector, MessageLoop::current(), XRE_GetIOMessageLoop());
 }
 
 void
 BluetoothSocket::GetAddress(nsAString& aAddrStr)
 {
   aAddrStr.Truncate();
   if (!mIO || GetConnectionStatus() != SOCKET_CONNECTED) {
     NS_WARNING("No socket currently open!");
--- a/dom/bluetooth/bluez/BluetoothSocket.h
+++ b/dom/bluetooth/bluez/BluetoothSocket.h
@@ -3,23 +3,20 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef mozilla_dom_bluetooth_BluetoothSocket_h
 #define mozilla_dom_bluetooth_BluetoothSocket_h
 
 #include "BluetoothCommon.h"
-#include <stdlib.h>
 #include "mozilla/ipc/DataSocket.h"
 #include "mozilla/ipc/UnixSocketWatcher.h"
-#include "mozilla/RefPtr.h"
 #include "nsAutoPtr.h"
 #include "nsString.h"
-#include "nsThreadUtils.h"
 
 class MessageLoop;
 
 BEGIN_BLUETOOTH_NAMESPACE
 
 class BluetoothSocketObserver;
 class BluetoothUnixSocketConnector;
 
@@ -61,22 +58,22 @@ public:
   bool SendSocketData(const nsACString& aMessage);
 
   /**
    * Starts a task on the socket that will try to connect to a socket in a
    * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
    * @param aDelayMs Time delay in milli-seconds.
-   * @param aConsumerThread The socket's consumer thread.
+   * @param aConsumerLoop The socket's consumer thread.
    * @param aIOLoop The socket's I/O thread.
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
   nsresult Connect(BluetoothUnixSocketConnector* aConnector, int aDelayMs,
-                   nsIThread* aConsumerThread, MessageLoop* aIOLoop);
+                   MessageLoop* aConsumerLoop, MessageLoop* aIOLoop);
 
   /**
    * Starts a task on the socket that will try to connect to a socket in a
    * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
    * @param aDelayMs Time delay in milli-seconds.
    * @return NS_OK on success, or an XPCOM error code otherwise.
@@ -84,22 +81,22 @@ public:
   nsresult Connect(BluetoothUnixSocketConnector* aConnector,
                    int aDelayMs = 0);
 
   /**
    * Starts a task on the socket that will try to accept a new connection in a
    * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
-   * @param aConsumerThread The socket's consumer thread.
+   * @param aConsumerLoop The socket's consumer thread.
    * @param aIOLoop The socket's I/O thread.
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
   nsresult Listen(BluetoothUnixSocketConnector* aConnector,
-                  nsIThread* aConsumerThread, MessageLoop* aIOLoop);
+                  MessageLoop* aConsumerLoop, MessageLoop* aIOLoop);
 
   /**
    * Starts a task on the socket that will try to accept a new connection in a
    * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
--- a/ipc/bluetooth/BluetoothDaemonConnection.cpp
+++ b/ipc/bluetooth/BluetoothDaemonConnection.cpp
@@ -209,17 +209,17 @@ BluetoothDaemonPDUConsumer::~BluetoothDa
 
 //
 // BluetoothDaemonConnectionIO
 //
 
 class BluetoothDaemonConnectionIO final : public ConnectionOrientedSocketIO
 {
 public:
-  BluetoothDaemonConnectionIO(nsIThread* aConsumerThread,
+  BluetoothDaemonConnectionIO(MessageLoop* aConsumerLoop,
                               MessageLoop* aIOLoop,
                               int aFd, ConnectionStatus aConnectionStatus,
                               UnixSocketConnector* aConnector,
                               BluetoothDaemonConnection* aConnection,
                               BluetoothDaemonPDUConsumer* aConsumer);
 
   // Methods for |DataSocketIO|
   //
@@ -242,24 +242,24 @@ public:
 private:
   BluetoothDaemonConnection* mConnection;
   BluetoothDaemonPDUConsumer* mConsumer;
   nsAutoPtr<BluetoothDaemonPDU> mPDU;
   bool mShuttingDownOnIOThread;
 };
 
 BluetoothDaemonConnectionIO::BluetoothDaemonConnectionIO(
-  nsIThread* aConsumerThread,
+  MessageLoop* aConsumerLoop,
   MessageLoop* aIOLoop,
   int aFd,
   ConnectionStatus aConnectionStatus,
   UnixSocketConnector* aConnector,
   BluetoothDaemonConnection* aConnection,
   BluetoothDaemonPDUConsumer* aConsumer)
-  : ConnectionOrientedSocketIO(aConsumerThread,
+  : ConnectionOrientedSocketIO(aConsumerLoop,
                                aIOLoop,
                                aFd,
                                aConnectionStatus,
                                aConnector)
   , mConnection(aConnection)
   , mConsumer(aConsumer)
   , mShuttingDownOnIOThread(false)
 {
@@ -356,26 +356,26 @@ BluetoothDaemonConnection::BluetoothDaem
 
 BluetoothDaemonConnection::~BluetoothDaemonConnection()
 { }
 
 // |ConnectionOrientedSocket|
 
 nsresult
 BluetoothDaemonConnection::PrepareAccept(UnixSocketConnector* aConnector,
-                                         nsIThread* aConsumerThread,
+                                         MessageLoop* aConsumerLoop,
                                          MessageLoop* aIOLoop,
                                          ConnectionOrientedSocketIO*& aIO)
 {
   MOZ_ASSERT(!mIO);
 
   SetConnectionStatus(SOCKET_CONNECTING);
 
   mIO = new BluetoothDaemonConnectionIO(
-    aConsumerThread, aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
+    aConsumerLoop, aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
     aConnector, this, mPDUConsumer);
   aIO = mIO;
 
   return NS_OK;
 }
 
 // |DataSocket|
 
--- a/ipc/bluetooth/BluetoothDaemonConnection.h
+++ b/ipc/bluetooth/BluetoothDaemonConnection.h
@@ -121,17 +121,17 @@ public:
                             BluetoothDaemonConnectionConsumer* aConsumer,
                             int aIndex);
   virtual ~BluetoothDaemonConnection();
 
   // Methods for |ConnectionOrientedSocket|
   //
 
   nsresult PrepareAccept(UnixSocketConnector* aConnector,
-                         nsIThread* aConsumerThread,
+                         MessageLoop* aConsumerLoop,
                          MessageLoop* aIOLoop,
                          ConnectionOrientedSocketIO*& aIO) override;
 
   // Methods for |DataSocket|
   //
 
   void SendSocketData(UnixSocketIOBuffer* aBuffer) override;
 
--- a/ipc/unixsocket/ConnectionOrientedSocket.cpp
+++ b/ipc/unixsocket/ConnectionOrientedSocket.cpp
@@ -10,34 +10,33 @@
 namespace mozilla {
 namespace ipc {
 
 //
 // ConnectionOrientedSocketIO
 //
 
 ConnectionOrientedSocketIO::ConnectionOrientedSocketIO(
-  nsIThread* aConsumerThread,
+  MessageLoop* aConsumerLoop,
   MessageLoop* aIOLoop,
-  int aFd,
-  ConnectionStatus aConnectionStatus,
+  int aFd, ConnectionStatus aConnectionStatus,
   UnixSocketConnector* aConnector)
-  : DataSocketIO(aConsumerThread)
+  : DataSocketIO(aConsumerLoop)
   , UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus)
   , mConnector(aConnector)
   , mPeerAddressLength(0)
 {
   MOZ_ASSERT(mConnector);
 }
 
 ConnectionOrientedSocketIO::ConnectionOrientedSocketIO(
-  nsIThread* aConsumerThread,
+  MessageLoop* aConsumerLoop,
   MessageLoop* aIOLoop,
   UnixSocketConnector* aConnector)
-  : DataSocketIO(aConsumerThread)
+  : DataSocketIO(aConsumerLoop)
   , UnixSocketWatcher(aIOLoop)
   , mConnector(aConnector)
   , mPeerAddressLength(0)
 {
   MOZ_ASSERT(mConnector);
 }
 
 ConnectionOrientedSocketIO::~ConnectionOrientedSocketIO()
@@ -74,19 +73,18 @@ ConnectionOrientedSocketIO::Connect()
   mPeerAddressLength = sizeof(mPeerAddress);
 
   int fd;
   nsresult rv = mConnector->CreateStreamSocket(peerAddress,
                                                &mPeerAddressLength,
                                                fd);
   if (NS_FAILED(rv)) {
     // Tell the consumer thread we've errored
-    GetConsumerThread()->Dispatch(
-      new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
-      NS_DISPATCH_NORMAL);
+    GetConsumerThread()->PostTask(
+      FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_ERROR));
     return NS_ERROR_FAILURE;
   }
 
   SetFd(fd);
 
   // calls OnConnected() on success, or OnError() otherwise
   rv = UnixSocketWatcher::Connect(peerAddress, mPeerAddressLength);
 
@@ -142,19 +140,18 @@ ConnectionOrientedSocketIO::OnSocketCanS
 }
 
 void
 ConnectionOrientedSocketIO::OnConnected()
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
 
-  GetConsumerThread()->Dispatch(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
-    NS_DISPATCH_NORMAL);
+  GetConsumerThread()->PostTask(
+    FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
 
   AddWatchers(READ_WATCHER, true);
   if (HasPendingData()) {
     AddWatchers(WRITE_WATCHER, false);
   }
 }
 
 void
@@ -171,19 +168,18 @@ ConnectionOrientedSocketIO::OnError(cons
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
 
   UnixFdWatcher::OnError(aFunction, aErrno);
 
   // Clean up watchers, status, fd
   Close();
 
   // Tell the consumer thread we've errored
-  GetConsumerThread()->Dispatch(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
-    NS_DISPATCH_NORMAL);
+  GetConsumerThread()->PostTask(
+    FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_ERROR));
 }
 
 //
 // ConnectionOrientedSocket
 //
 
 ConnectionOrientedSocket::~ConnectionOrientedSocket()
 { }
--- a/ipc/unixsocket/ConnectionOrientedSocket.h
+++ b/ipc/unixsocket/ConnectionOrientedSocket.h
@@ -48,35 +48,35 @@ public:
   void OnListening() final;
   void OnConnected() final;
   void OnError(const char* aFunction, int aErrno) final;
 
 protected:
   /**
    * Constructs an instance of |ConnectionOrientedSocketIO|
    *
-   * @param aConsumerThread The socket's consumer thread.
+   * @param aConsumerLoop The socket's consumer thread.
    * @param aIOLoop The socket's I/O loop.
    * @param aFd The socket file descriptor.
    * @param aConnectionStatus The connection status for |aFd|.
    * @param aConnector Connector object for socket-type-specific methods.
    */
-  ConnectionOrientedSocketIO(nsIThread* aConsumerThread,
+  ConnectionOrientedSocketIO(MessageLoop* aConsumerLoop,
                              MessageLoop* aIOLoop,
                              int aFd, ConnectionStatus aConnectionStatus,
                              UnixSocketConnector* aConnector);
 
   /**
    * Constructs an instance of |ConnectionOrientedSocketIO|
    *
-   * @param aConsumerThread The socket's consumer thread.
+   * @param aConsumerLoop The socket's consumer thread.
    * @param aIOLoop The socket's I/O loop.
    * @param aConnector Connector object for socket-type-specific methods.
    */
-  ConnectionOrientedSocketIO(nsIThread* aConsumerThread,
+  ConnectionOrientedSocketIO(MessageLoop* aConsumerLoop,
                              MessageLoop* aIOLoop,
                              UnixSocketConnector* aConnector);
 
 private:
   /**
    * Connector object used to create the connection we are currently using.
    */
   nsAutoPtr<UnixSocketConnector> mConnector;
@@ -96,23 +96,23 @@ class ConnectionOrientedSocket : public 
 {
 public:
   /**
    * Prepares an instance of |ConnectionOrientedSocket| in DISCONNECTED
    * state for accepting a connection. Consumer-thread only.
    *
    * @param aConnector The new connector object, owned by the
    *                   connection-oriented socket.
-   * @param aConsumerThread The socket's consumer thread.
+   * @param aConsumerLoop The socket's consumer thread.
    * @param aIOLoop The socket's I/O thread.
    * @param[out] aIO, Returns an instance of |ConnectionOrientedSocketIO|.
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
   virtual nsresult PrepareAccept(UnixSocketConnector* aConnector,
-                                 nsIThread* aConsumerThread,
+                                 MessageLoop* aConsumerLoop,
                                  MessageLoop* aIOLoop,
                                  ConnectionOrientedSocketIO*& aIO) = 0;
 
 protected:
   virtual ~ConnectionOrientedSocket();
 };
 
 }
--- a/ipc/unixsocket/DataSocket.cpp
+++ b/ipc/unixsocket/DataSocket.cpp
@@ -45,33 +45,33 @@ ssize_t
 DataSocketIO::ReceiveData(int aFd)
 {
   MOZ_ASSERT(aFd >= 0);
 
   UnixSocketIOBuffer* incoming;
   nsresult rv = QueryReceiveBuffer(&incoming);
   if (NS_FAILED(rv)) {
     /* an error occured */
-    GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
-                                  NS_DISPATCH_NORMAL);
+    GetConsumerThread()->PostTask(FROM_HERE,
+                                  new SocketRequestClosingTask(this));
     return -1;
   }
 
   ssize_t res = incoming->Receive(aFd);
   if (res < 0) {
     /* an I/O error occured */
     DiscardBuffer();
-    GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
-                                  NS_DISPATCH_NORMAL);
+    GetConsumerThread()->PostTask(FROM_HERE,
+                                  new SocketRequestClosingTask(this));
     return -1;
   } else if (!res) {
     /* EOF or peer shut down sending */
     DiscardBuffer();
-    GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
-                                  NS_DISPATCH_NORMAL);
+    GetConsumerThread()->PostTask(FROM_HERE,
+                                  new SocketRequestClosingTask(this));
     return 0;
   }
 
 #ifdef MOZ_TASK_TRACER
   /* Make unix socket creation events to be the source events of TaskTracer,
    * and originate the rest correlation tasks from here.
    */
   AutoSourceEvent taskTracerEvent(SourceEventType::Unixsocket);
@@ -88,34 +88,34 @@ DataSocketIO::SendPendingData(int aFd)
   MOZ_ASSERT(aFd >= 0);
 
   while (HasPendingData()) {
     UnixSocketIOBuffer* outgoing = mOutgoingQ.ElementAt(0);
 
     ssize_t res = outgoing->Send(aFd);
     if (res < 0) {
       /* an I/O error occured */
-      GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
-                                    NS_DISPATCH_NORMAL);
+      GetConsumerThread()->PostTask(FROM_HERE,
+                                    new SocketRequestClosingTask(this));
       return NS_ERROR_FAILURE;
     } else if (!res && outgoing->GetSize()) {
       /* I/O is currently blocked; try again later */
       return NS_OK;
     }
     if (!outgoing->GetSize()) {
       mOutgoingQ.RemoveElementAt(0);
       delete outgoing;
     }
   }
 
   return NS_OK;
 }
 
-DataSocketIO::DataSocketIO(nsIThread* aConsumerThread)
-  : SocketIOBase(aConsumerThread)
+DataSocketIO::DataSocketIO(MessageLoop* aConsumerLoop)
+  : SocketIOBase(aConsumerLoop)
 { }
 
 //
 // DataSocket
 //
 
 DataSocket::~DataSocket()
 { }
--- a/ipc/unixsocket/DataSocket.h
+++ b/ipc/unixsocket/DataSocket.h
@@ -5,16 +5,17 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  */
 
 #ifndef mozilla_ipc_datasocket_h
 #define mozilla_ipc_datasocket_h
 
 #include "mozilla/ipc/SocketBase.h"
+#include "nsTArray.h"
 
 namespace mozilla {
 namespace ipc {
 
 //
 // DataSocket
 //
 
@@ -85,17 +86,17 @@ public:
   void EnqueueData(UnixSocketIOBuffer* aBuffer);
   bool HasPendingData() const;
 
   ssize_t ReceiveData(int aFd);
 
   nsresult SendPendingData(int aFd);
 
 protected:
-  DataSocketIO(nsIThread* aConsumerThread);
+  DataSocketIO(MessageLoop* aConsumerLoop);
 
 private:
   /**
    * Raw data queue. Must be pushed/popped from I/O thread only.
    */
   nsTArray<UnixSocketIOBuffer*> mOutgoingQ;
 };
 
--- a/ipc/unixsocket/ListenSocket.cpp
+++ b/ipc/unixsocket/ListenSocket.cpp
@@ -22,17 +22,17 @@ namespace ipc {
 
 class ListenSocketIO final
   : public UnixSocketWatcher
   , public SocketIOBase
 {
 public:
   class ListenTask;
 
-  ListenSocketIO(nsIThread* aConsumerThread,
+  ListenSocketIO(MessageLoop* aConsumerLoop,
                  MessageLoop* aIOLoop,
                  ListenSocket* aListenSocket,
                  UnixSocketConnector* aConnector);
   ~ListenSocketIO();
 
   UnixSocketConnector* GetConnector() const;
 
   // Task callback methods
@@ -90,22 +90,22 @@ private:
   /**
    * Address structure of the socket currently in use
    */
   struct sockaddr_storage mAddress;
 
   ConnectionOrientedSocketIO* mCOSocketIO;
 };
 
-ListenSocketIO::ListenSocketIO(nsIThread* aConsumerThread,
+ListenSocketIO::ListenSocketIO(MessageLoop* aConsumerLoop,
                                MessageLoop* aIOLoop,
                                ListenSocket* aListenSocket,
                                UnixSocketConnector* aConnector)
   : UnixSocketWatcher(aIOLoop)
-  , SocketIOBase(aConsumerThread)
+  , SocketIOBase(aConsumerLoop)
   , mListenSocket(aListenSocket)
   , mConnector(aConnector)
   , mShuttingDownOnIOThread(false)
   , mAddressLength(0)
   , mCOSocketIO(nullptr)
 {
   MOZ_ASSERT(mListenSocket);
   MOZ_ASSERT(mConnector);
@@ -163,19 +163,18 @@ void
 ListenSocketIO::OnListening()
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
 
   AddWatchers(READ_WATCHER, true);
 
   /* We signal a successful 'connection' to a local address for listening. */
-  GetConsumerThread()->Dispatch(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
-    NS_DISPATCH_NORMAL);
+  GetConsumerThread()->PostTask(
+    FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
 }
 
 void
 ListenSocketIO::OnError(const char* aFunction, int aErrno)
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
 
   UnixFdWatcher::OnError(aFunction, aErrno);
@@ -186,19 +185,18 @@ void
 ListenSocketIO::FireSocketError()
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
 
   // Clean up watchers, statuses, fds
   Close();
 
   // Tell the consumer thread we've errored
-  GetConsumerThread()->Dispatch(
-    new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
-    NS_DISPATCH_NORMAL);
+  GetConsumerThread()->PostTask(
+    FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_ERROR));
 }
 
 void
 ListenSocketIO::OnSocketCanAcceptWithoutBlocking()
 {
   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
   MOZ_ASSERT(mCOSocketIO);
@@ -306,46 +304,41 @@ ListenSocket::ListenSocket(ListenSocketC
 
 ListenSocket::~ListenSocket()
 {
   MOZ_ASSERT(!mIO);
 }
 
 nsresult
 ListenSocket::Listen(UnixSocketConnector* aConnector,
-                     nsIThread* aConsumerThread,
+                     MessageLoop* aConsumerLoop,
                      MessageLoop* aIOLoop,
                      ConnectionOrientedSocket* aCOSocket)
 {
   MOZ_ASSERT(!mIO);
 
-  mIO = new ListenSocketIO(aConsumerThread, aIOLoop, this, aConnector);
+  mIO = new ListenSocketIO(aConsumerLoop, aIOLoop, this, aConnector);
 
   // Prepared I/O object, now start listening.
   nsresult rv = Listen(aCOSocket);
   if (NS_FAILED(rv)) {
     delete mIO;
     mIO = nullptr;
     return rv;
   }
 
   return NS_OK;
 }
 
 nsresult
 ListenSocket::Listen(UnixSocketConnector* aConnector,
                      ConnectionOrientedSocket* aCOSocket)
 {
-  nsIThread* consumerThread = nullptr;
-  nsresult rv = NS_GetCurrentThread(&consumerThread);
-  if (NS_FAILED(rv)) {
-    return rv;
-  }
-
-  return Listen(aConnector, consumerThread, XRE_GetIOMessageLoop(), aCOSocket);
+  return Listen(aConnector, MessageLoop::current(), XRE_GetIOMessageLoop(),
+                aCOSocket);
 }
 
 nsresult
 ListenSocket::Listen(ConnectionOrientedSocket* aCOSocket)
 {
   MOZ_ASSERT(aCOSocket);
   MOZ_ASSERT(mIO);
 
--- a/ipc/unixsocket/ListenSocket.h
+++ b/ipc/unixsocket/ListenSocket.h
@@ -6,17 +6,16 @@
 
 #ifndef mozilla_ipc_listensocket_h
 #define mozilla_ipc_listensocket_h
 
 #include "nsString.h"
 #include "mozilla/ipc/SocketBase.h"
 
 class MessageLoop;
-class nsIThread;
 
 namespace mozilla {
 namespace ipc {
 
 class ConnectionOrientedSocket;
 class ListenSocketConsumer;
 class ListenSocketIO;
 class UnixSocketConnector;
@@ -32,24 +31,24 @@ public:
    */
   ListenSocket(ListenSocketConsumer* aConsumer, int aIndex);
 
   /**
    * Starts a task on the socket that will try to accept a new connection
    * in a non-blocking manner.
    *
    * @param aConnector Connector object for socket-type-specific functions
-   * @param aConsumerThread The socket's consumer thread.
+   * @param aConsumerLoop The socket's consumer thread.
    * @param aIOLoop The socket's I/O thread.
    * @param aCOSocket The connection-oriented socket for handling the
    *                  accepted connection.
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
   nsresult Listen(UnixSocketConnector* aConnector,
-                  nsIThread* aConsumerThread,
+                  MessageLoop* aConsumerLoop,
                   MessageLoop* aIOLoop,
                   ConnectionOrientedSocket* aCOSocket);
 
   /**
    * Starts a task on the socket that will try to accept a new connection
    * in a non-blocking manner.
    *
    * @param aConnector Connector object for socket-type-specific functions
--- a/ipc/unixsocket/SocketBase.cpp
+++ b/ipc/unixsocket/SocketBase.cpp
@@ -249,123 +249,112 @@ SocketBase::SetConnectionStatus(SocketCo
 {
   mConnectionStatus = aConnectionStatus;
 }
 
 //
 // SocketIOBase
 //
 
-SocketIOBase::SocketIOBase(nsIThread* aConsumerThread)
-  : mConsumerThread(aConsumerThread)
+SocketIOBase::SocketIOBase(MessageLoop* aConsumerLoop)
+  : mConsumerLoop(aConsumerLoop)
 {
-  MOZ_ASSERT(mConsumerThread);
+  MOZ_ASSERT(mConsumerLoop);
 }
 
 SocketIOBase::~SocketIOBase()
 { }
 
-nsIThread*
+MessageLoop*
 SocketIOBase::GetConsumerThread() const
 {
-  return mConsumerThread;
+  return mConsumerLoop;
 }
 
 bool
 SocketIOBase::IsConsumerThread() const
 {
-  nsIThread* thread = nullptr;
-  if (NS_FAILED(NS_GetCurrentThread(&thread))) {
-    return false;
-  }
-  return thread == GetConsumerThread();
+  return GetConsumerThread() == MessageLoop::current();
 }
 
 //
-// SocketIOEventRunnable
+// SocketEventTask
 //
 
-SocketIOEventRunnable::SocketIOEventRunnable(SocketIOBase* aIO,
-                                             SocketEvent aEvent)
-  : SocketIORunnable<SocketIOBase>(aIO)
+SocketEventTask::SocketEventTask(SocketIOBase* aIO, SocketEvent aEvent)
+  : SocketTask<SocketIOBase>(aIO)
   , mEvent(aEvent)
 { }
 
-NS_METHOD
-SocketIOEventRunnable::Run()
+void
+SocketEventTask::Run()
 {
-  SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO();
+  SocketIOBase* io = SocketTask<SocketIOBase>::GetIO();
 
   MOZ_ASSERT(io->IsConsumerThread());
 
   if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
     // Since we've already explicitly closed and the close
     // happened before this, this isn't really an error.
-    return NS_OK;
+    return;
   }
 
   SocketBase* socketBase = io->GetSocketBase();
   MOZ_ASSERT(socketBase);
 
   if (mEvent == CONNECT_SUCCESS) {
     socketBase->NotifySuccess();
   } else if (mEvent == CONNECT_ERROR) {
     socketBase->NotifyError();
   } else if (mEvent == DISCONNECT) {
     socketBase->NotifyDisconnect();
   }
-
-  return NS_OK;
 }
 
 //
-// SocketIORequestClosingRunnable
+// SocketRequestClosingTask
 //
 
-SocketIORequestClosingRunnable::SocketIORequestClosingRunnable(
+SocketRequestClosingTask::SocketRequestClosingTask(
   SocketIOBase* aIO)
-  : SocketIORunnable<SocketIOBase>(aIO)
+  : SocketTask<SocketIOBase>(aIO)
 { }
 
-NS_METHOD
-SocketIORequestClosingRunnable::Run()
+void
+SocketRequestClosingTask::Run()
 {
-  SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO();
+  SocketIOBase* io = SocketTask<SocketIOBase>::GetIO();
 
   MOZ_ASSERT(io->IsConsumerThread());
 
   if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
     // Since we've already explicitly closed and the close
     // happened before this, this isn't really an error.
-    return NS_OK;
+    return;
   }
 
   SocketBase* socketBase = io->GetSocketBase();
   MOZ_ASSERT(socketBase);
 
   socketBase->Close();
-
-  return NS_OK;
 }
 
 //
-// SocketIODeleteInstanceRunnable
+// SocketDeleteInstanceTask
 //
 
-SocketIODeleteInstanceRunnable::SocketIODeleteInstanceRunnable(
+SocketDeleteInstanceTask::SocketDeleteInstanceTask(
   SocketIOBase* aIO)
   : mIO(aIO)
 { }
 
-NS_METHOD
-SocketIODeleteInstanceRunnable::Run()
+void
+SocketDeleteInstanceTask::Run()
 {
   mIO = nullptr; // delete instance
-
-  return NS_OK;
 }
 
 //
 // SocketIOShutdownTask
 //
 
 SocketIOShutdownTask::SocketIOShutdownTask(SocketIOBase* aIO)
   : SocketIOTask<SocketIOBase>(aIO)
@@ -380,14 +369,14 @@ SocketIOShutdownTask::Run()
   MOZ_ASSERT(!io->IsShutdownOnIOThread());
 
   // At this point, there should be no new events on the I/O thread
   // after this one with the possible exception of an accept task,
   // which ShutdownOnIOThread will cancel for us. We are now fully
   // shut down, so we can send a message to the consumer thread to
   // delete |io| safely knowing that it's not reference any longer.
   io->ShutdownOnIOThread();
-  io->GetConsumerThread()->Dispatch(new SocketIODeleteInstanceRunnable(io),
-                                    NS_DISPATCH_NORMAL);
+  io->GetConsumerThread()->PostTask(FROM_HERE,
+                                    new SocketDeleteInstanceTask(io));
 }
 
 }
 }
--- a/ipc/unixsocket/SocketBase.h
+++ b/ipc/unixsocket/SocketBase.h
@@ -6,18 +6,16 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  */
 
 #ifndef mozilla_ipc_SocketBase_h
 #define mozilla_ipc_SocketBase_h
 
 #include "base/message_loop.h"
 #include "nsAutoPtr.h"
-#include "nsTArray.h"
-#include "nsThreadUtils.h"
 
 namespace mozilla {
 namespace ipc {
 
 //
 // UnixSocketBuffer
 //
 
@@ -377,104 +375,103 @@ public:
    */
   virtual void ShutdownOnConsumerThread() = 0;
 
   /**
    * Returns the consumer thread.
    *
    * @return A pointer to the consumer thread.
    */
-  nsIThread* GetConsumerThread() const;
+  MessageLoop* GetConsumerThread() const;
 
   /**
-   * @return True if the current thread is thre consumer thread, or false
+   * @return True if the current thread is the consumer thread, or false
    *         otherwise.
    */
   bool IsConsumerThread() const;
 
 protected:
-  SocketIOBase(nsIThread* nsConsumerThread);
+  SocketIOBase(MessageLoop* aConsumerLoop);
 
 private:
-  nsCOMPtr<nsIThread> mConsumerThread;
+  MessageLoop* mConsumerLoop;
 };
 
 //
-// Socket I/O runnables
+// Socket tasks
 //
 
-/* |SocketIORunnable| is a runnable for sending a message from
+/* |SocketTask| is a task for sending a message from
  * the I/O thread to the consumer thread.
  */
 template <typename T>
-class SocketIORunnable : public nsRunnable
+class SocketTask : public Task
 {
 public:
-  virtual ~SocketIORunnable()
+  virtual ~SocketTask()
   { }
 
   T* GetIO() const
   {
     return mIO;
   }
 
 protected:
-  SocketIORunnable(T* aIO)
-  : mIO(aIO)
+  SocketTask(T* aIO)
+    : mIO(aIO)
   {
     MOZ_ASSERT(aIO);
   }
 
 private:
   T* mIO;
 };
 
 /**
- * |SocketIOEventRunnable| reports the connection state on the
+ * |SocketEventTask| reports the connection state on the
  * I/O thread back to the consumer thread.
  */
-class SocketIOEventRunnable final : public SocketIORunnable<SocketIOBase>
+class SocketEventTask final : public SocketTask<SocketIOBase>
 {
 public:
   enum SocketEvent {
     CONNECT_SUCCESS,
     CONNECT_ERROR,
     DISCONNECT
   };
 
-  SocketIOEventRunnable(SocketIOBase* aIO, SocketEvent aEvent);
+  SocketEventTask(SocketIOBase* aIO, SocketEvent aEvent);
 
-  NS_IMETHOD Run() override;
+  void Run() override;
 
 private:
   SocketEvent mEvent;
 };
 
 /**
- * |SocketIORequestClosingRunnable| closes an instance of |SocketBase|
- * to the consumer thread.
+ * |SocketRequestClosingTask| closes an instance of |SocketBase|
+ * on the consumer thread.
  */
-class SocketIORequestClosingRunnable final
-  : public SocketIORunnable<SocketIOBase>
+class SocketRequestClosingTask final : public SocketTask<SocketIOBase>
 {
 public:
-  SocketIORequestClosingRunnable(SocketIOBase* aIO);
+  SocketRequestClosingTask(SocketIOBase* aIO);
 
-  NS_IMETHOD Run() override;
+  void Run() override;
 };
 
 /**
- * |SocketIODeleteInstanceRunnable| deletes an object on the consumer thread.
+ * |SocketDeleteInstanceTask| deletes an object on the consumer thread.
  */
-class SocketIODeleteInstanceRunnable final : public nsRunnable
+class SocketDeleteInstanceTask final : public Task
 {
 public:
-  SocketIODeleteInstanceRunnable(SocketIOBase* aIO);
+  SocketDeleteInstanceTask(SocketIOBase* aIO);
 
-  NS_IMETHOD Run() override;
+  void Run() override;
 
 private:
   nsAutoPtr<SocketIOBase> mIO;
 };
 
 //
 // Socket I/O tasks
 //
--- a/ipc/unixsocket/StreamSocket.cpp
+++ b/ipc/unixsocket/StreamSocket.cpp
@@ -20,25 +20,25 @@ namespace ipc {
 // StreamSocketIO
 //
 
 class StreamSocketIO final : public ConnectionOrientedSocketIO
 {
 public:
   class ConnectTask;
   class DelayedConnectTask;
-  class ReceiveRunnable;
+  class ReceiveTask;
 
-  StreamSocketIO(nsIThread* aConsumerThread,
-                 MessageLoop* mIOLoop,
+  StreamSocketIO(MessageLoop* aConsumerLoop,
+                 MessageLoop* aIOLoop,
                  StreamSocket* aStreamSocket,
                  UnixSocketConnector* aConnector);
-  StreamSocketIO(nsIThread* aConsumerThread,
-                 MessageLoop* mIOLoop, int aFd,
-                 ConnectionStatus aConnectionStatus,
+  StreamSocketIO(MessageLoop* aConsumerLoop,
+                 MessageLoop* aIOLoop,
+                 int aFd, ConnectionStatus aConnectionStatus,
                  StreamSocket* aStreamSocket,
                  UnixSocketConnector* aConnector);
   ~StreamSocketIO();
 
   StreamSocket* GetStreamSocket();
   DataSocket* GetDataSocket();
 
   // Delayed-task handling
@@ -86,34 +86,34 @@ private:
   CancelableTask* mDelayedConnectTask;
 
   /**
    * I/O buffer for received data
    */
   nsAutoPtr<UnixSocketRawData> mBuffer;
 };
 
-StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
+StreamSocketIO::StreamSocketIO(MessageLoop* aConsumerLoop,
                                MessageLoop* aIOLoop,
                                StreamSocket* aStreamSocket,
                                UnixSocketConnector* aConnector)
-  : ConnectionOrientedSocketIO(aConsumerThread, aIOLoop, aConnector)
+  : ConnectionOrientedSocketIO(aConsumerLoop, aIOLoop, aConnector)
   , mStreamSocket(aStreamSocket)
   , mShuttingDownOnIOThread(false)
   , mDelayedConnectTask(nullptr)
 {
   MOZ_ASSERT(mStreamSocket);
 }
 
-StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
+StreamSocketIO::StreamSocketIO(MessageLoop* aConsumerLoop,
                                MessageLoop* aIOLoop,
                                int aFd, ConnectionStatus aConnectionStatus,
                                StreamSocket* aStreamSocket,
                                UnixSocketConnector* aConnector)
-  : ConnectionOrientedSocketIO(aConsumerThread,
+  : ConnectionOrientedSocketIO(aConsumerLoop,
                                aIOLoop,
                                aFd,
                                aConnectionStatus,
                                aConnector)
   , mStreamSocket(aStreamSocket)
   , mShuttingDownOnIOThread(false)
   , mDelayedConnectTask(nullptr)
 {
@@ -178,57 +178,54 @@ StreamSocketIO::QueryReceiveBuffer(UnixS
     mBuffer = new UnixSocketRawData(MAX_READ_SIZE);
   }
   *aBuffer = mBuffer.get();
 
   return NS_OK;
 }
 
 /**
- * |ReceiveRunnable| transfers data received on the I/O thread
+ * |ReceiveTask| transfers data received on the I/O thread
  * to an instance of |StreamSocket| on the consumer thread.
  */
-class StreamSocketIO::ReceiveRunnable final
-  : public SocketIORunnable<StreamSocketIO>
+class StreamSocketIO::ReceiveTask final : public SocketTask<StreamSocketIO>
 {
 public:
-  ReceiveRunnable(StreamSocketIO* aIO, UnixSocketBuffer* aBuffer)
-    : SocketIORunnable<StreamSocketIO>(aIO)
+  ReceiveTask(StreamSocketIO* aIO, UnixSocketBuffer* aBuffer)
+    : SocketTask<StreamSocketIO>(aIO)
     , mBuffer(aBuffer)
   { }
 
-  NS_IMETHOD Run() override
+  void Run() override
   {
-    StreamSocketIO* io = SocketIORunnable<StreamSocketIO>::GetIO();
+    StreamSocketIO* io = SocketTask<StreamSocketIO>::GetIO();
 
     MOZ_ASSERT(io->IsConsumerThread());
 
     if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
       // Since we've already explicitly closed and the close
       // happened before this, this isn't really an error.
-      return NS_OK;
+      return;
     }
 
     StreamSocket* streamSocket = io->GetStreamSocket();
     MOZ_ASSERT(streamSocket);
 
     streamSocket->ReceiveSocketData(mBuffer);
-
-    return NS_OK;
   }
 
 private:
   nsAutoPtr<UnixSocketBuffer> mBuffer;
 };
 
 void
 StreamSocketIO::ConsumeBuffer()
 {
-  GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
-                                NS_DISPATCH_NORMAL);
+  GetConsumerThread()->PostTask(FROM_HERE,
+                                new ReceiveTask(this, mBuffer.forget()));
 }
 
 void
 StreamSocketIO::DiscardBuffer()
 {
   // Nothing to do.
 }
 
@@ -340,21 +337,21 @@ StreamSocket::~StreamSocket()
 void
 StreamSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
 {
   mConsumer->ReceiveSocketData(mIndex, aBuffer);
 }
 
 nsresult
 StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs,
-                      nsIThread* aConsumerThread, MessageLoop* aIOLoop)
+                      MessageLoop* aConsumerLoop, MessageLoop* aIOLoop)
 {
   MOZ_ASSERT(!mIO);
 
-  mIO = new StreamSocketIO(aConsumerThread, aIOLoop, this, aConnector);
+  mIO = new StreamSocketIO(aConsumerLoop, aIOLoop, this, aConnector);
   SetConnectionStatus(SOCKET_CONNECTING);
 
   if (aDelayMs > 0) {
     StreamSocketIO::DelayedConnectTask* connectTask =
       new StreamSocketIO::DelayedConnectTask(mIO);
     mIO->SetDelayedConnectTask(connectTask);
     MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs);
   } else {
@@ -362,39 +359,34 @@ StreamSocket::Connect(UnixSocketConnecto
   }
 
   return NS_OK;
 }
 
 nsresult
 StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs)
 {
-  nsIThread* consumerThread = nullptr;
-  nsresult rv = NS_GetCurrentThread(&consumerThread);
-  if (NS_FAILED(rv)) {
-    return rv;
-  }
-
-  return Connect(aConnector, aDelayMs, consumerThread, XRE_GetIOMessageLoop());
+  return Connect(aConnector, aDelayMs,
+                 MessageLoop::current(), XRE_GetIOMessageLoop());
 }
 
 // |ConnectionOrientedSocket|
 
 nsresult
 StreamSocket::PrepareAccept(UnixSocketConnector* aConnector,
-                            nsIThread* aConsumerThread,
+                            MessageLoop* aConsumerLoop,
                             MessageLoop* aIOLoop,
                             ConnectionOrientedSocketIO*& aIO)
 {
   MOZ_ASSERT(!mIO);
   MOZ_ASSERT(aConnector);
 
   SetConnectionStatus(SOCKET_CONNECTING);
 
-  mIO = new StreamSocketIO(aConsumerThread, aIOLoop,
+  mIO = new StreamSocketIO(aConsumerLoop, aIOLoop,
                            -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
                            this, aConnector);
   aIO = mIO;
 
   return NS_OK;
 }
 
 // |DataSocket|
--- a/ipc/unixsocket/StreamSocket.h
+++ b/ipc/unixsocket/StreamSocket.h
@@ -37,38 +37,38 @@ public:
   void ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer);
 
   /**
    * Starts a task on the socket that will try to connect to a socket in a
    * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
    * @param aDelayMs Time delay in milliseconds.
-   * @param aConsumerThread The socket's consumer thread.
+   * @param aConsumerLoop The socket's consumer thread.
    * @param aIOLoop The socket's I/O thread.
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
   nsresult Connect(UnixSocketConnector* aConnector, int aDelayMs,
-                   nsIThread* aConsumerThread, MessageLoop* aIOLoop);
+                   MessageLoop* aConsumerLoop, MessageLoop* aIOLoop);
 
   /**
    * Starts a task on the socket that will try to connect to a socket in a
    * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
    * @param aDelayMs Time delay in milliseconds.
    * @return NS_OK on success, or an XPCOM error code otherwise.
    */
   nsresult Connect(UnixSocketConnector* aConnector, int aDelayMs = 0);
 
   // Methods for |ConnectionOrientedSocket|
   //
 
   nsresult PrepareAccept(UnixSocketConnector* aConnector,
-                         nsIThread* aConsumerThread,
+                         MessageLoop* aConsumerLoop,
                          MessageLoop* aIOLoop,
                          ConnectionOrientedSocketIO*& aIO) override;
 
   // Methods for |DataSocket|
   //
 
   void SendSocketData(UnixSocketIOBuffer* aBuffer) override;