Bug 790739: Patch 1 - UnixSocket changes for server sockets; r=cjones
authorKyle Machulis <kyle@nonpolynomial.com>
Sun, 30 Sep 2012 22:54:27 -0700
changeset 108823 e8e7e0cf43d85ddca5dcde5054df6c5abe02a481
parent 108822 1983304d2e60e8de09205c7e1f724109afdcc606
child 108824 826ef2970e19fe08b1587f2f1ffbb43cbe90ae2a
push id82
push usershu@rfrn.org
push dateFri, 05 Oct 2012 13:20:22 +0000
reviewerscjones
bugs790739
milestone18.0a1
Bug 790739: Patch 1 - UnixSocket changes for server sockets; r=cjones
ipc/unixsocket/UnixSocket.cpp
ipc/unixsocket/UnixSocket.h
--- a/ipc/unixsocket/UnixSocket.cpp
+++ b/ipc/unixsocket/UnixSocket.cpp
@@ -19,26 +19,41 @@
 #include "mozilla/Monitor.h"
 #include "mozilla/Util.h"
 #include "mozilla/FileUtils.h"
 #include "nsString.h"
 #include "nsThreadUtils.h"
 #include "nsTArray.h"
 #include "nsXULAppAPI.h"
 
+#undef LOG
+#if defined(MOZ_WIDGET_GONK)
+#include <android/log.h>
+#define LOG(args...)  __android_log_print(ANDROID_LOG_INFO, "GonkDBus", args);
+#else
+#define BTDEBUG true
+#define LOG(args...) if (BTDEBUG) printf(args);
+#endif
+
+static const int SOCKET_RETRY_TIME_MS = 1000;
+
 namespace mozilla {
 namespace ipc {
 
 class UnixSocketImpl : public MessageLoopForIO::Watcher
 {
 public:
-  UnixSocketImpl(UnixSocketConsumer* aConsumer, int aFd)
+  UnixSocketImpl(UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector,
+                 const nsACString& aAddress)
     : mConsumer(aConsumer)
     , mIOLoop(nullptr)
-    , mFd(aFd)
+    , mFd(-1)
+    , mConnector(aConnector)
+    , mCurrentTaskIsCanceled(false)
+    , mAddress(aAddress)
   {
   }
 
   ~UnixSocketImpl()
   {
     mReadWatcher.StopWatchingFileDescriptor();
     mWriteWatcher.StopWatchingFileDescriptor();
   }
@@ -49,32 +64,95 @@ public:
     OnFileCanWriteWithoutBlocking(mFd);
   }
 
   bool isFdValid()
   {
     return mFd > 0;
   }
 
+  void CancelTask()
+  {
+    if (!mTask) {
+      return;
+    }
+    mTask->Cancel();
+    mTask = nullptr;
+    mCurrentTaskIsCanceled = true;
+  }
+  
+  void UnsetTask()
+  {
+    mTask = nullptr;
+  }
+
+  void EnqueueTask(int aDelayMs, CancelableTask* aTask)
+  {
+    MessageLoopForIO* ioLoop = MessageLoopForIO::current();
+    if (!ioLoop) {
+      NS_WARNING("No IOLoop to attach to, cancelling self!");
+      return;
+    }
+    if (mTask) {
+      return;
+    }
+    if (mCurrentTaskIsCanceled) {
+      return;
+    }
+    mTask = aTask;
+    if (aDelayMs) {
+      ioLoop->PostDelayedTask(FROM_HERE, mTask, aDelayMs);
+    } else {
+      ioLoop->PostTask(FROM_HERE, mTask);
+    }
+  }
+  
   void SetUpIO()
   {
     MOZ_ASSERT(!mIOLoop);
     mIOLoop = MessageLoopForIO::current();
     mIOLoop->WatchFileDescriptor(mFd,
                                  true,
                                  MessageLoopForIO::WATCH_READ,
                                  &mReadWatcher,
                                  this);
   }
 
   void PrepareRemoval()
   {
     mConsumer.forget();
   }
 
+  /** 
+   * Connect to a socket
+   */
+  void Connect();
+
+  /** 
+   * Run bind/listen to prepare for further runs of accept()
+   */
+  void Listen();
+
+  /** 
+   * Accept an incoming connection
+   */
+  void Accept();
+
+  /** 
+   * Stop whatever connect/accept task is running
+   */
+  void Stop();
+
+  /** 
+   * Set up nonblocking flags on whatever our current file descriptor is.
+   *
+   * @return true if successful, false otherwise
+   */
+  bool SetNonblockFlags();
+
   /**
    * Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
    * directly from main thread. All non-main-thread accesses should happen with
    * mImpl as container.
    */
   RefPtr<UnixSocketConsumer> mConsumer;
 
 private:
@@ -101,35 +179,56 @@ private:
 
   /**
    * Raw data queue. Must be pushed/popped from IO thread only.
    */
   typedef nsTArray<UnixSocketRawData* > UnixSocketRawDataQueue;
   UnixSocketRawDataQueue mOutgoingQ;
 
   /**
-   * File descriptor to read from/write to. Connection happens on user provided
-   * thread. Read/write/close happens on IO thread.
-   */
-  ScopedClose mFd;
-
-  /**
    * Incoming packet. Only to be accessed on IO Thread.
    */
   nsAutoPtr<UnixSocketRawData> mIncoming;
 
   /**
    * Read watcher for libevent. Only to be accessed on IO Thread.
    */
   MessageLoopForIO::FileDescriptorWatcher mReadWatcher;
 
   /**
    * Write watcher for libevent. Only to be accessed on IO Thread.
    */
   MessageLoopForIO::FileDescriptorWatcher mWriteWatcher;
+
+  /**
+   * File descriptor to read from/write to. Connection happens on user provided
+   * thread. Read/write/close happens on IO thread.
+   */
+  ScopedClose mFd;
+
+  /** 
+   * Connector object used to create the connection we are currently using.
+   */
+  nsAutoPtr<UnixSocketConnector> mConnector;
+
+  /**
+   * If true, do not requeue whatever task we're running
+   */
+  bool mCurrentTaskIsCanceled;
+
+  /**
+   * Pointer to the task we're currently running. DO NOT DELETE MANUALLY. This
+   * will be taken care of by the IO loop. Just set to nullptr.
+   */
+  CancelableTask* mTask;
+
+  /**
+   * Address we are connecting to, assuming we are creating a client connection.
+   */
+  nsCString mAddress;
 };
 
 static void
 DestroyImpl(UnixSocketImpl* impl)
 {
   delete impl;
 }
 
@@ -196,37 +295,170 @@ public:
   void Run()
   {
     mImpl->SetUpIO();
   }
 private:
   UnixSocketImpl* mImpl;
 };
 
-bool
-UnixSocketConnector::Connect(int aFd, const char* aAddress)
+class SocketAcceptTask : public CancelableTask {
+  virtual void Run();
+
+  bool mCanceled;
+  UnixSocketImpl* mImpl;
+public:
+  virtual void Cancel() { mCanceled = true; }
+  SocketAcceptTask(UnixSocketImpl* aImpl) : mCanceled(false), mImpl(aImpl) { }
+};
+
+void SocketAcceptTask::Run() {
+  mImpl->UnsetTask();
+  if (mCanceled) {
+    return;
+  }
+  mImpl->Accept();
+}
+
+class SocketConnectTask : public CancelableTask {
+  virtual void Run();
+
+  bool mCanceled;
+  UnixSocketImpl* mImpl;
+public:
+  SocketConnectTask(UnixSocketImpl* aImpl) : mCanceled(false), mImpl(aImpl) { }
+  virtual void Cancel() { mCanceled = true; }  
+};
+
+void SocketConnectTask::Run() {
+  mImpl->UnsetTask();
+  if (mCanceled) {
+    return;
+  }
+  mImpl->Connect();
+}
+
+void
+UnixSocketImpl::Accept()
 {
-  if (!ConnectInternal(aFd, aAddress))
+  socklen_t addr_sz;
+  struct sockaddr addr;
+
+  // This will set things we don't particularly care about, but it will hand
+  // back the correct structure size which is what we do care about.
+  mConnector->CreateAddr(true, addr_sz, &addr, nullptr);
+
+  if(mFd.get() < 0)
   {
-    return false;
+    mFd = mConnector->Create();
+    if (mFd.get() < 0) {
+      return;
+    }
+
+    if (!SetNonblockFlags()) {
+      return;
+    }
+
+    if (bind(mFd.get(), &addr, addr_sz)) {
+#ifdef DEBUG
+      LOG("...bind(%d) gave errno %d", mFd.get(), errno);
+#endif
+      return;
+    }
+
+    if (listen(mFd.get(), 1)) {
+#ifdef DEBUG
+      LOG("...listen(%d) gave errno %d", mFd.get(), errno);
+#endif
+      return;
+    }
+
   }
 
+  int client_fd;
+  client_fd = accept(mFd.get(), &addr, &addr_sz);
+  if (client_fd < 0) {
+#if DEBUG
+    LOG("Socket accept errno=%d\n", errno);
+#endif
+    EnqueueTask(SOCKET_RETRY_TIME_MS, new SocketAcceptTask(this));
+    return;
+  }
+
+  if(client_fd < 0)
+  {
+    EnqueueTask(SOCKET_RETRY_TIME_MS, new SocketAcceptTask(this));
+    return;
+  }
+
+  if (!mConnector->Setup(client_fd)) {
+    NS_WARNING("Could not set up socket!");
+    return;
+  }
+  mFd.reset(client_fd);
+  XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
+                                   new StartImplReadingTask(this));
+}
+
+void
+UnixSocketImpl::Connect()
+{
+  if(mFd.get() < 0)
+  {
+    mFd = mConnector->Create();
+    if (mFd.get() < 0) {
+      return;
+    }
+  }
+
+  int ret;
+  socklen_t addr_sz;
+  struct sockaddr addr;
+
+  mConnector->CreateAddr(false, addr_sz, &addr, mAddress.get());
+
+  ret = connect(mFd.get(), &addr, addr_sz);
+
+  if (ret) {
+#if DEBUG
+    LOG("Socket connect errno=%d\n", errno);
+#endif
+    mFd.reset(-1);
+    return;
+  }
+
+  if (!mConnector->Setup(mFd)) {
+    NS_WARNING("Could not set up socket!");
+    return;
+  }
+
+  XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
+                                   new StartImplReadingTask(this));
+}
+
+bool
+UnixSocketImpl::SetNonblockFlags()
+{
+  // Set socket addr to be reused even if kernel is still waiting to close
+  int n = 1;
+  setsockopt(mFd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof(n));
+
   // Set close-on-exec bit.
-  int flags = fcntl(aFd, F_GETFD);
+  int flags = fcntl(mFd, F_GETFD);
   if (-1 == flags) {
     return false;
   }
 
   flags |= FD_CLOEXEC;
-  if (-1 == fcntl(aFd, F_SETFD, flags)) {
+  if (-1 == fcntl(mFd, F_SETFD, flags)) {
     return false;
   }
 
   // Select non-blocking IO.
-  if (-1 == fcntl(aFd, F_SETFL, O_NONBLOCK)) {
+  if (-1 == fcntl(mFd, F_SETFL, O_NONBLOCK)) {
     return false;
   }
 
   return true;
 }
 
 UnixSocketConsumer::~UnixSocketConsumer()
 {
@@ -368,28 +600,53 @@ UnixSocketImpl::OnFileCanWriteWithoutBlo
       return;
     }
     mOutgoingQ.RemoveElementAt(0);
     delete data;
   }
 }
 
 bool
-UnixSocketConsumer::ConnectSocket(UnixSocketConnector& aConnector,
+UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector,
                                   const char* aAddress)
 {
   MOZ_ASSERT(!NS_IsMainThread());
-  MOZ_ASSERT(!mImpl);
-  ScopedClose fd(aConnector.Create());
-  if (fd < 0) {
+  MOZ_ASSERT(aConnector);
+  if (mImpl) {
+    NS_WARNING("Socket already connecting/connected!");
     return false;
   }
-  if (!aConnector.Connect(fd, aAddress)) {
+  nsCString addr;
+  addr.Assign(aAddress);
+  mImpl = new UnixSocketImpl(this, aConnector, addr);
+  XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
+                                   new SocketConnectTask(mImpl));
+  return true;
+}
+
+bool
+UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector)
+{
+  MOZ_ASSERT(!NS_IsMainThread());
+  MOZ_ASSERT(aConnector);
+  if (mImpl) {
+    NS_WARNING("Socket already connecting/connected!");
     return false;
   }
-  mImpl = new UnixSocketImpl(this, fd.forget());
+  nsCString addr;
+  mImpl = new UnixSocketImpl(this, aConnector, addr);
   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
-                                   new StartImplReadingTask(mImpl));
+                                   new SocketAcceptTask(mImpl));
   return true;
 }
 
+void
+UnixSocketConsumer::CancelSocketTask()
+{
+  if(!mImpl) {
+    NS_WARNING("No socket implementation to cancel task on!");
+    return;
+  }
+  mImpl->CancelTask();
+}
+
 } // namespace ipc
 } // namespace mozilla
--- a/ipc/unixsocket/UnixSocket.h
+++ b/ipc/unixsocket/UnixSocket.h
@@ -19,43 +19,43 @@ struct UnixSocketRawData
 {
   static const size_t MAX_DATA_SIZE = 1024;
   uint8_t mData[MAX_DATA_SIZE];
 
   // Number of octets in mData.
   size_t mSize;
   size_t mCurrentWriteOffset;
 
-  /** 
+  /**
    * Constructor for situations where size is not known beforehand. (for
    * example, when reading a packet)
    *
    */
   UnixSocketRawData() :
     mSize(0),
     mCurrentWriteOffset(0)
   {
   }
 
-  /** 
+  /**
    * Constructor for situations where size is known beforehand (for example,
    * when being assigned strings)
    *
    */
   UnixSocketRawData(int aSize) :
     mSize(aSize),
     mCurrentWriteOffset(0)
   {
   }
 
 };
 
 class UnixSocketImpl;
 
-/** 
+/**
  * UnixSocketConnector defines the socket creation and connection/listening
  * functions for a UnixSocketConsumer. Due to the fact that socket setup can
  * vary between protocols (unix sockets, tcp sockets, bluetooth sockets, etc),
  * this allows the user to create whatever connection mechanism they need while
  * still depending on libevent for non-blocking communication handling.
  *
  * FIXME/Bug 793980: Currently only virtual, since we only support bluetooth.
  * Should make connection functions for other unix sockets so we can support
@@ -65,106 +65,117 @@ class UnixSocketConnector
 {
 public:
   UnixSocketConnector()
   {}
 
   virtual ~UnixSocketConnector()
   {}
 
-  /** 
+  /**
    * Establishs a file descriptor for a socket.
    *
    * @return File descriptor for socket
    */
   virtual int Create() = 0;
 
   /** 
-   * Runs connect function on a file descriptor for the address specified. Makes
-   * sure socket is marked with flags expected by the UnixSocket handler
-   * (non-block, etc...)
+   * Since most socket specifics are related to address formation into a
+   * sockaddr struct, this function is defined by subclasses and fills in the
+   * structure as needed for whatever connection it is trying to build
    *
-   * @param aFd File descriptor created by Create() function
-   * @param aAddress Address to connect to
-   *
-   * @return true if connected, false otherwise
+   * @param aIsServer True is we are acting as a server socket
+   * @param aAddrSize Size of the struct 
+   * @param aAddr Struct to fill
+   * @param aAddress If aIsServer is false, Address to connect to. nullptr otherwise.
    */
-  bool Connect(int aFd, const char* aAddress);
-  
-protected:
+  virtual void CreateAddr(bool aIsServer,
+                          socklen_t& aAddrSize,
+                          struct sockaddr *aAddr,
+                          const char* aAddress) = 0;
+
   /** 
-   * Internal type-specific connection function to be overridden by child
-   * classes.
+   * Does any socket type specific setup that may be needed
    *
-   * @param aFd File descriptor created by Create() function
-   * @param aAddress Address to connect to
+   * @param aFd File descriptor for opened socket
    *
-   * @return true if connected, false otherwise
+   * @return true is successful, false otherwise
    */
-  virtual bool ConnectInternal(int aFd, const char* aAddress) = 0;
+  virtual bool Setup(int aFd) = 0;  
 };
 
 class UnixSocketConsumer : public RefCounted<UnixSocketConsumer>
 {
 public:
   UnixSocketConsumer()
     : mImpl(nullptr)
   {}
 
   virtual ~UnixSocketConsumer();
-  
-  /** 
+
+  /**
    * Function to be called whenever data is received. This is only called on the
    * main thread.
    *
    * @param aMessage Data received from the socket.
    */
   virtual void ReceiveSocketData(UnixSocketRawData* aMessage) = 0;
 
-  /** 
+  /**
    * Queue data to be sent to the socket on the IO thread. Can only be called on
    * originating thread.
    *
    * @param aMessage Data to be sent to socket
    *
    * @return true if data is queued, false otherwise (i.e. not connected)
    */
   bool SendSocketData(UnixSocketRawData* aMessage);
 
-  /** 
+  /**
    * Convenience function for sending strings to the socket (common in bluetooth
    * profile usage). Converts to a UnixSocketRawData struct. Can only be called
    * on originating thread.
    *
    * @param aMessage String to be sent to socket
    *
    * @return true if data is queued, false otherwise (i.e. not connected)
    */
   bool SendSocketData(const nsACString& aMessage);
 
-  /** 
-   * Connects to a socket. Due to the fact that this is a blocking connect (and
-   * for things such as bluetooth, it /will/ block), it is expected to be run on
-   * a thread provided by the user. It cannot run on the main thread. Runs the
-   * Create() and Connect() functions in the UnixSocketConnector.
+  /**
+   * Starts a task on the socket that will try to connect to a socket in a
+   * non-blocking manner.
    *
    * @param aConnector Connector object for socket type specific functions
    * @param aAddress Address to connect to.
    *
-   * @return true on connection, false otherwise.
+   * @return true on connect task started, false otherwise.
    */
-  bool ConnectSocket(UnixSocketConnector& aConnector, const char* aAddress);
+  bool ConnectSocket(UnixSocketConnector* aConnector, const char* aAddress);
 
   /** 
+   * Starts a task on the socket that will try to accept a new connection in a
+   * non-blocking manner.
+   *
+   * @param aConnector Connector object for socket type specific functions
+   *
+   * @return true on listen started, false otherwise
+   */
+  bool ListenSocket(UnixSocketConnector* aConnector);
+
+  /**
    * Queues the internal representation of socket for deletion. Can be called
    * from main thread.
-   *
    */
   void CloseSocket();
 
+  /** 
+   * Cancels connect/accept task loop, if one is currently running.
+   */
+  void CancelSocketTask();
 private:
   nsAutoPtr<UnixSocketImpl> mImpl;
 };
 
 } // namespace ipc
 } // namepsace mozilla
 
 #endif // mozilla_ipc_Socket_h