Bug 796176 - Patch 1: UnixSocket changes to get connect/listen running main thread, connect status to consumers; r=cjones
authorKyle Machulis <kyle@nonpolynomial.com>
Wed, 10 Oct 2012 22:48:40 -0700
changeset 109923 5521c75effb509656d0f3278aafd6c5062ec89c6
parent 109922 7065cefc10dddbdc5378142658d36752dd7611a5
child 109924 588d77a5701374847597b81faa07f8ad0a5c9892
push id23658
push useremorley@mozilla.com
push dateThu, 11 Oct 2012 19:03:51 +0000
treeherdermozilla-central@fb5be9cb0d4b [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerscjones
bugs796176
milestone19.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 796176 - Patch 1: UnixSocket changes to get connect/listen running main thread, connect status to consumers; r=cjones
ipc/unixsocket/UnixSocket.cpp
ipc/unixsocket/UnixSocket.h
--- a/ipc/unixsocket/UnixSocket.cpp
+++ b/ipc/unixsocket/UnixSocket.cpp
@@ -49,16 +49,17 @@ public:
     , mConnector(aConnector)
     , mCurrentTaskIsCanceled(false)
     , mAddress(aAddress)
   {
   }
 
   ~UnixSocketImpl()
   {
+    StopTask();
     mReadWatcher.StopWatchingFileDescriptor();
     mWriteWatcher.StopWatchingFileDescriptor();
   }
 
   void QueueWriteData(UnixSocketRawData* aData)
   {
     mOutgoingQ.AppendElement(aData);
     OnFileCanWriteWithoutBlocking(mFd);
@@ -111,22 +112,16 @@ public:
     mIOLoop = MessageLoopForIO::current();
     mIOLoop->WatchFileDescriptor(mFd,
                                  true,
                                  MessageLoopForIO::WATCH_READ,
                                  &mReadWatcher,
                                  this);
   }
 
-  void PrepareRemoval()
-  {
-    mTask = nullptr;
-    mCurrentTaskIsCanceled = true;
-  }
-
   /** 
    * Connect to a socket
    */
   void Connect();
 
   /** 
    * Run bind/listen to prepare for further runs of accept()
    */
@@ -135,17 +130,24 @@ public:
   /** 
    * Accept an incoming connection
    */
   void Accept();
 
   /** 
    * Stop whatever connect/accept task is running
    */
-  void Stop();
+  void StopTask()
+  {
+    if (mTask) {
+      mTask->Cancel();
+      mTask = nullptr;
+    }
+    mCurrentTaskIsCanceled = true;
+  }
 
   /** 
    * Set up nonblocking flags on whatever our current file descriptor is.
    *
    * @return true if successful, false otherwise
    */
   bool SetNonblockFlags();
 
@@ -229,16 +231,53 @@ private:
 
 static void
 DestroyImpl(UnixSocketImpl* impl)
 {
   MOZ_ASSERT(impl);
   delete impl;
 }
 
+class OnSocketEventTask : public nsRunnable
+{
+public:
+  enum SocketEvent {
+    CONNECT_SUCCESS,
+    CONNECT_ERROR,
+    DISCONNECT
+  };
+
+  OnSocketEventTask(UnixSocketImpl* aImpl, SocketEvent e) :
+    mImpl(aImpl),
+    mEvent(e)
+  {
+    MOZ_ASSERT(aImpl);
+  }
+
+  NS_IMETHOD Run()
+  {
+    MOZ_ASSERT(NS_IsMainThread());
+    if (!mImpl->mConsumer) {
+      NS_WARNING("CloseSocket has already been called! (mConsumer is null)");
+      // Since we've already explicitly closed and the close happened before
+      // this, this isn't really an error. Since we've warned, return OK.
+      return NS_OK;
+    }
+    if (mEvent == CONNECT_SUCCESS) {
+      mImpl->mConsumer->NotifySuccess();
+    } else if (mEvent == CONNECT_ERROR) {
+      mImpl->mConsumer->NotifyError();
+    }
+    return NS_OK;
+  }
+private:
+  UnixSocketImpl* mImpl;
+  SocketEvent mEvent;
+};
+
 class SocketReceiveTask : public nsRunnable
 {
 public:
   SocketReceiveTask(UnixSocketImpl* aImpl, UnixSocketRawData* aData) :
     mImpl(aImpl),
     mRawData(aData)
   {
     MOZ_ASSERT(aImpl);
@@ -270,17 +309,18 @@ public:
       mImpl(aImpl),
       mData(aData)
   {
     MOZ_ASSERT(aConsumer);
     MOZ_ASSERT(aImpl);
     MOZ_ASSERT(aData);
   }
 
-  void Run()
+  void
+  Run()
   {
     mImpl->QueueWriteData(mData);
   }
 
 private:
   nsRefPtr<UnixSocketConsumer> mConsumer;
   UnixSocketImpl* mImpl;
   UnixSocketRawData* mData;
@@ -289,17 +329,18 @@ private:
 class StartImplReadingTask : public Task
 {
 public:
   StartImplReadingTask(UnixSocketImpl* aImpl)
     : mImpl(aImpl)
   {
   }
 
-  void Run()
+  void
+  Run()
   {
     mImpl->SetUpIO();
   }
 private:
   UnixSocketImpl* mImpl;
 };
 
 class SocketAcceptTask : public CancelableTask {
@@ -339,16 +380,21 @@ void SocketConnectTask::Run() {
 }
 
 void
 UnixSocketImpl::Accept()
 {
   socklen_t addr_sz;
   struct sockaddr addr;
 
+  if (!mConnector) {
+    NS_WARNING("No connector object available!");
+    return;
+  }
+
   // This will set things we don't particularly care about, but it will hand
   // back the correct structure size which is what we do care about.
   mConnector->CreateAddr(true, addr_sz, &addr, nullptr);
 
   if(mFd.get() < 0)
   {
     mFd = mConnector->Create();
     if (mFd.get() < 0) {
@@ -373,34 +419,33 @@ UnixSocketImpl::Accept()
       return;
     }
 
   }
 
   int client_fd;
   client_fd = accept(mFd.get(), &addr, &addr_sz);
   if (client_fd < 0) {
-#if DEBUG
-    LOG("Socket accept errno=%d\n", errno);
-#endif
     EnqueueTask(SOCKET_RETRY_TIME_MS, new SocketAcceptTask(this));
     return;
   }
 
-  if(client_fd < 0)
-  {
-    EnqueueTask(SOCKET_RETRY_TIME_MS, new SocketAcceptTask(this));
-    return;
-  }
-
-  if (!mConnector->Setup(client_fd)) {
+  if (!mConnector->SetUp(client_fd)) {
     NS_WARNING("Could not set up socket!");
     return;
   }
   mFd.reset(client_fd);
+
+  nsRefPtr<OnSocketEventTask> t =
+    new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
+  NS_DispatchToMainThread(t);
+
+  // Due to the fact that we've dispatched our OnConnectSuccess message before
+  // starting reading, we're guaranteed that any subsequent read tasks will
+  // happen after the object has been notified of a successful connect.
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
                                    new StartImplReadingTask(this));
 }
 
 void
 UnixSocketImpl::Connect()
 {
   if(mFd.get() < 0)
@@ -419,24 +464,34 @@ UnixSocketImpl::Connect()
 
   ret = connect(mFd.get(), &addr, addr_sz);
 
   if (ret) {
 #if DEBUG
     LOG("Socket connect errno=%d\n", errno);
 #endif
     mFd.reset(-1);
+    nsRefPtr<OnSocketEventTask> t =
+      new OnSocketEventTask(this, OnSocketEventTask::CONNECT_ERROR);
+    NS_DispatchToMainThread(t);
     return;
   }
 
-  if (!mConnector->Setup(mFd)) {
+  if (!mConnector->SetUp(mFd)) {
     NS_WARNING("Could not set up socket!");
     return;
   }
 
+  nsRefPtr<OnSocketEventTask> t =
+    new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
+  NS_DispatchToMainThread(t);
+
+  // Due to the fact that we've dispatched our OnConnectSuccess message before
+  // starting reading, we're guaranteed that any subsequent read tasks will
+  // happen after the object has been notified of a successful connect.
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
                                    new StartImplReadingTask(this));
 }
 
 bool
 UnixSocketImpl::SetNonblockFlags()
 {
   // Set socket addr to be reused even if kernel is still waiting to close
@@ -457,34 +512,41 @@ UnixSocketImpl::SetNonblockFlags()
   // Select non-blocking IO.
   if (-1 == fcntl(mFd, F_SETFL, O_NONBLOCK)) {
     return false;
   }
 
   return true;
 }
 
+UnixSocketConsumer::UnixSocketConsumer() : mImpl(nullptr)
+                                         , mConnectionStatus(SOCKET_DISCONNECTED)
+{
+}
+
 UnixSocketConsumer::~UnixSocketConsumer()
 {
 }
 
 bool
 UnixSocketConsumer::SendSocketData(UnixSocketRawData* aData)
 {
+  MOZ_ASSERT(NS_IsMainThread());
   if (!mImpl) {
     return false;
   }
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
                                    new SocketSendTask(this, mImpl, aData));
   return true;
 }
 
 bool
 UnixSocketConsumer::SendSocketData(const nsACString& aStr)
 {
+  MOZ_ASSERT(NS_IsMainThread());
   if (!mImpl) {
     return false;
   }
   if (aStr.Length() > UnixSocketRawData::MAX_DATA_SIZE) {
     return false;
   }
   nsCString str(aStr);
   UnixSocketRawData* d = new UnixSocketRawData(aStr.Length());
@@ -496,18 +558,19 @@ UnixSocketConsumer::SendSocketData(const
 
 void
 UnixSocketConsumer::CloseSocket()
 {
   if (!mImpl) {
     return;
   }
   UnixSocketImpl* impl = mImpl;
-  mImpl->mConsumer.forget();
   mImpl = nullptr;
+  impl->mConsumer.forget();
+  impl->StopTask();
   // To make sure the owner doesn't die on the IOThread, remove pointer here
   // Line it up to be destructed on the IO Thread
   // Kill our pointer to it
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
                                    NewRunnableFunction(DestroyImpl,
                                                        impl));
 }
 
@@ -533,20 +596,17 @@ UnixSocketImpl::OnFileCanReadWithoutBloc
           }
           else if (errno == EAGAIN || errno == EWOULDBLOCK) {
             mIncoming.forget();
             return; // no data available: return and re-poll
           }
           // else fall through to error handling on other errno's
         }
 #ifdef DEBUG
-        nsAutoString str;
-        str.AssignLiteral("Cannot read from network, error ");
-        str += (int)ret;
-        NS_WARNING(NS_ConvertUTF16toUTF8(str).get());
+        NS_WARNING("Cannot read from network");
 #endif
         // At this point, assume that we can't actually access
         // the socket anymore
         mIncoming.forget();
         mReadWatcher.StopWatchingFileDescriptor();
         mWriteWatcher.StopWatchingFileDescriptor();
         mConsumer->CloseSocket();
         return;
@@ -603,53 +663,71 @@ UnixSocketImpl::OnFileCanWriteWithoutBlo
         this);
       return;
     }
     mOutgoingQ.RemoveElementAt(0);
     delete data;
   }
 }
 
+void
+UnixSocketConsumer::NotifySuccess()
+{
+  MOZ_ASSERT(NS_IsMainThread());
+  mConnectionStatus = SOCKET_CONNECTED;
+  OnConnectSuccess();
+}
+
+void
+UnixSocketConsumer::NotifyError()
+{
+  MOZ_ASSERT(NS_IsMainThread());
+  mConnectionStatus = SOCKET_DISCONNECTED;
+  OnConnectError();
+}
 bool
 UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector,
                                   const char* aAddress)
 {
-  MOZ_ASSERT(!NS_IsMainThread());
   MOZ_ASSERT(aConnector);
+  MOZ_ASSERT(NS_IsMainThread());
   if (mImpl) {
     NS_WARNING("Socket already connecting/connected!");
     return false;
   }
   nsCString addr;
   addr.Assign(aAddress);
   mImpl = new UnixSocketImpl(this, aConnector, addr);
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
                                    new SocketConnectTask(mImpl));
+  mConnectionStatus = SOCKET_CONNECTING;
   return true;
 }
 
 bool
 UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector)
 {
-  MOZ_ASSERT(!NS_IsMainThread());
   MOZ_ASSERT(aConnector);
+  MOZ_ASSERT(NS_IsMainThread());
   if (mImpl) {
     NS_WARNING("Socket already connecting/connected!");
     return false;
   }
   nsCString addr;
   mImpl = new UnixSocketImpl(this, aConnector, addr);
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
                                    new SocketAcceptTask(mImpl));
+  mConnectionStatus = SOCKET_CONNECTING;
   return true;
 }
 
 void
 UnixSocketConsumer::CancelSocketTask()
 {
+  mConnectionStatus = SOCKET_DISCONNECTED;
   if(!mImpl) {
     NS_WARNING("No socket implementation to cancel task on!");
     return;
   }
   mImpl->CancelTask();
 }
 
 } // namespace ipc
--- a/ipc/unixsocket/UnixSocket.h
+++ b/ipc/unixsocket/UnixSocket.h
@@ -95,28 +95,37 @@ public:
 
   /** 
    * Does any socket type specific setup that may be needed
    *
    * @param aFd File descriptor for opened socket
    *
    * @return true is successful, false otherwise
    */
-  virtual bool Setup(int aFd) = 0;  
+  virtual bool SetUp(int aFd) = 0;
+};
+
+enum SocketConnectionStatus {
+  SOCKET_DISCONNECTED = 0,
+  SOCKET_CONNECTING = 1,
+  SOCKET_CONNECTED = 2
 };
 
 class UnixSocketConsumer : public RefCounted<UnixSocketConsumer>
 {
 public:
-  UnixSocketConsumer()
-    : mImpl(nullptr)
-  {}
+  UnixSocketConsumer();
 
   virtual ~UnixSocketConsumer();
 
+  SocketConnectionStatus GetConnectionStatus()
+  {
+    return mConnectionStatus;
+  }
+
   /**
    * Function to be called whenever data is received. This is only called on the
    * main thread.
    *
    * @param aMessage Data received from the socket.
    */
   virtual void ReceiveSocketData(UnixSocketRawData* aMessage) = 0;
 
@@ -167,16 +176,40 @@ public:
    * from main thread.
    */
   void CloseSocket();
 
   /** 
    * Cancels connect/accept task loop, if one is currently running.
    */
   void CancelSocketTask();
+
+  /** 
+   * Callback for socket connect/accept success. Called after connect/accept has
+   * finished. Will be run on main thread, before any reads take place.
+   */
+  virtual void OnConnectSuccess() = 0;
+
+  /** 
+   * Callback for socket connect/accept error. Will be run on main thread.
+   */
+  virtual void OnConnectError() = 0;
+
+
+  /** 
+   * Called by implementation to notify consumer of success.
+   */
+  void NotifySuccess();
+
+  /** 
+   * Called by implementation to notify consumer of error.
+   */
+  void NotifyError();
+
 private:
   UnixSocketImpl* mImpl;
+  SocketConnectionStatus mConnectionStatus;
 };
 
 } // namespace ipc
 } // namepsace mozilla
 
 #endif // mozilla_ipc_Socket_h