Bug 790739: Patch 1 - UnixSocket changes for server sockets; r=cjones
authorKyle Machulis <kyle@nonpolynomial.com>
Sun, 30 Sep 2012 22:54:27 -0700
changeset 115582 e8e7e0cf43d85ddca5dcde5054df6c5abe02a481
parent 115581 1983304d2e60e8de09205c7e1f724109afdcc606
child 115583 826ef2970e19fe08b1587f2f1ffbb43cbe90ae2a
push id239
push userakeybl@mozilla.com
push dateThu, 03 Jan 2013 21:54:43 +0000
treeherdermozilla-release@3a7b66445659 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerscjones
bugs790739
milestone18.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 790739: Patch 1 - UnixSocket changes for server sockets; r=cjones
ipc/unixsocket/UnixSocket.cpp
ipc/unixsocket/UnixSocket.h
--- a/ipc/unixsocket/UnixSocket.cpp
+++ b/ipc/unixsocket/UnixSocket.cpp
@@ -19,26 +19,41 @@
 #include "mozilla/Monitor.h"
 #include "mozilla/Util.h"
 #include "mozilla/FileUtils.h"
 #include "nsString.h"
 #include "nsThreadUtils.h"
 #include "nsTArray.h"
 #include "nsXULAppAPI.h"
 
+#undef LOG
+#if defined(MOZ_WIDGET_GONK)
+#include <android/log.h>
+#define LOG(args...)  __android_log_print(ANDROID_LOG_INFO, "GonkDBus", args);
+#else
+#define BTDEBUG true
+#define LOG(args...) if (BTDEBUG) printf(args);
+#endif
+
+static const int SOCKET_RETRY_TIME_MS = 1000;
+
 namespace mozilla {
 namespace ipc {
 
 class UnixSocketImpl : public MessageLoopForIO::Watcher
 {
 public:
-  UnixSocketImpl(UnixSocketConsumer* aConsumer, int aFd)
+  UnixSocketImpl(UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector,
+                 const nsACString& aAddress)
     : mConsumer(aConsumer)
     , mIOLoop(nullptr)
-    , mFd(aFd)
+    , mFd(-1)
+    , mConnector(aConnector)
+    , mCurrentTaskIsCanceled(false)
+    , mAddress(aAddress)
   {
   }
 
   ~UnixSocketImpl()
   {
     mReadWatcher.StopWatchingFileDescriptor();
     mWriteWatcher.StopWatchingFileDescriptor();
   }
@@ -49,32 +64,95 @@ public:
     OnFileCanWriteWithoutBlocking(mFd);
   }
 
   bool isFdValid()
   {
     return mFd > 0;
   }
 
+  void CancelTask()
+  {
+    if (!mTask) {
+      return;
+    }
+    mTask->Cancel();
+    mTask = nullptr;
+    mCurrentTaskIsCanceled = true;
+  }
+  
+  void UnsetTask()
+  {
+    mTask = nullptr;
+  }
+
+  void EnqueueTask(int aDelayMs, CancelableTask* aTask)
+  {
+    MessageLoopForIO* ioLoop = MessageLoopForIO::current();
+    if (!ioLoop) {
+      NS_WARNING("No IOLoop to attach to, cancelling self!");
+      return;
+    }
+    if (mTask) {
+      return;
+    }
+    if (mCurrentTaskIsCanceled) {
+      return;
+    }
+    mTask = aTask;
+    if (aDelayMs) {
+      ioLoop->PostDelayedTask(FROM_HERE, mTask, aDelayMs);
+    } else {
+      ioLoop->PostTask(FROM_HERE, mTask);
+    }
+  }
+  
   void SetUpIO()
   {
     MOZ_ASSERT(!mIOLoop);
     mIOLoop = MessageLoopForIO::current();
     mIOLoop->WatchFileDescriptor(mFd,
                                  true,
                                  MessageLoopForIO::WATCH_READ,
                                  &mReadWatcher,
                                  this);
   }
 
   void PrepareRemoval()
   {
     mConsumer.forget();
   }
 
+  /** 
+   * Connect to a socket
+   */
+  void Connect();
+
+  /** 
+   * Run bind/listen to prepare for further runs of accept()
+   */
+  void Listen();
+
+  /** 
+   * Accept an incoming connection
+   */
+  void Accept();
+
+  /** 
+   * Stop whatever connect/accept task is running
+   */
+  void Stop();
+
+  /** 
+   * Set up nonblocking flags on whatever our current file descriptor is.
+   *
+   * @return true if successful, false otherwise
+   */
+  bool SetNonblockFlags();
+
   /**
    * Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
    * directly from main thread. All non-main-thread accesses should happen with
    * mImpl as container.
    */
   RefPtr<UnixSocketConsumer> mConsumer;
 
 private:
@@ -101,35 +179,56 @@ private:
 
   /**
    * Raw data queue. Must be pushed/popped from IO thread only.
    */
   typedef nsTArray<UnixSocketRawData* > UnixSocketRawDataQueue;
   UnixSocketRawDataQueue mOutgoingQ;
 
   /**
-   * File descriptor to read from/write to. Connection happens on user provided
-   * thread. Read/write/close happens on IO thread.
-   */
-  ScopedClose mFd;
-
-  /**
    * Incoming packet. Only to be accessed on IO Thread.
    */
   nsAutoPtr<UnixSocketRawData> mIncoming;
 
   /**
    * Read watcher for libevent. Only to be accessed on IO Thread.
    */
   MessageLoopForIO::FileDescriptorWatcher mReadWatcher;
 
   /**
    * Write watcher for libevent. Only to be accessed on IO Thread.
    */
   MessageLoopForIO::FileDescriptorWatcher mWriteWatcher;
+
+  /**
+   * File descriptor to read from/write to. Connection happens on user provided
+   * thread. Read/write/close happens on IO thread.
+   */
+  ScopedClose mFd;
+
+  /** 
+   * Connector object used to create the connection we are currently using.
+   */
+  nsAutoPtr<UnixSocketConnector> mConnector;
+
+  /**
+   * If true, do not requeue whatever task we're running
+   */
+  bool mCurrentTaskIsCanceled;
+
+  /**
+   * Pointer to the task we're currently running. DO NOT DELETE MANUALLY. This
+   * will be taken care of by the IO loop. Just set to nullptr.
+   */
+  CancelableTask* mTask;
+
+  /**
+   * Address we are connecting to, assuming we are creating a client connection.
+   */
+  nsCString mAddress;
 };
 
 static void
 DestroyImpl(UnixSocketImpl* impl)
 {
   delete impl;
 }
 
@@ -196,37 +295,170 @@ public:
   void Run()
   {
     mImpl->SetUpIO();
   }
 private:
   UnixSocketImpl* mImpl;
 };
 
-bool
-UnixSocketConnector::Connect(int aFd, const char* aAddress)
+class SocketAcceptTask : public CancelableTask {
+  virtual void Run();
+
+  bool mCanceled;
+  UnixSocketImpl* mImpl;
+public:
+  virtual void Cancel() { mCanceled = true; }
+  SocketAcceptTask(UnixSocketImpl* aImpl) : mCanceled(false), mImpl(aImpl) { }
+};
+
+void SocketAcceptTask::Run() {
+  mImpl->UnsetTask();
+  if (mCanceled) {
+    return;
+  }
+  mImpl->Accept();
+}
+
+class SocketConnectTask : public CancelableTask {
+  virtual void Run();
+
+  bool mCanceled;
+  UnixSocketImpl* mImpl;
+public:
+  SocketConnectTask(UnixSocketImpl* aImpl) : mCanceled(false), mImpl(aImpl) { }
+  virtual void Cancel() { mCanceled = true; }  
+};
+
+void SocketConnectTask::Run() {
+  mImpl->UnsetTask();
+  if (mCanceled) {
+    return;
+  }
+  mImpl->Connect();
+}
+
+void
+UnixSocketImpl::Accept()
 {
-  if (!ConnectInternal(aFd, aAddress))
+  socklen_t addr_sz;
+  struct sockaddr addr;
+
+  // This will set things we don't particularly care about, but it will hand
+  // back the correct structure size which is what we do care about.
+  mConnector->CreateAddr(true, addr_sz, &addr, nullptr);
+
+  if(mFd.get() < 0)
   {
-    return false;
+    mFd = mConnector->Create();
+    if (mFd.get() < 0) {
+      return;
+    }
+
+    if (!SetNonblockFlags()) {
+      return;
+    }
+
+    if (bind(mFd.get(), &addr, addr_sz)) {
+#ifdef DEBUG
+      LOG("...bind(%d) gave errno %d", mFd.get(), errno);
+#endif
+      return;
+    }
+
+    if (listen(mFd.get(), 1)) {
+#ifdef DEBUG
+      LOG("...listen(%d) gave errno %d", mFd.get(), errno);
+#endif
+      return;
+    }
+
   }
 
+  int client_fd;
+  client_fd = accept(mFd.get(), &addr, &addr_sz);
+  if (client_fd < 0) {
+#if DEBUG
+    LOG("Socket accept errno=%d\n", errno);
+#endif
+    EnqueueTask(SOCKET_RETRY_TIME_MS, new SocketAcceptTask(this));
+    return;
+  }
+
+  if(client_fd < 0)
+  {
+    EnqueueTask(SOCKET_RETRY_TIME_MS, new SocketAcceptTask(this));
+    return;
+  }
+
+  if (!mConnector->Setup(client_fd)) {
+    NS_WARNING("Could not set up socket!");
+    return;
+  }
+  mFd.reset(client_fd);
+  XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
+                                   new StartImplReadingTask(this));
+}
+
+void
+UnixSocketImpl::Connect()
+{
+  if(mFd.get() < 0)
+  {
+    mFd = mConnector->Create();
+    if (mFd.get() < 0) {
+      return;
+    }
+  }
+
+  int ret;
+  socklen_t addr_sz;
+  struct sockaddr addr;
+
+  mConnector->CreateAddr(false, addr_sz, &addr, mAddress.get());
+
+  ret = connect(mFd.get(), &addr, addr_sz);
+
+  if (ret) {
+#if DEBUG
+    LOG("Socket connect errno=%d\n", errno);
+#endif
+    mFd.reset(-1);
+    return;
+  }
+
+  if (!mConnector->Setup(mFd)) {
+    NS_WARNING("Could not set up socket!");
+    return;
+  }
+
+  XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
+                                   new StartImplReadingTask(this));
+}
+
+bool
+UnixSocketImpl::SetNonblockFlags()
+{
+  // Set socket addr to be reused even if kernel is still waiting to close
+  int n = 1;
+  setsockopt(mFd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof(n));
+
   // Set close-on-exec bit.
-  int flags = fcntl(aFd, F_GETFD);
+  int flags = fcntl(mFd, F_GETFD);
   if (-1 == flags) {
     return false;
   }
 
   flags |= FD_CLOEXEC;
-  if (-1 == fcntl(aFd, F_SETFD, flags)) {
+  if (-1 == fcntl(mFd, F_SETFD, flags)) {
     return false;
   }
 
   // Select non-blocking IO.
-  if (-1 == fcntl(aFd, F_SETFL, O_NONBLOCK)) {
+  if (-1 == fcntl(mFd, F_SETFL, O_NONBLOCK)) {
     return false;
   }
 
   return true;
 }
 
 UnixSocketConsumer::~UnixSocketConsumer()
 {
@@ -368,28 +600,53 @@ UnixSocketImpl::OnFileCanWriteWithoutBlo
       return;
     }
     mOutgoingQ.RemoveElementAt(0);
     delete data;
   }
 }
 
 bool
-UnixSocketConsumer::ConnectSocket(UnixSocketConnector& aConnector,
+UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector,
                                   const char* aAddress)
 {
   MOZ_ASSERT(!NS_IsMainThread());
-  MOZ_ASSERT(!mImpl);
-  ScopedClose fd(aConnector.Create());
-  if (fd < 0) {
+  MOZ_ASSERT(aConnector);
+  if (mImpl) {
+    NS_WARNING("Socket already connecting/connected!");
     return false;
   }
-  if (!aConnector.Connect(fd, aAddress)) {
+  nsCString addr;
+  addr.Assign(aAddress);
+  mImpl = new UnixSocketImpl(this, aConnector, addr);
+  XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
+                                   new SocketConnectTask(mImpl));
+  return true;
+}
+
+bool
+UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector)
+{
+  MOZ_ASSERT(!NS_IsMainThread());
+  MOZ_ASSERT(aConnector);
+  if (mImpl) {
+    NS_WARNING("Socket already connecting/connected!");
     return false;
   }
-  mImpl = new UnixSocketImpl(this, fd.forget());
+  nsCString addr;
+  mImpl = new UnixSocketImpl(this, aConnector, addr);
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
-                                   new StartImplReadingTask(mImpl));
+                                   new SocketAcceptTask(mImpl));
   return true;
 }
 
+void
+UnixSocketConsumer::CancelSocketTask()
+{
+  if(!mImpl) {
+    NS_WARNING("No socket implementation to cancel task on!");
+    return;
+  }
+  mImpl->CancelTask();
+}
+
 } // namespace ipc
 } // namespace mozilla
--- a/ipc/unixsocket/UnixSocket.h
+++ b/ipc/unixsocket/UnixSocket.h
@@ -19,43 +19,43 @@ struct UnixSocketRawData
 {
   static const size_t MAX_DATA_SIZE = 1024;
   uint8_t mData[MAX_DATA_SIZE];
 
   // Number of octets in mData.
   size_t mSize;
   size_t mCurrentWriteOffset;
 
-  /** 
+  /**
    * Constructor for situations where size is not known beforehand. (for
    * example, when reading a packet)
    *
    */
   UnixSocketRawData() :
     mSize(0),
     mCurrentWriteOffset(0)
   {
   }
 
-  /** 
+  /**
    * Constructor for situations where size is known beforehand (for example,
    * when being assigned strings)
    *
    */
   UnixSocketRawData(int aSize) :
     mSize(aSize),
     mCurrentWriteOffset(0)
   {
   }
 
 };
 
 class UnixSocketImpl;
 
-/** 
+/**
  * UnixSocketConnector defines the socket creation and connection/listening
  * functions for a UnixSocketConsumer. Due to the fact that socket setup can
  * vary between protocols (unix sockets, tcp sockets, bluetooth sockets, etc),
  * this allows the user to create whatever connection mechanism they need while
  * still depending on libevent for non-blocking communication handling.
  *
  * FIXME/Bug 793980: Currently only virtual, since we only support bluetooth.
  * Should make connection functions for other unix sockets so we can support
@@ -65,106 +65,117 @@ class UnixSocketConnector
 {
 public:
   UnixSocketConnector()
   {}
 
   virtual ~UnixSocketConnector()
   {}
 
-  /** 
+  /**
    * Establishs a file descriptor for a socket.
    *
    * @return File descriptor for socket
    */
   virtual int Create() = 0;
 
   /** 
-   * Runs connect function on a file descriptor for the address specified. Makes
-   * sure socket is marked with flags expected by the UnixSocket handler
-   * (non-block, etc...)
+   * Since most socket specifics are related to address formation into a
+   * sockaddr struct, this function is defined by subclasses and fills in the
+   * structure as needed for whatever connection it is trying to build
    *
-   * @param aFd File descriptor created by Create() function
-   * @param aAddress Address to connect to
-   *
-   * @return true if connected, false otherwise
+   * @param aIsServer True is we are acting as a server socket
+   * @param aAddrSize Size of the struct 
+   * @param aAddr Struct to fill
+   * @param aAddress If aIsServer is false, Address to connect to. nullptr otherwise.
    */
-  bool Connect(int aFd, const char* aAddress);
-  
-protected:
+  virtual void CreateAddr(bool aIsServer,
+                          socklen_t& aAddrSize,
+                          struct sockaddr *aAddr,
+                          const char* aAddress) = 0;
+
   /** 
-   * Internal type-specific connection function to be overridden by child
-   * classes.
+   * Does any socket type specific setup that may be needed
    *
-   * @param aFd File descriptor created by Create() function
-   * @param aAddress Address to connect to
+   * @param aFd File descriptor for opened socket
    *
-   * @return true if connected, false otherwise
+   * @return true is successful, false otherwise
    */
-  virtual bool ConnectInternal(int aFd, const char* aAddress) = 0;
+  virtual bool Setup(int aFd) = 0;  
 };
 
 class UnixSocketConsumer : public RefCounted<UnixSocketConsumer>
 {
 public:
   UnixSocketConsumer()
     : mImpl(nullptr)
   {}
 
   virtual ~UnixSocketConsumer();
-  
-  /** 
+
+  /**
    * Function to be called whenever data is received. This is only called on the
    * main thread.
    *
    * @param aMessage Data received from the socket.
    */
   virtual void ReceiveSocketData(UnixSocketRawData* aMessage) = 0;
 
-  /** 
+  /**
    * Queue data to be sent to the socket on the IO thread. Can only be called on
    * originating thread.
    *
    * @param aMessage Data to be sent to socket
    *
    * @return true if data is queued, false otherwise (i.e. not connected)
    */
   bool SendSocketData(UnixSocketRawData* aMessage);
 
-  /** 
+  /**
    * Convenience function for sending strings to the socket (common in bluetooth
    * profile usage). Converts to a UnixSocketRawData struct. Can only be called
    * on originating thread.
    *
    * @param aMessage String to be sent to socket
    *
    * @return true if data is queued, false otherwise (i.e. not connected)
    */
   bool SendSocketData(const nsACString& aMessage);
 
-  /** 
-   * Connects to a socket. Due to the fact that this is a blocking connect (and
-   * for things such as bluetooth, it /will/ block), it is expected to be run on
-   * a thread provided by the user. It cannot run on the main thread. Runs the
-   * Create() and Connect() functions in the UnixSocketConnector.
+  /**
+   * Starts a task on the socket that will try to connect to a socket in a
+   * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
    * @param aAddress Address to connect to.
    *
-   * @return true on connection, false otherwise.
+   * @return true on connect task started, false otherwise.
    */
-  bool ConnectSocket(UnixSocketConnector& aConnector, const char* aAddress);
+  bool ConnectSocket(UnixSocketConnector* aConnector, const char* aAddress);
 
   /** 
+   * Starts a task on the socket that will try to accept a new connection in a
+   * non-blocking manner.
+   *
+   * @param aConnector Connector object for socket type specific functions
+   *
+   * @return true on listen started, false otherwise
+   */
+  bool ListenSocket(UnixSocketConnector* aConnector);
+
+  /**
    * Queues the internal representation of socket for deletion. Can be called
    * from main thread.
-   *
    */
   void CloseSocket();
 
+  /** 
+   * Cancels connect/accept task loop, if one is currently running.
+   */
+  void CancelSocketTask();
 private:
   nsAutoPtr<UnixSocketImpl> mImpl;
 };
 
 } // namespace ipc
 } // namepsace mozilla
 
 #endif // mozilla_ipc_Socket_h