Bug 836523 - Wait for incoming connections in UnixSocketImpl. r=qdot, r=echou
authorThomas Zimmermann <tdz@users.sourceforge.net>
Tue, 12 Feb 2013 09:16:45 -0500
changeset 131509 4251e6dd02180f830a61dab98eed7451d9cb4749
parent 131508 89edfdd1a350f85a3f1d2b427c8d6e735a4a95e7
child 131510 164c9a8f3711b395f380588cb8ffcd922e7da7a4
push id2323
push userbbajaj@mozilla.com
push dateMon, 01 Apr 2013 19:47:02 +0000
treeherdermozilla-beta@7712be144d91 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersqdot, echou
bugs836523
milestone21.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 836523 - Wait for incoming connections in UnixSocketImpl. r=qdot, r=echou The UnixSocketImpl currently polls the socket file descriptor while listening for incoming connections and schedules itself to run again if no connection requests have been received. This behavior interferes with closing the socket and deleting the socket structure in the main thread. It can happen that the I/O thread dispatches a SocketAcceptTask to poll the listening socket and the main thread dispatches a DeleteInstanceRunnable for the UnixSocketImpl, such that the delete operation gets dispatched before the poll operation. The latter then operates on the just deleted UnixSocketImpl. With this patch, the I/O thread watches the listing socket for incoming connection requests and only attempts to run accept when connection requests are pending. This allows to serialize polling and close operations within the I/O thread in a sound order. A side effect of this patch is that we don't constantly run code for polling the listing socket, which should result in less CPU overhead and save battery power.
ipc/unixsocket/UnixSocket.cpp
--- a/ipc/unixsocket/UnixSocket.cpp
+++ b/ipc/unixsocket/UnixSocket.cpp
@@ -486,38 +486,17 @@ UnixSocketImpl::Accept()
 #ifdef DEBUG
       LOG("...listen(%d) gave errno %d", mFd.get(), errno);
 #endif
       return;
     }
 
   }
 
-  int client_fd;
-  client_fd = accept(mFd.get(), &mAddr, &mAddrSize);
-  if (client_fd < 0) {
-    EnqueueTask(SOCKET_RETRY_TIME_MS, new SocketAcceptTask(this));
-    return;
-  }
-
-  if (!mConnector->SetUp(client_fd)) {
-    NS_WARNING("Could not set up socket!");
-    return;
-  }
-  mFd.reset(client_fd);
-
-  nsRefPtr<OnSocketEventTask> t =
-    new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
-  NS_DispatchToMainThread(t);
-
-  // Due to the fact that we've dispatched our OnConnectSuccess message before
-  // starting reading, we're guaranteed that any subsequent read tasks will
-  // happen after the object has been notified of a successful connect.
-  XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
-                                   new StartImplReadingTask(this));
+  SetUpIO();
 }
 
 void
 UnixSocketImpl::Connect()
 {
   if(mFd.get() < 0)
   {
     mFd = mConnector->Create();
@@ -572,21 +551,16 @@ UnixSocketImpl::SetNonblockFlags()
     return false;
   }
 
   flags |= FD_CLOEXEC;
   if (-1 == fcntl(mFd, F_SETFD, flags)) {
     return false;
   }
 
-  // Select non-blocking IO.
-  if (-1 == fcntl(mFd, F_SETFL, O_NONBLOCK)) {
-    return false;
-  }
-
   return true;
 }
 
 UnixSocketConsumer::UnixSocketConsumer() : mImpl(nullptr)
                                          , mConnectionStatus(SOCKET_DISCONNECTED)
 {
 }
 
@@ -651,60 +625,92 @@ UnixSocketConsumer::CloseSocket()
   t.forget();
 
   NotifyDisconnect();
 }
 
 void
 UnixSocketImpl::OnFileCanReadWithoutBlocking(int aFd)
 {
-  // Keep reading data until either
-  //
-  //   - mIncoming is completely read
-  //     If so, sConsumer->MessageReceived(mIncoming.forget())
-  //
-  //   - mIncoming isn't completely read, but there's no more
-  //     data available on the socket
-  //     If so, break;
-  while (true) {
-    if (!mIncoming) {
-      uint8_t data[MAX_READ_SIZE];
-      ssize_t ret = read(aFd, data, MAX_READ_SIZE);
-      if (ret < 0) {
-        if (ret == -1) {
-          if (errno == EINTR) {
-            continue; // retry system call when interrupted
-          }
-          else if (errno == EAGAIN || errno == EWOULDBLOCK) {
-            return; // no data available: return and re-poll
+  enum SocketConnectionStatus status = mConsumer->GetConnectionStatus();
+
+  if (status == SOCKET_CONNECTED) {
+
+    // Keep reading data until either
+    //
+    //   - mIncoming is completely read
+    //     If so, sConsumer->MessageReceived(mIncoming.forget())
+    //
+    //   - mIncoming isn't completely read, but there's no more
+    //     data available on the socket
+    //     If so, break;
+    while (true) {
+      if (!mIncoming) {
+        uint8_t data[MAX_READ_SIZE];
+        ssize_t ret = read(aFd, data, MAX_READ_SIZE);
+        if (ret < 0) {
+          if (ret == -1) {
+            if (errno == EINTR) {
+              continue; // retry system call when interrupted
+            }
+            else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+              return; // no data available: return and re-poll
+            }
+            // else fall through to error handling on other errno's
           }
-          // else fall through to error handling on other errno's
-        }
 #ifdef DEBUG
-        NS_WARNING("Cannot read from network");
+          NS_WARNING("Cannot read from network");
 #endif
-        // At this point, assume that we can't actually access
-        // the socket anymore
-        mReadWatcher.StopWatchingFileDescriptor();
-        mWriteWatcher.StopWatchingFileDescriptor();
-        nsRefPtr<SocketCloseTask> t = new SocketCloseTask(this);
-        NS_DispatchToMainThread(t);
-        return;
-      }
-      if (ret) {
-        mIncoming = new UnixSocketRawData(ret);
-        memcpy(mIncoming->mData, data, ret);
-        nsRefPtr<SocketReceiveTask> t =
-          new SocketReceiveTask(this, mIncoming.forget());
-        NS_DispatchToMainThread(t);
-      }
-      if (ret < ssize_t(MAX_READ_SIZE)) {
-        return;
+          // At this point, assume that we can't actually access
+          // the socket anymore
+          mReadWatcher.StopWatchingFileDescriptor();
+          mWriteWatcher.StopWatchingFileDescriptor();
+          nsRefPtr<SocketCloseTask> t = new SocketCloseTask(this);
+          NS_DispatchToMainThread(t);
+          return;
+        }
+        if (ret) {
+          mIncoming = new UnixSocketRawData(ret);
+          memcpy(mIncoming->mData, data, ret);
+          nsRefPtr<SocketReceiveTask> t =
+            new SocketReceiveTask(this, mIncoming.forget());
+          NS_DispatchToMainThread(t);
+        }
+        if (ret < ssize_t(MAX_READ_SIZE)) {
+          return;
+        }
       }
     }
+  } else if (status == SOCKET_LISTENING) {
+
+    int client_fd = accept(mFd.get(), &mAddr, &mAddrSize);
+
+    if (client_fd < 0) {
+      return;
+    }
+
+    if (!mConnector->SetUp(client_fd)) {
+      NS_WARNING("Could not set up socket!");
+      return;
+    }
+
+    mReadWatcher.StopWatchingFileDescriptor();
+    mWriteWatcher.StopWatchingFileDescriptor();
+
+    mFd.reset(client_fd);
+
+    nsRefPtr<OnSocketEventTask> t =
+      new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
+    NS_DispatchToMainThread(t);
+
+    // Due to the fact that we've dispatched our OnConnectSuccess message before
+    // starting reading, we're guaranteed that any subsequent read tasks will
+    // happen after the object has been notified of a successful connect.
+    XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
+                                     new StartImplReadingTask(this));
   }
 }
 
 void
 UnixSocketImpl::OnFileCanWriteWithoutBlocking(int aFd)
 {
   // Try to write the bytes of mCurrentRilRawData.  If all were written, continue.
   //
@@ -793,39 +799,39 @@ UnixSocketConsumer::ConnectSocket(UnixSo
   if (mImpl) {
     NS_WARNING("Socket already connecting/connected!");
     return false;
   }
   nsCString addr;
   addr.Assign(aAddress);
   mImpl = new UnixSocketImpl(this, aConnector, addr);
   MessageLoop* ioLoop = XRE_GetIOMessageLoop();
+  mConnectionStatus = SOCKET_CONNECTING;
   if (aDelayMs > 0) {
     ioLoop->PostDelayedTask(FROM_HERE, new SocketConnectTask(mImpl), aDelayMs);
   } else {
     ioLoop->PostTask(FROM_HERE, new SocketConnectTask(mImpl));
   }
-  mConnectionStatus = SOCKET_CONNECTING;
   return true;
 }
 
 bool
 UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector)
 {
   MOZ_ASSERT(aConnector);
   MOZ_ASSERT(NS_IsMainThread());
   if (mImpl) {
     NS_WARNING("Socket already connecting/connected!");
     return false;
   }
   nsCString addr;
   mImpl = new UnixSocketImpl(this, aConnector, addr);
+  mConnectionStatus = SOCKET_LISTENING;
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
                                    new SocketAcceptTask(mImpl));
-  mConnectionStatus = SOCKET_LISTENING;
   return true;
 }
 
 void
 UnixSocketConsumer::CancelSocketTask()
 {
   mConnectionStatus = SOCKET_DISCONNECTED;
   if(!mImpl) {