Bug 796176 - Patch 1: UnixSocket changes to get connect/listen running main thread, connect status to consumers. r=cjones, a=blocking-basecamp
authorKyle Machulis <kyle@nonpolynomial.com>
Wed, 10 Oct 2012 22:48:40 -0700
changeset 116620 e47353b30f18e46b660158753be885b2526410c5
parent 116619 d00caf574dd57c5598851d03d62ae7e01bf417ae
child 116621 fa62fe4387dde61c5fa25aee78807da9861b9b55
push id239
push userakeybl@mozilla.com
push dateThu, 03 Jan 2013 21:54:43 +0000
treeherdermozilla-release@3a7b66445659 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerscjones, blocking-basecamp
bugs796176
milestone18.0a2
Bug 796176 - Patch 1: UnixSocket changes to get connect/listen running main thread, connect status to consumers. r=cjones, a=blocking-basecamp
ipc/unixsocket/UnixSocket.cpp
ipc/unixsocket/UnixSocket.h
--- a/ipc/unixsocket/UnixSocket.cpp
+++ b/ipc/unixsocket/UnixSocket.cpp
@@ -49,16 +49,17 @@ public:
     , mConnector(aConnector)
     , mCurrentTaskIsCanceled(false)
     , mAddress(aAddress)
   {
   }
 
   ~UnixSocketImpl()
   {
+    StopTask();
     mReadWatcher.StopWatchingFileDescriptor();
     mWriteWatcher.StopWatchingFileDescriptor();
   }
 
   void QueueWriteData(UnixSocketRawData* aData)
   {
     mOutgoingQ.AppendElement(aData);
     OnFileCanWriteWithoutBlocking(mFd);
@@ -111,22 +112,16 @@ public:
     mIOLoop = MessageLoopForIO::current();
     mIOLoop->WatchFileDescriptor(mFd,
                                  true,
                                  MessageLoopForIO::WATCH_READ,
                                  &mReadWatcher,
                                  this);
   }
 
-  void PrepareRemoval()
-  {
-    mTask = nullptr;
-    mCurrentTaskIsCanceled = true;
-  }
-
   /** 
    * Connect to a socket
    */
   void Connect();
 
   /** 
    * Run bind/listen to prepare for further runs of accept()
    */
@@ -135,17 +130,24 @@ public:
   /** 
    * Accept an incoming connection
    */
   void Accept();
 
   /** 
    * Stop whatever connect/accept task is running
    */
-  void Stop();
+  void StopTask()
+  {
+    if (mTask) {
+      mTask->Cancel();
+      mTask = nullptr;
+    }
+    mCurrentTaskIsCanceled = true;
+  }
 
   /** 
    * Set up nonblocking flags on whatever our current file descriptor is.
    *
    * @return true if successful, false otherwise
    */
   bool SetNonblockFlags();
 
@@ -229,16 +231,53 @@ private:
 
 static void
 DestroyImpl(UnixSocketImpl* impl)
 {
   MOZ_ASSERT(impl);
   delete impl;
 }
 
+class OnSocketEventTask : public nsRunnable
+{
+public:
+  enum SocketEvent {
+    CONNECT_SUCCESS,
+    CONNECT_ERROR,
+    DISCONNECT
+  };
+
+  OnSocketEventTask(UnixSocketImpl* aImpl, SocketEvent e) :
+    mImpl(aImpl),
+    mEvent(e)
+  {
+    MOZ_ASSERT(aImpl);
+  }
+
+  NS_IMETHOD Run()
+  {
+    MOZ_ASSERT(NS_IsMainThread());
+    if (!mImpl->mConsumer) {
+      NS_WARNING("CloseSocket has already been called! (mConsumer is null)");
+      // Since we've already explicitly closed and the close happened before
+      // this, this isn't really an error. Since we've warned, return OK.
+      return NS_OK;
+    }
+    if (mEvent == CONNECT_SUCCESS) {
+      mImpl->mConsumer->NotifySuccess();
+    } else if (mEvent == CONNECT_ERROR) {
+      mImpl->mConsumer->NotifyError();
+    }
+    return NS_OK;
+  }
+private:
+  UnixSocketImpl* mImpl;
+  SocketEvent mEvent;
+};
+
 class SocketReceiveTask : public nsRunnable
 {
 public:
   SocketReceiveTask(UnixSocketImpl* aImpl, UnixSocketRawData* aData) :
     mImpl(aImpl),
     mRawData(aData)
   {
     MOZ_ASSERT(aImpl);
@@ -270,17 +309,18 @@ public:
       mImpl(aImpl),
       mData(aData)
   {
     MOZ_ASSERT(aConsumer);
     MOZ_ASSERT(aImpl);
     MOZ_ASSERT(aData);
   }
 
-  void Run()
+  void
+  Run()
   {
     mImpl->QueueWriteData(mData);
   }
 
 private:
   nsRefPtr<UnixSocketConsumer> mConsumer;
   UnixSocketImpl* mImpl;
   UnixSocketRawData* mData;
@@ -289,17 +329,18 @@ private:
 class StartImplReadingTask : public Task
 {
 public:
   StartImplReadingTask(UnixSocketImpl* aImpl)
     : mImpl(aImpl)
   {
   }
 
-  void Run()
+  void
+  Run()
   {
     mImpl->SetUpIO();
   }
 private:
   UnixSocketImpl* mImpl;
 };
 
 class SocketAcceptTask : public CancelableTask {
@@ -339,16 +380,21 @@ void SocketConnectTask::Run() {
 }
 
 void
 UnixSocketImpl::Accept()
 {
   socklen_t addr_sz;
   struct sockaddr addr;
 
+  if (!mConnector) {
+    NS_WARNING("No connector object available!");
+    return;
+  }
+
   // This will set things we don't particularly care about, but it will hand
   // back the correct structure size which is what we do care about.
   mConnector->CreateAddr(true, addr_sz, &addr, nullptr);
 
   if(mFd.get() < 0)
   {
     mFd = mConnector->Create();
     if (mFd.get() < 0) {
@@ -373,34 +419,33 @@ UnixSocketImpl::Accept()
       return;
     }
 
   }
 
   int client_fd;
   client_fd = accept(mFd.get(), &addr, &addr_sz);
   if (client_fd < 0) {
-#if DEBUG
-    LOG("Socket accept errno=%d\n", errno);
-#endif
     EnqueueTask(SOCKET_RETRY_TIME_MS, new SocketAcceptTask(this));
     return;
   }
 
-  if(client_fd < 0)
-  {
-    EnqueueTask(SOCKET_RETRY_TIME_MS, new SocketAcceptTask(this));
-    return;
-  }
-
-  if (!mConnector->Setup(client_fd)) {
+  if (!mConnector->SetUp(client_fd)) {
     NS_WARNING("Could not set up socket!");
     return;
   }
   mFd.reset(client_fd);
+
+  nsRefPtr<OnSocketEventTask> t =
+    new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
+  NS_DispatchToMainThread(t);
+
+  // Due to the fact that we've dispatched our OnConnectSuccess message before
+  // starting reading, we're guaranteed that any subsequent read tasks will
+  // happen after the object has been notified of a successful connect.
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
                                    new StartImplReadingTask(this));
 }
 
 void
 UnixSocketImpl::Connect()
 {
   if(mFd.get() < 0)
@@ -419,24 +464,34 @@ UnixSocketImpl::Connect()
 
   ret = connect(mFd.get(), &addr, addr_sz);
 
   if (ret) {
 #if DEBUG
     LOG("Socket connect errno=%d\n", errno);
 #endif
     mFd.reset(-1);
+    nsRefPtr<OnSocketEventTask> t =
+      new OnSocketEventTask(this, OnSocketEventTask::CONNECT_ERROR);
+    NS_DispatchToMainThread(t);
     return;
   }
 
-  if (!mConnector->Setup(mFd)) {
+  if (!mConnector->SetUp(mFd)) {
     NS_WARNING("Could not set up socket!");
     return;
   }
 
+  nsRefPtr<OnSocketEventTask> t =
+    new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
+  NS_DispatchToMainThread(t);
+
+  // Due to the fact that we've dispatched our OnConnectSuccess message before
+  // starting reading, we're guaranteed that any subsequent read tasks will
+  // happen after the object has been notified of a successful connect.
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
                                    new StartImplReadingTask(this));
 }
 
 bool
 UnixSocketImpl::SetNonblockFlags()
 {
   // Set socket addr to be reused even if kernel is still waiting to close
@@ -457,34 +512,41 @@ UnixSocketImpl::SetNonblockFlags()
   // Select non-blocking IO.
   if (-1 == fcntl(mFd, F_SETFL, O_NONBLOCK)) {
     return false;
   }
 
   return true;
 }
 
+UnixSocketConsumer::UnixSocketConsumer() : mImpl(nullptr)
+                                         , mConnectionStatus(SOCKET_DISCONNECTED)
+{
+}
+
 UnixSocketConsumer::~UnixSocketConsumer()
 {
 }
 
 bool
 UnixSocketConsumer::SendSocketData(UnixSocketRawData* aData)
 {
+  MOZ_ASSERT(NS_IsMainThread());
   if (!mImpl) {
     return false;
   }
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
                                    new SocketSendTask(this, mImpl, aData));
   return true;
 }
 
 bool
 UnixSocketConsumer::SendSocketData(const nsACString& aStr)
 {
+  MOZ_ASSERT(NS_IsMainThread());
   if (!mImpl) {
     return false;
   }
   if (aStr.Length() > UnixSocketRawData::MAX_DATA_SIZE) {
     return false;
   }
   nsCString str(aStr);
   UnixSocketRawData* d = new UnixSocketRawData(aStr.Length());
@@ -496,18 +558,19 @@ UnixSocketConsumer::SendSocketData(const
 
 void
 UnixSocketConsumer::CloseSocket()
 {
   if (!mImpl) {
     return;
   }
   UnixSocketImpl* impl = mImpl;
-  mImpl->mConsumer.forget();
   mImpl = nullptr;
+  impl->mConsumer.forget();
+  impl->StopTask();
   // To make sure the owner doesn't die on the IOThread, remove pointer here
   // Line it up to be destructed on the IO Thread
   // Kill our pointer to it
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
                                    NewRunnableFunction(DestroyImpl,
                                                        impl));
 }
 
@@ -533,20 +596,17 @@ UnixSocketImpl::OnFileCanReadWithoutBloc
           }
           else if (errno == EAGAIN || errno == EWOULDBLOCK) {
             mIncoming.forget();
             return; // no data available: return and re-poll
           }
           // else fall through to error handling on other errno's
         }
 #ifdef DEBUG
-        nsAutoString str;
-        str.AssignLiteral("Cannot read from network, error ");
-        str += (int)ret;
-        NS_WARNING(NS_ConvertUTF16toUTF8(str).get());
+        NS_WARNING("Cannot read from network");
 #endif
         // At this point, assume that we can't actually access
         // the socket anymore
         mIncoming.forget();
         mReadWatcher.StopWatchingFileDescriptor();
         mWriteWatcher.StopWatchingFileDescriptor();
         mConsumer->CloseSocket();
         return;
@@ -603,53 +663,71 @@ UnixSocketImpl::OnFileCanWriteWithoutBlo
         this);
       return;
     }
     mOutgoingQ.RemoveElementAt(0);
     delete data;
   }
 }
 
+void
+UnixSocketConsumer::NotifySuccess()
+{
+  MOZ_ASSERT(NS_IsMainThread());
+  mConnectionStatus = SOCKET_CONNECTED;
+  OnConnectSuccess();
+}
+
+void
+UnixSocketConsumer::NotifyError()
+{
+  MOZ_ASSERT(NS_IsMainThread());
+  mConnectionStatus = SOCKET_DISCONNECTED;
+  OnConnectError();
+}
 bool
 UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector,
                                   const char* aAddress)
 {
-  MOZ_ASSERT(!NS_IsMainThread());
   MOZ_ASSERT(aConnector);
+  MOZ_ASSERT(NS_IsMainThread());
   if (mImpl) {
     NS_WARNING("Socket already connecting/connected!");
     return false;
   }
   nsCString addr;
   addr.Assign(aAddress);
   mImpl = new UnixSocketImpl(this, aConnector, addr);
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
                                    new SocketConnectTask(mImpl));
+  mConnectionStatus = SOCKET_CONNECTING;
   return true;
 }
 
 bool
 UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector)
 {
-  MOZ_ASSERT(!NS_IsMainThread());
   MOZ_ASSERT(aConnector);
+  MOZ_ASSERT(NS_IsMainThread());
   if (mImpl) {
     NS_WARNING("Socket already connecting/connected!");
     return false;
   }
   nsCString addr;
   mImpl = new UnixSocketImpl(this, aConnector, addr);
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
                                    new SocketAcceptTask(mImpl));
+  mConnectionStatus = SOCKET_CONNECTING;
   return true;
 }
 
 void
 UnixSocketConsumer::CancelSocketTask()
 {
+  mConnectionStatus = SOCKET_DISCONNECTED;
   if(!mImpl) {
     NS_WARNING("No socket implementation to cancel task on!");
     return;
   }
   mImpl->CancelTask();
 }
 
 } // namespace ipc
--- a/ipc/unixsocket/UnixSocket.h
+++ b/ipc/unixsocket/UnixSocket.h
@@ -95,28 +95,37 @@ public:
 
   /** 
    * Does any socket type specific setup that may be needed
    *
    * @param aFd File descriptor for opened socket
    *
    * @return true is successful, false otherwise
    */
-  virtual bool Setup(int aFd) = 0;  
+  virtual bool SetUp(int aFd) = 0;
+};
+
+enum SocketConnectionStatus {
+  SOCKET_DISCONNECTED = 0,
+  SOCKET_CONNECTING = 1,
+  SOCKET_CONNECTED = 2
 };
 
 class UnixSocketConsumer : public RefCounted<UnixSocketConsumer>
 {
 public:
-  UnixSocketConsumer()
-    : mImpl(nullptr)
-  {}
+  UnixSocketConsumer();
 
   virtual ~UnixSocketConsumer();
 
+  SocketConnectionStatus GetConnectionStatus()
+  {
+    return mConnectionStatus;
+  }
+
   /**
    * Function to be called whenever data is received. This is only called on the
    * main thread.
    *
    * @param aMessage Data received from the socket.
    */
   virtual void ReceiveSocketData(UnixSocketRawData* aMessage) = 0;
 
@@ -167,16 +176,40 @@ public:
    * from main thread.
    */
   void CloseSocket();
 
   /** 
    * Cancels connect/accept task loop, if one is currently running.
    */
   void CancelSocketTask();
+
+  /** 
+   * Callback for socket connect/accept success. Called after connect/accept has
+   * finished. Will be run on main thread, before any reads take place.
+   */
+  virtual void OnConnectSuccess() = 0;
+
+  /** 
+   * Callback for socket connect/accept error. Will be run on main thread.
+   */
+  virtual void OnConnectError() = 0;
+
+
+  /** 
+   * Called by implementation to notify consumer of success.
+   */
+  void NotifySuccess();
+
+  /** 
+   * Called by implementation to notify consumer of error.
+   */
+  void NotifyError();
+
 private:
   UnixSocketImpl* mImpl;
+  SocketConnectionStatus mConnectionStatus;
 };
 
 } // namespace ipc
 } // namepsace mozilla
 
 #endif // mozilla_ipc_Socket_h