Bug 974410: Inherit UnixSocketImpl from UnixSocketWatcher, r=kyle
authorThomas Zimmermann <tdz@users.sourceforge.net>
Wed, 26 Feb 2014 17:52:09 +0100
changeset 188965 89c5263f9485a1aa2ff3743c671468ee8217b701
parent 188964 85cddaeeb0ec5fdcf3e3d4a986396da5a2119592
child 188966 0572e3c063ed7cf86843427965bd2652baf10dfe
push id474
push userasasaki@mozilla.com
push dateMon, 02 Jun 2014 21:01:02 +0000
treeherdermozilla-release@967f4cf1b31c [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerskyle
bugs974410
milestone30.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 974410: Inherit UnixSocketImpl from UnixSocketWatcher, r=kyle The base class UnixSocketWatcher handles the connection state of the socket. UnixSocketImpl overrides UnixSocketWatcher's callback methods for implementing it's functionality.
ipc/unixsocket/UnixSocket.cpp
--- a/ipc/unixsocket/UnixSocket.cpp
+++ b/ipc/unixsocket/UnixSocket.cpp
@@ -34,43 +34,41 @@ static const size_t MAX_READ_SIZE = 1 <<
 #define CHROMIUM_LOG(args...) if (BTDEBUG) printf(args);
 #endif
 
 static const int SOCKET_RETRY_TIME_MS = 1000;
 
 namespace mozilla {
 namespace ipc {
 
-class UnixSocketImpl : public UnixFdWatcher
+class UnixSocketImpl : public UnixSocketWatcher
 {
 public:
   UnixSocketImpl(MessageLoop* mIOLoop,
                  UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector,
-                 const nsACString& aAddress,
-                 SocketConnectionStatus aConnectionStatus)
-    : UnixFdWatcher(mIOLoop)
+                 const nsACString& aAddress)
+    : UnixSocketWatcher(mIOLoop)
     , mConsumer(aConsumer)
     , mConnector(aConnector)
     , mShuttingDownOnIOThread(false)
     , mAddress(aAddress)
     , mDelayedConnectTask(nullptr)
-    , mConnectionStatus(aConnectionStatus)
   {
   }
 
   ~UnixSocketImpl()
   {
     MOZ_ASSERT(NS_IsMainThread());
     MOZ_ASSERT(IsShutdownOnMainThread());
   }
 
   void QueueWriteData(UnixSocketRawData* aData)
   {
     mOutgoingQ.AppendElement(aData);
-    OnFileCanWriteWithoutBlocking(GetFd());
+    AddWatchers(WRITE_WATCHER, false);
   }
 
   bool IsShutdownOnMainThread()
   {
     MOZ_ASSERT(NS_IsMainThread());
     return mConsumer == nullptr;
   }
 
@@ -95,16 +93,19 @@ public:
 
     mShuttingDownOnIOThread = true;
   }
 
   void SetUpIO()
   {
     MOZ_ASSERT(IsOpen());
     AddWatchers(READ_WATCHER, true);
+    if (!mOutgoingQ.IsEmpty()) {
+      AddWatchers(WRITE_WATCHER, false);
+    }
   }
 
   void SetDelayedConnectTask(CancelableTask* aTask)
   {
     MOZ_ASSERT(NS_IsMainThread());
     mDelayedConnectTask = aTask;
   }
 
@@ -130,26 +131,21 @@ public:
   void Connect();
 
   /**
    * Run bind/listen to prepare for further runs of accept()
    */
   void Listen();
 
   /**
-   * Accept an incoming connection
-   */
-  void Accept();
-
-  /**
    * Set up flags on whatever our current file descriptor is.
    *
    * @return true if successful, false otherwise
    */
-  bool SetSocketFlags();
+  bool SetSocketFlags(int aFd);
 
   void GetSocketAddr(nsAString& aAddrStr)
   {
     if (!mConnector) {
       NS_WARNING("No connector to get socket address from!");
       aAddrStr.Truncate();
       return;
     }
@@ -158,37 +154,28 @@ public:
 
   /**
    * Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
    * directly from main thread. All non-main-thread accesses should happen with
    * mImpl as container.
    */
   RefPtr<UnixSocketConsumer> mConsumer;
 
+  void OnAccepted(int aFd) MOZ_OVERRIDE;
+  void OnConnected() MOZ_OVERRIDE;
+  void OnError(const char* aFunction, int aErrno) MOZ_OVERRIDE;
+  void OnListening() MOZ_OVERRIDE;
+  void OnSocketCanReceiveWithoutBlocking() MOZ_OVERRIDE;
+  void OnSocketCanSendWithoutBlocking() MOZ_OVERRIDE;
+
 private:
 
   void FireSocketError();
 
   /**
-   * libevent triggered functions that reads data from socket when available and
-   * guarenteed non-blocking. Only to be called on IO thread.
-   *
-   * @param aFd File descriptor to read from
-   */
-  virtual void OnFileCanReadWithoutBlocking(int aFd);
-
-  /**
-   * libevent or developer triggered functions that writes data to socket when
-   * available and guarenteed non-blocking. Only to be called on IO thread.
-   *
-   * @param aFd File descriptor to read from
-   */
-  virtual void OnFileCanWriteWithoutBlocking(int aFd);
-
-  /**
    * Raw data queue. Must be pushed/popped from IO thread only.
    */
   typedef nsTArray<UnixSocketRawData* > UnixSocketRawDataQueue;
   UnixSocketRawDataQueue mOutgoingQ;
 
   /**
    * Connector object used to create the connection we are currently using.
    */
@@ -213,22 +200,16 @@ private:
    * Address struct of the socket currently in use
    */
   sockaddr_any mAddr;
 
   /**
    * Task member for delayed connect task. Should only be access on main thread.
    */
   CancelableTask* mDelayedConnectTask;
-
-  /**
-   * Socket connection status. Duplicate from UnixSocketConsumer. Should only
-   * be accessed on I/O thread.
-   */
-  SocketConnectionStatus mConnectionStatus;
 };
 
 template<class T>
 class DeleteInstanceRunnable : public nsRunnable
 {
 public:
   DeleteInstanceRunnable(T* aInstance)
   : mInstance(aInstance)
@@ -367,36 +348,37 @@ public:
     // upper layer
     mImpl->mConsumer->CloseSocket();
     return NS_OK;
   }
 private:
   UnixSocketImpl* mImpl;
 };
 
-class SocketAcceptTask : public CancelableTask {
+class SocketListenTask : public CancelableTask
+{
   virtual void Run();
 
   UnixSocketImpl* mImpl;
 public:
-  SocketAcceptTask(UnixSocketImpl* aImpl) : mImpl(aImpl) { }
+  SocketListenTask(UnixSocketImpl* aImpl) : mImpl(aImpl) { }
 
   virtual void Cancel()
   {
     MOZ_ASSERT(!NS_IsMainThread());
     mImpl = nullptr;
   }
 };
 
-void SocketAcceptTask::Run()
+void SocketListenTask::Run()
 {
   MOZ_ASSERT(!NS_IsMainThread());
 
   if (mImpl) {
-    mImpl->Accept();
+    mImpl->Listen();
   }
 }
 
 class SocketConnectTask : public Task {
   virtual void Run();
 
   UnixSocketImpl* mImpl;
 public:
@@ -442,46 +424,45 @@ public:
   ShutdownSocketTask(UnixSocketImpl* aImpl) : mImpl(aImpl) { }
 };
 
 void ShutdownSocketTask::Run()
 {
   MOZ_ASSERT(!NS_IsMainThread());
 
   // At this point, there should be no new events on the IO thread after this
-  // one with the possible exception of a SocketAcceptTask that
+  // one with the possible exception of a SocketListenTask that
   // ShutdownOnIOThread will cancel for us. We are now fully shut down, so we
   // can send a message to the main thread that will delete mImpl safely knowing
   // that no more tasks reference it.
   mImpl->ShutdownOnIOThread();
 
   nsRefPtr<nsIRunnable> t(new DeleteInstanceRunnable<UnixSocketImpl>(mImpl));
   nsresult rv = NS_DispatchToMainThread(t);
   NS_ENSURE_SUCCESS_VOID(rv);
 }
 
 void
 UnixSocketImpl::FireSocketError()
 {
-  MOZ_ASSERT(!NS_IsMainThread());
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
 
   // Clean up watchers, statuses, fds
   Close();
-  mConnectionStatus = SOCKET_DISCONNECTED;
 
   // Tell the main thread we've errored
   nsRefPtr<OnSocketEventTask> t =
     new OnSocketEventTask(this, OnSocketEventTask::CONNECT_ERROR);
   NS_DispatchToMainThread(t);
 }
 
 void
-UnixSocketImpl::Accept()
+UnixSocketImpl::Listen()
 {
-  MOZ_ASSERT(!NS_IsMainThread());
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   MOZ_ASSERT(mConnector);
 
   // This will set things we don't particularly care about, but it will hand
   // back the correct structure size which is what we do care about.
   if (!mConnector->CreateAddr(true, mAddrSize, mAddr, nullptr)) {
     NS_WARNING("Cannot create socket address!");
     FireSocketError();
     return;
@@ -491,151 +472,240 @@ UnixSocketImpl::Accept()
     int fd = mConnector->Create();
     if (fd < 0) {
       NS_WARNING("Cannot create socket fd!");
       FireSocketError();
       return;
     }
     SetFd(fd);
 
-    if (!SetSocketFlags()) {
+    if (!SetSocketFlags(GetFd())) {
       NS_WARNING("Cannot set socket flags!");
       FireSocketError();
       return;
     }
 
-    if (bind(GetFd(), (struct sockaddr*)&mAddr, mAddrSize)) {
-#ifdef DEBUG
-      CHROMIUM_LOG("...bind(%d) gave errno %d", GetFd(), errno);
-#endif
-      FireSocketError();
-      return;
-    }
-
-    if (listen(GetFd(), 1)) {
-#ifdef DEBUG
-      CHROMIUM_LOG("...listen(%d) gave errno %d", GetFd(), errno);
-#endif
-      FireSocketError();
-      return;
-    }
-
-    if (!mConnector->SetUpListenSocket(GetFd())) {
-      NS_WARNING("Could not set up listen socket!");
-      FireSocketError();
-      return;
-    }
-
+    // calls OnListening on success, or OnError otherwise
+    nsresult rv = UnixSocketWatcher::Listen(
+      reinterpret_cast<struct sockaddr*>(&mAddr), mAddrSize);
+    NS_WARN_IF(NS_FAILED(rv));
   }
-
-  SetUpIO();
 }
 
 void
 UnixSocketImpl::Connect()
 {
-  MOZ_ASSERT(!NS_IsMainThread());
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   MOZ_ASSERT(mConnector);
 
   if (!IsOpen()) {
     int fd = mConnector->Create();
     if (fd < 0) {
       NS_WARNING("Cannot create socket fd!");
       FireSocketError();
       return;
     }
     SetFd(fd);
   }
 
-  int ret;
-
   if (!mConnector->CreateAddr(false, mAddrSize, mAddr, mAddress.get())) {
     NS_WARNING("Cannot create socket address!");
     FireSocketError();
     return;
   }
 
-  // Select non-blocking IO.
-  if (-1 == fcntl(GetFd(), F_SETFL, O_NONBLOCK)) {
-    NS_WARNING("Cannot set nonblock!");
-    FireSocketError();
+  // calls OnConnected() on success, or OnError() otherwise
+  nsresult rv = UnixSocketWatcher::Connect(
+    reinterpret_cast<struct sockaddr*>(&mAddr), mAddrSize);
+  NS_WARN_IF(NS_FAILED(rv));
+}
+
+bool
+UnixSocketImpl::SetSocketFlags(int aFd)
+{
+  // Set socket addr to be reused even if kernel is still waiting to close
+  int n = 1;
+  setsockopt(aFd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof(n));
+
+  // Set close-on-exec bit.
+  int flags = fcntl(aFd, F_GETFD);
+  if (-1 == flags) {
+    return false;
+  }
+
+  flags |= FD_CLOEXEC;
+  if (-1 == fcntl(aFd, F_SETFD, flags)) {
+    return false;
+  }
+
+  return true;
+}
+
+void
+UnixSocketImpl::OnAccepted(int aFd)
+{
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
+  MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
+
+  if (!mConnector->SetUp(aFd)) {
+    NS_WARNING("Could not set up socket!");
     return;
   }
 
-  ret = connect(GetFd(), (struct sockaddr*)&mAddr, mAddrSize);
-
-  if (ret) {
-    if (errno == EINPROGRESS) {
-      // Select blocking IO again, since we've now at least queue'd the connect
-      // as nonblock.
-      int current_opts = fcntl(GetFd(), F_GETFL, 0);
-      if (-1 == current_opts) {
-        NS_WARNING("Cannot get socket opts!");
-        FireSocketError();
-        return;
-      }
-      if (-1 == fcntl(GetFd(), F_SETFL, current_opts & ~O_NONBLOCK)) {
-        NS_WARNING("Cannot set socket opts to blocking!");
-        FireSocketError();
-        return;
-      }
-
-      AddWatchers(WRITE_WATCHER, false);
-
-#ifdef DEBUG
-      CHROMIUM_LOG("UnixSocket Connection delayed!");
-#endif
-      return;
-    }
-#if DEBUG
-    CHROMIUM_LOG("Socket connect errno=%d\n", errno);
-#endif
-    FireSocketError();
+  RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
+  Close();
+  SetSocket(aFd, SOCKET_IS_CONNECTED);
+  if (!SetSocketFlags(GetFd())) {
     return;
   }
 
-  if (!SetSocketFlags()) {
+  nsRefPtr<OnSocketEventTask> t =
+    new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
+  NS_DispatchToMainThread(t);
+
+  SetUpIO();
+}
+
+void
+UnixSocketImpl::OnConnected()
+{
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
+  MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
+
+  if (!SetSocketFlags(GetFd())) {
     NS_WARNING("Cannot set socket flags!");
     FireSocketError();
     return;
   }
 
   if (!mConnector->SetUp(GetFd())) {
     NS_WARNING("Could not set up socket!");
     FireSocketError();
     return;
   }
 
   nsRefPtr<OnSocketEventTask> t =
     new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
   NS_DispatchToMainThread(t);
-  mConnectionStatus = SOCKET_CONNECTED;
+
+  SetUpIO();
+}
+
+void
+UnixSocketImpl::OnListening()
+{
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
+  MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
+
+  if (!mConnector->SetUpListenSocket(GetFd())) {
+    NS_WARNING("Could not set up listen socket!");
+    FireSocketError();
+    return;
+  }
 
   SetUpIO();
 }
 
-bool
-UnixSocketImpl::SetSocketFlags()
+void
+UnixSocketImpl::OnError(const char* aFunction, int aErrno)
+{
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
+
+  UnixFdWatcher::OnError(aFunction, aErrno);
+  FireSocketError();
+}
+
+void
+UnixSocketImpl::OnSocketCanReceiveWithoutBlocking()
 {
-  // Set socket addr to be reused even if kernel is still waiting to close
-  int n = 1;
-  setsockopt(GetFd(), SOL_SOCKET, SO_REUSEADDR, &n, sizeof(n));
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
+  MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
+
+  // Read all of the incoming data.
+  while (true) {
+    nsAutoPtr<UnixSocketRawData> incoming(new UnixSocketRawData(MAX_READ_SIZE));
+
+    ssize_t ret = read(GetFd(), incoming->mData, incoming->mSize);
+    if (ret <= 0) {
+      if (ret == -1) {
+        if (errno == EINTR) {
+          continue; // retry system call when interrupted
+        }
+        if (errno == EAGAIN || errno == EWOULDBLOCK) {
+          return; // no data available: return and re-poll
+        }
+
+#ifdef DEBUG
+        NS_WARNING("Cannot read from network");
+#endif
+        // else fall through to error handling on other errno's
+      }
+
+      // We're done with our descriptors. Ensure that spurious events don't
+      // cause us to end up back here.
+      RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
+      nsRefPtr<RequestClosingSocketTask> t = new RequestClosingSocketTask(this);
+      NS_DispatchToMainThread(t);
+      return;
+    }
+
+    incoming->mSize = ret;
+    nsRefPtr<SocketReceiveTask> t =
+      new SocketReceiveTask(this, incoming.forget());
+    NS_DispatchToMainThread(t);
 
-  // Set close-on-exec bit.
-  int flags = fcntl(GetFd(), F_GETFD);
-  if (-1 == flags) {
-    return false;
+    // If ret is less than MAX_READ_SIZE, there's no
+    // more data in the socket for us to read now.
+    if (ret < ssize_t(MAX_READ_SIZE)) {
+      return;
+    }
   }
+}
+
+void
+UnixSocketImpl::OnSocketCanSendWithoutBlocking()
+{
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
+  MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
 
-  flags |= FD_CLOEXEC;
-  if (-1 == fcntl(GetFd(), F_SETFD, flags)) {
-    return false;
+  // Try to write the bytes of mCurrentRilRawData.  If all were written, continue.
+  //
+  // Otherwise, save the byte position of the next byte to write
+  // within mCurrentWriteOffset, and request another write when the
+  // system won't block.
+  //
+  while (true) {
+    UnixSocketRawData* data;
+    if (mOutgoingQ.IsEmpty()) {
+      return;
+    }
+    data = mOutgoingQ.ElementAt(0);
+    const uint8_t *toWrite;
+    toWrite = data->mData;
+
+    while (data->mCurrentWriteOffset < data->mSize) {
+      ssize_t write_amount = data->mSize - data->mCurrentWriteOffset;
+      ssize_t written;
+      written = write (GetFd(), toWrite + data->mCurrentWriteOffset,
+                         write_amount);
+      if (written > 0) {
+        data->mCurrentWriteOffset += written;
+      }
+      if (written != write_amount) {
+        break;
+      }
+    }
+
+    if (data->mCurrentWriteOffset != data->mSize) {
+      AddWatchers(WRITE_WATCHER, false);
+      return;
+    }
+    mOutgoingQ.RemoveElementAt(0);
+    delete data;
   }
-
-  return true;
 }
 
 UnixSocketConsumer::UnixSocketConsumer() : mImpl(nullptr)
                                          , mConnectionStatus(SOCKET_DISCONNECTED)
                                          , mConnectTimestamp(0)
                                          , mConnectDelayMs(0)
 {
 }
@@ -696,167 +766,16 @@ UnixSocketConsumer::CloseSocket()
                                    new ShutdownSocketTask(mImpl));
 
   mImpl = nullptr;
 
   NotifyDisconnect();
 }
 
 void
-UnixSocketImpl::OnFileCanReadWithoutBlocking(int aFd)
-{
-  MOZ_ASSERT(!NS_IsMainThread());
-  MOZ_ASSERT(!mShuttingDownOnIOThread);
-
-  if (mConnectionStatus == SOCKET_CONNECTED) {
-    // Read all of the incoming data.
-    while (true) {
-      nsAutoPtr<UnixSocketRawData> incoming(new UnixSocketRawData(MAX_READ_SIZE));
-
-      ssize_t ret = read(aFd, incoming->mData, incoming->mSize);
-      if (ret <= 0) {
-        if (ret == -1) {
-          if (errno == EINTR) {
-            continue; // retry system call when interrupted
-          }
-          if (errno == EAGAIN || errno == EWOULDBLOCK) {
-            return; // no data available: return and re-poll
-          }
-
-#ifdef DEBUG
-          NS_WARNING("Cannot read from network");
-#endif
-          // else fall through to error handling on other errno's
-        }
-
-        // We're done with our descriptors. Ensure that spurious events don't
-        // cause us to end up back here.
-        RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
-        nsRefPtr<RequestClosingSocketTask> t = new RequestClosingSocketTask(this);
-        NS_DispatchToMainThread(t);
-        return;
-      }
-
-      incoming->mSize = ret;
-      nsRefPtr<SocketReceiveTask> t =
-        new SocketReceiveTask(this, incoming.forget());
-      NS_DispatchToMainThread(t);
-
-      // If ret is less than MAX_READ_SIZE, there's no
-      // more data in the socket for us to read now.
-      if (ret < ssize_t(MAX_READ_SIZE)) {
-        return;
-      }
-    }
-
-    MOZ_CRASH("We returned early");
-  } else if (mConnectionStatus == SOCKET_LISTENING) {
-    int client_fd = accept(GetFd(), (struct sockaddr*)&mAddr, &mAddrSize);
-
-    if (client_fd < 0) {
-      return;
-    }
-
-    if (!mConnector->SetUp(client_fd)) {
-      NS_WARNING("Could not set up socket!");
-      return;
-    }
-
-    RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
-    Close();
-    SetFd(client_fd);
-    if (!SetSocketFlags()) {
-      return;
-    }
-
-    nsRefPtr<OnSocketEventTask> t =
-      new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
-    NS_DispatchToMainThread(t);
-    mConnectionStatus = SOCKET_CONNECTED;
-
-    SetUpIO();
-  }
-}
-
-void
-UnixSocketImpl::OnFileCanWriteWithoutBlocking(int aFd)
-{
-  MOZ_ASSERT(!NS_IsMainThread());
-  MOZ_ASSERT(!mShuttingDownOnIOThread);
-  MOZ_ASSERT(aFd >= 0);
-
-  if (mConnectionStatus == SOCKET_CONNECTED) {
-    // Try to write the bytes of mCurrentRilRawData.  If all were written, continue.
-    //
-    // Otherwise, save the byte position of the next byte to write
-    // within mCurrentWriteOffset, and request another write when the
-    // system won't block.
-    //
-    while (true) {
-      UnixSocketRawData* data;
-      if (mOutgoingQ.IsEmpty()) {
-        return;
-      }
-      data = mOutgoingQ.ElementAt(0);
-      const uint8_t *toWrite;
-      toWrite = data->mData;
-
-      while (data->mCurrentWriteOffset < data->mSize) {
-        ssize_t write_amount = data->mSize - data->mCurrentWriteOffset;
-        ssize_t written;
-        written = write (aFd, toWrite + data->mCurrentWriteOffset,
-                         write_amount);
-        if (written > 0) {
-          data->mCurrentWriteOffset += written;
-        }
-        if (written != write_amount) {
-          break;
-        }
-      }
-
-      if (data->mCurrentWriteOffset != data->mSize) {
-        AddWatchers(WRITE_WATCHER, false);
-        return;
-      }
-      mOutgoingQ.RemoveElementAt(0);
-      delete data;
-    }
-  } else if (mConnectionStatus == SOCKET_CONNECTING) {
-    int error, ret;
-    socklen_t len = sizeof(error);
-    ret = getsockopt(GetFd(), SOL_SOCKET, SO_ERROR, &error, &len);
-
-    if (ret || error) {
-      NS_WARNING("getsockopt failure on async socket connect!");
-      FireSocketError();
-      return;
-    }
-
-    if (!SetSocketFlags()) {
-      NS_WARNING("Cannot set socket flags!");
-      FireSocketError();
-      return;
-    }
-
-    if (!mConnector->SetUp(GetFd())) {
-      NS_WARNING("Could not set up socket!");
-      FireSocketError();
-      return;
-    }
-
-    nsRefPtr<OnSocketEventTask> t =
-      new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
-    NS_DispatchToMainThread(t);
-    mConnectionStatus = SOCKET_CONNECTED;
-
-    SetUpIO();
-  }
-}
-
-void
 UnixSocketConsumer::GetSocketAddr(nsAString& aAddrStr)
 {
   aAddrStr.Truncate();
   if (!mImpl || mConnectionStatus != SOCKET_CONNECTED) {
     NS_WARNING("No socket currently open!");
     return;
   }
   mImpl->GetSocketAddr(aAddrStr);
@@ -901,17 +820,17 @@ UnixSocketConsumer::ConnectSocket(UnixSo
 
   if (mImpl) {
     NS_WARNING("Socket already connecting/connected!");
     return false;
   }
 
   nsCString addr(aAddress);
   MessageLoop* ioLoop = XRE_GetIOMessageLoop();
-  mImpl = new UnixSocketImpl(ioLoop, this, connector.forget(), addr, SOCKET_CONNECTING);
+  mImpl = new UnixSocketImpl(ioLoop, this, connector.forget(), addr);
   mConnectionStatus = SOCKET_CONNECTING;
   if (aDelayMs > 0) {
     SocketDelayedConnectTask* connectTask = new SocketDelayedConnectTask(mImpl);
     mImpl->SetDelayedConnectTask(connectTask);
     MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs);
   } else {
     ioLoop->PostTask(FROM_HERE, new SocketConnectTask(mImpl));
   }
@@ -927,20 +846,20 @@ UnixSocketConsumer::ListenSocket(UnixSoc
   nsAutoPtr<UnixSocketConnector> connector(aConnector);
 
   if (mImpl) {
     NS_WARNING("Socket already connecting/connected!");
     return false;
   }
 
   mImpl = new UnixSocketImpl(XRE_GetIOMessageLoop(), this, connector.forget(),
-                             EmptyCString(), SOCKET_LISTENING);
+                             EmptyCString());
   mConnectionStatus = SOCKET_LISTENING;
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
-                                   new SocketAcceptTask(mImpl));
+                                   new SocketListenTask(mImpl));
   return true;
 }
 
 uint32_t
 UnixSocketConsumer::CalculateConnectDelayMs() const
 {
   MOZ_ASSERT(NS_IsMainThread());