Bug 1093025: Add |StreamSocket|, r=kyle
authorThomas Zimmermann <tdz@users.sourceforge.net>
Tue, 02 Dec 2014 15:00:36 -0800
changeset 218246 73e1dda14ec9fcdc43523bace69ac6cda2c86661
parent 218245 1840df9fd1906227e7d782e1e964e5ee333d77c3
child 218247 b28c38459608ce604042b946a9cf7a7d1a90493b
push id27924
push usercbook@mozilla.com
push dateWed, 03 Dec 2014 12:26:46 +0000
treeherdermozilla-central@7ea9f701465a [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerskyle
bugs1093025
milestone37.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1093025: Add |StreamSocket|, r=kyle |StreamSocket| implements a |ConnectionOrientedSocket| for stream sockets. It utilizes |ListenSocket| to wait for incoming connection requests.
ipc/unixsocket/StreamSocket.cpp
ipc/unixsocket/StreamSocket.h
ipc/unixsocket/moz.build
new file mode 100644
--- /dev/null
+++ b/ipc/unixsocket/StreamSocket.cpp
@@ -0,0 +1,680 @@
+/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */
+/* vim: set ts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "StreamSocket.h"
+#include <fcntl.h>
+#include "mozilla/RefPtr.h"
+#include "mozilla/unused.h"
+#include "nsXULAppAPI.h"
+#include "UnixSocketConnector.h"
+
+static const size_t MAX_READ_SIZE = 1 << 16;
+
+namespace mozilla {
+namespace ipc {
+
+//
+// StreamSocketIO
+//
+
+class StreamSocketIO MOZ_FINAL : public UnixSocketWatcher
+                               , protected SocketIOBase
+                               , public ConnectionOrientedSocketIO
+{
+public:
+  class ConnectTask;
+  class DelayedConnectTask;
+
+  StreamSocketIO(MessageLoop* mIOLoop,
+                 StreamSocket* aStreamSocket,
+                 UnixSocketConnector* aConnector,
+                 const nsACString& aAddress);
+  StreamSocketIO(MessageLoop* mIOLoop, int aFd,
+                 ConnectionStatus aConnectionStatus,
+                 StreamSocket* aStreamSocket,
+                 UnixSocketConnector* aConnector,
+                 const nsACString& aAddress);
+  ~StreamSocketIO();
+
+  void                GetSocketAddr(nsAString& aAddrStr) const;
+  SocketConsumerBase* GetConsumer();
+  SocketBase*         GetSocketBase();
+
+  // StreamSocketIOBase
+  //
+
+  nsresult Accept(int aFd,
+                  const union sockaddr_any* aAddr, socklen_t aAddrLen);
+
+  // Shutdown state
+  //
+
+  bool IsShutdownOnMainThread() const;
+  void ShutdownOnMainThread();
+
+  bool IsShutdownOnIOThread() const;
+  void ShutdownOnIOThread();
+
+  // Delayed-task handling
+  //
+
+  void SetDelayedConnectTask(CancelableTask* aTask);
+  void ClearDelayedConnectTask();
+  void CancelDelayedConnectTask();
+
+  // Task callback methods
+  //
+
+  /**
+   * Connect to a socket
+   */
+  void Connect();
+
+  void Send(UnixSocketRawData* aData);
+
+  // I/O callback methods
+  //
+
+  void OnAccepted(int aFd, const sockaddr_any* aAddr,
+                  socklen_t aAddrLen) MOZ_OVERRIDE;
+  void OnConnected() MOZ_OVERRIDE;
+  void OnError(const char* aFunction, int aErrno) MOZ_OVERRIDE;
+  void OnListening() MOZ_OVERRIDE;
+  void OnSocketCanReceiveWithoutBlocking() MOZ_OVERRIDE;
+  void OnSocketCanSendWithoutBlocking() MOZ_OVERRIDE;
+
+private:
+  void FireSocketError();
+
+  // Set up flags on file descriptor.
+  static bool SetSocketFlags(int aFd);
+
+  /**
+   * Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
+   * directly from main thread. All non-main-thread accesses should happen with
+   * mIO as container.
+   */
+  RefPtr<StreamSocket> mStreamSocket;
+
+  /**
+   * Connector object used to create the connection we are currently using.
+   */
+  nsAutoPtr<UnixSocketConnector> mConnector;
+
+  /**
+   * If true, do not requeue whatever task we're running
+   */
+  bool mShuttingDownOnIOThread;
+
+  /**
+   * Address we are connecting to, assuming we are creating a client connection.
+   */
+  nsCString mAddress;
+
+  /**
+   * Size of the socket address struct
+   */
+  socklen_t mAddrSize;
+
+  /**
+   * Address struct of the socket currently in use
+   */
+  sockaddr_any mAddr;
+
+  /**
+   * Task member for delayed connect task. Should only be access on main thread.
+   */
+  CancelableTask* mDelayedConnectTask;
+};
+
+StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop,
+                               StreamSocket* aStreamSocket,
+                               UnixSocketConnector* aConnector,
+                               const nsACString& aAddress)
+: UnixSocketWatcher(mIOLoop)
+, SocketIOBase(MAX_READ_SIZE)
+, mStreamSocket(aStreamSocket)
+, mConnector(aConnector)
+, mShuttingDownOnIOThread(false)
+, mAddress(aAddress)
+, mDelayedConnectTask(nullptr)
+{
+  MOZ_ASSERT(mStreamSocket);
+  MOZ_ASSERT(mConnector);
+}
+
+StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, int aFd,
+                               ConnectionStatus aConnectionStatus,
+                               StreamSocket* aStreamSocket,
+                               UnixSocketConnector* aConnector,
+                               const nsACString& aAddress)
+: UnixSocketWatcher(mIOLoop, aFd, aConnectionStatus)
+, SocketIOBase(MAX_READ_SIZE)
+, mStreamSocket(aStreamSocket)
+, mConnector(aConnector)
+, mShuttingDownOnIOThread(false)
+, mAddress(aAddress)
+, mDelayedConnectTask(nullptr)
+{
+  MOZ_ASSERT(mStreamSocket);
+  MOZ_ASSERT(mConnector);
+}
+
+StreamSocketIO::~StreamSocketIO()
+{
+  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(IsShutdownOnMainThread());
+}
+
+void
+StreamSocketIO::GetSocketAddr(nsAString& aAddrStr) const
+{
+  if (!mConnector) {
+    NS_WARNING("No connector to get socket address from!");
+    aAddrStr.Truncate();
+    return;
+  }
+  mConnector->GetSocketAddr(mAddr, aAddrStr);
+}
+
+SocketConsumerBase*
+StreamSocketIO::GetConsumer()
+{
+  return mStreamSocket.get();
+}
+
+SocketBase*
+StreamSocketIO::GetSocketBase()
+{
+  return GetConsumer();
+}
+
+bool
+StreamSocketIO::IsShutdownOnMainThread() const
+{
+  MOZ_ASSERT(NS_IsMainThread());
+
+  return mStreamSocket == nullptr;
+}
+
+void
+StreamSocketIO::ShutdownOnMainThread()
+{
+  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(!IsShutdownOnMainThread());
+
+  mStreamSocket = nullptr;
+}
+
+bool
+StreamSocketIO::IsShutdownOnIOThread() const
+{
+  return mShuttingDownOnIOThread;
+}
+
+void
+StreamSocketIO::ShutdownOnIOThread()
+{
+  MOZ_ASSERT(!NS_IsMainThread());
+  MOZ_ASSERT(!mShuttingDownOnIOThread);
+
+  Close(); // will also remove fd from I/O loop
+  mShuttingDownOnIOThread = true;
+}
+
+void
+StreamSocketIO::SetDelayedConnectTask(CancelableTask* aTask)
+{
+  MOZ_ASSERT(NS_IsMainThread());
+
+  mDelayedConnectTask = aTask;
+}
+
+void
+StreamSocketIO::ClearDelayedConnectTask()
+{
+  MOZ_ASSERT(NS_IsMainThread());
+
+  mDelayedConnectTask = nullptr;
+}
+
+void
+StreamSocketIO::CancelDelayedConnectTask()
+{
+  MOZ_ASSERT(NS_IsMainThread());
+
+  if (!mDelayedConnectTask) {
+    return;
+  }
+
+  mDelayedConnectTask->Cancel();
+  ClearDelayedConnectTask();
+}
+
+nsresult
+StreamSocketIO::Accept(int aFd,
+                       const union sockaddr_any* aAddr, socklen_t aAddrLen)
+{
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
+  MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTING);
+
+  // File-descriptor setup
+
+  if (!mConnector->SetUp(aFd)) {
+    NS_WARNING("Could not set up socket!");
+    return NS_ERROR_FAILURE;
+  }
+
+  if (!SetSocketFlags(aFd)) {
+    return NS_ERROR_FAILURE;
+  }
+  SetSocket(aFd, SOCKET_IS_CONNECTED);
+
+  AddWatchers(READ_WATCHER, true);
+  if (HasPendingData()) {
+    AddWatchers(WRITE_WATCHER, false);
+  }
+
+  // Address setup
+
+  memcpy(&mAddr, aAddr, aAddrLen);
+  mAddrSize = aAddrLen;
+
+  // Signal success
+
+  nsRefPtr<nsRunnable> r =
+    new SocketIOEventRunnable<StreamSocketIO>(
+      this, SocketIOEventRunnable<StreamSocketIO>::CONNECT_SUCCESS);
+  NS_DispatchToMainThread(r);
+
+  return NS_OK;
+}
+
+void
+StreamSocketIO::Connect()
+{
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
+  MOZ_ASSERT(mConnector);
+
+  if (!IsOpen()) {
+    int fd = mConnector->Create();
+    if (fd < 0) {
+      NS_WARNING("Cannot create socket fd!");
+      FireSocketError();
+      return;
+    }
+    if (!SetSocketFlags(fd)) {
+      NS_WARNING("Cannot set socket flags!");
+      FireSocketError();
+      return;
+    }
+    SetFd(fd);
+  }
+
+  if (!mConnector->CreateAddr(false, mAddrSize, mAddr, mAddress.get())) {
+    NS_WARNING("Cannot create socket address!");
+    FireSocketError();
+    return;
+  }
+
+  // calls OnConnected() on success, or OnError() otherwise
+  nsresult rv = UnixSocketWatcher::Connect(
+    reinterpret_cast<struct sockaddr*>(&mAddr), mAddrSize);
+  NS_WARN_IF(NS_FAILED(rv));
+}
+
+void
+StreamSocketIO::Send(UnixSocketRawData* aData)
+{
+  EnqueueData(aData);
+  AddWatchers(WRITE_WATCHER, false);
+}
+
+void
+StreamSocketIO::OnAccepted(int aFd,
+                           const sockaddr_any* aAddr,
+                           socklen_t aAddrLen)
+{
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
+  MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
+  MOZ_ASSERT(aAddr);
+  MOZ_ASSERT(aAddrLen <= static_cast<socklen_t>(sizeof(mAddr)));
+
+  memcpy (&mAddr, aAddr, aAddrLen);
+  mAddrSize = aAddrLen;
+
+  if (!mConnector->SetUp(aFd)) {
+    NS_WARNING("Could not set up socket!");
+    return;
+  }
+
+  RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
+  Close();
+  if (!SetSocketFlags(aFd)) {
+    return;
+  }
+  SetSocket(aFd, SOCKET_IS_CONNECTED);
+
+  nsRefPtr<nsRunnable> r =
+    new SocketIOEventRunnable<StreamSocketIO>(
+      this, SocketIOEventRunnable<StreamSocketIO>::CONNECT_SUCCESS);
+  NS_DispatchToMainThread(r);
+
+  AddWatchers(READ_WATCHER, true);
+  if (HasPendingData()) {
+    AddWatchers(WRITE_WATCHER, false);
+  }
+}
+
+void
+StreamSocketIO::OnConnected()
+{
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
+  MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
+
+  if (!SetSocketFlags(GetFd())) {
+    NS_WARNING("Cannot set socket flags!");
+    FireSocketError();
+    return;
+  }
+
+  if (!mConnector->SetUp(GetFd())) {
+    NS_WARNING("Could not set up socket!");
+    FireSocketError();
+    return;
+  }
+
+  nsRefPtr<nsRunnable> r =
+    new SocketIOEventRunnable<StreamSocketIO>(
+      this, SocketIOEventRunnable<StreamSocketIO>::CONNECT_SUCCESS);
+  NS_DispatchToMainThread(r);
+
+  AddWatchers(READ_WATCHER, true);
+  if (HasPendingData()) {
+    AddWatchers(WRITE_WATCHER, false);
+  }
+}
+
+void
+StreamSocketIO::OnListening()
+{
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
+
+  NS_NOTREACHED("Invalid call to |StreamSocketIO::OnListening|");
+}
+
+void
+StreamSocketIO::OnError(const char* aFunction, int aErrno)
+{
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
+
+  UnixFdWatcher::OnError(aFunction, aErrno);
+  FireSocketError();
+}
+
+void
+StreamSocketIO::OnSocketCanReceiveWithoutBlocking()
+{
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
+  MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
+
+  ssize_t res = ReceiveData(GetFd(), this);
+  if (res < 0) {
+    /* I/O error */
+    RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
+  } else if (!res) {
+    /* EOF or peer shutdown */
+    RemoveWatchers(READ_WATCHER);
+  }
+}
+
+void
+StreamSocketIO::OnSocketCanSendWithoutBlocking()
+{
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
+  MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
+
+  nsresult rv = SendPendingData(GetFd(), this);
+  if (NS_FAILED(rv)) {
+    return;
+  }
+
+  if (HasPendingData()) {
+    AddWatchers(WRITE_WATCHER, false);
+  }
+}
+
+void
+StreamSocketIO::FireSocketError()
+{
+  MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
+
+  // Clean up watchers, statuses, fds
+  Close();
+
+  // Tell the main thread we've errored
+  nsRefPtr<nsRunnable> r =
+    new SocketIOEventRunnable<StreamSocketIO>(
+      this, SocketIOEventRunnable<StreamSocketIO>::CONNECT_ERROR);
+
+  NS_DispatchToMainThread(r);
+}
+
+bool
+StreamSocketIO::SetSocketFlags(int aFd)
+{
+  static const int reuseaddr = 1;
+
+  // Set socket addr to be reused even if kernel is still waiting to close
+  int res = setsockopt(aFd, SOL_SOCKET, SO_REUSEADDR,
+                       &reuseaddr, sizeof(reuseaddr));
+  if (res < 0) {
+    return false;
+  }
+
+  // Set close-on-exec bit.
+  int flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFD));
+  if (-1 == flags) {
+    return false;
+  }
+  flags |= FD_CLOEXEC;
+  if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFD, flags))) {
+    return false;
+  }
+
+  // Set non-blocking status flag.
+  flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFL));
+  if (-1 == flags) {
+    return false;
+  }
+  flags |= O_NONBLOCK;
+  if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFL, flags))) {
+    return false;
+  }
+
+  return true;
+}
+
+//
+// Socket tasks
+//
+
+class StreamSocketIO::ConnectTask MOZ_FINAL
+  : public SocketIOTask<StreamSocketIO>
+{
+public:
+  ConnectTask(StreamSocketIO* aIO)
+  : SocketIOTask<StreamSocketIO>(aIO)
+  { }
+
+  void Run() MOZ_OVERRIDE
+  {
+    MOZ_ASSERT(!NS_IsMainThread());
+    MOZ_ASSERT(!IsCanceled());
+
+    GetIO()->Connect();
+  }
+};
+
+class StreamSocketIO::DelayedConnectTask MOZ_FINAL
+  : public SocketIOTask<StreamSocketIO>
+{
+public:
+  DelayedConnectTask(StreamSocketIO* aIO)
+  : SocketIOTask<StreamSocketIO>(aIO)
+  { }
+
+  void Run() MOZ_OVERRIDE
+  {
+    MOZ_ASSERT(NS_IsMainThread());
+
+    if (IsCanceled()) {
+      return;
+    }
+
+    StreamSocketIO* io = GetIO();
+    if (io->IsShutdownOnMainThread()) {
+      return;
+    }
+
+    io->ClearDelayedConnectTask();
+    XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new ConnectTask(io));
+  }
+};
+
+//
+// StreamSocket
+//
+
+StreamSocket::StreamSocket()
+: mIO(nullptr)
+{ }
+
+StreamSocket::~StreamSocket()
+{
+  MOZ_ASSERT(!mIO);
+}
+
+bool
+StreamSocket::SendSocketData(UnixSocketRawData* aData)
+{
+  MOZ_ASSERT(NS_IsMainThread());
+  if (!mIO) {
+    return false;
+  }
+
+  MOZ_ASSERT(!mIO->IsShutdownOnMainThread());
+  XRE_GetIOMessageLoop()->PostTask(
+    FROM_HERE,
+    new SocketIOSendTask<StreamSocketIO, UnixSocketRawData>(mIO, aData));
+
+  return true;
+}
+
+bool
+StreamSocket::SendSocketData(const nsACString& aStr)
+{
+  if (aStr.Length() > MAX_READ_SIZE) {
+    return false;
+  }
+
+  nsAutoPtr<UnixSocketRawData> data(
+    new UnixSocketRawData(aStr.BeginReading(), aStr.Length()));
+
+  if (!SendSocketData(data)) {
+    return false;
+  }
+
+  unused << data.forget();
+
+  return true;
+}
+
+void
+StreamSocket::Close()
+{
+  MOZ_ASSERT(NS_IsMainThread());
+
+  if (!mIO) {
+    return;
+  }
+
+  mIO->CancelDelayedConnectTask();
+
+  // From this point on, we consider mIO as being deleted.
+  // We sever the relationship here so any future calls to listen or connect
+  // will create a new implementation.
+  mIO->ShutdownOnMainThread();
+
+  XRE_GetIOMessageLoop()->PostTask(
+    FROM_HERE, new SocketIOShutdownTask<StreamSocketIO>(mIO));
+
+  mIO = nullptr;
+
+  NotifyDisconnect();
+}
+
+void
+StreamSocket::GetSocketAddr(nsAString& aAddrStr)
+{
+  aAddrStr.Truncate();
+  if (!mIO || GetConnectionStatus() != SOCKET_CONNECTED) {
+    NS_WARNING("No socket currently open!");
+    return;
+  }
+  mIO->GetSocketAddr(aAddrStr);
+}
+
+bool
+StreamSocket::Connect(UnixSocketConnector* aConnector,
+                      const char* aAddress,
+                      int aDelayMs)
+{
+  MOZ_ASSERT(aConnector);
+  MOZ_ASSERT(NS_IsMainThread());
+
+  nsAutoPtr<UnixSocketConnector> connector(aConnector);
+
+  if (mIO) {
+    NS_WARNING("Socket already connecting/connected!");
+    return false;
+  }
+
+  nsCString addr(aAddress);
+  MessageLoop* ioLoop = XRE_GetIOMessageLoop();
+  mIO = new StreamSocketIO(ioLoop, this, connector.forget(), addr);
+  SetConnectionStatus(SOCKET_CONNECTING);
+  if (aDelayMs > 0) {
+    StreamSocketIO::DelayedConnectTask* connectTask =
+      new StreamSocketIO::DelayedConnectTask(mIO);
+    mIO->SetDelayedConnectTask(connectTask);
+    MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs);
+  } else {
+    ioLoop->PostTask(FROM_HERE, new StreamSocketIO::ConnectTask(mIO));
+  }
+
+  return true;
+}
+
+ConnectionOrientedSocketIO*
+StreamSocket::PrepareAccept(UnixSocketConnector* aConnector)
+{
+  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(!mIO);
+  MOZ_ASSERT(aConnector);
+
+  nsAutoPtr<UnixSocketConnector> connector(aConnector);
+
+  SetConnectionStatus(SOCKET_CONNECTING);
+
+  mIO = new StreamSocketIO(XRE_GetIOMessageLoop(),
+                           -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
+                           this, connector.forget(), EmptyCString());
+  return mIO;
+}
+
+} // namespace ipc
+} // namespace mozilla
new file mode 100644
--- /dev/null
+++ b/ipc/unixsocket/StreamSocket.h
@@ -0,0 +1,93 @@
+/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */
+/* vim: set ts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#ifndef mozilla_ipc_streamsocket_h
+#define mozilla_ipc_streamsocket_h
+
+#include "mozilla/ipc/SocketBase.h"
+#include "ConnectionOrientedSocket.h"
+
+namespace mozilla {
+namespace ipc {
+
+class StreamSocketIO;
+class UnixSocketConnector;
+
+class StreamSocket : public SocketConsumerBase
+                   , public ConnectionOrientedSocket
+{
+public:
+  StreamSocket();
+
+  /**
+   * Queue data to be sent to the socket on the IO thread. Can only be called on
+   * originating thread.
+   *
+   * @param aMessage Data to be sent to socket
+   *
+   * @return true if data is queued, false otherwise (i.e. not connected)
+   */
+  bool SendSocketData(UnixSocketRawData* aMessage);
+
+  /**
+   * Convenience function for sending strings to the socket (common in bluetooth
+   * profile usage). Converts to a UnixSocketRawData struct. Can only be called
+   * on originating thread.
+   *
+   * @param aMessage String to be sent to socket
+   *
+   * @return true if data is queued, false otherwise (i.e. not connected)
+   */
+  bool SendSocketData(const nsACString& aMessage);
+
+  /**
+   * Starts a task on the socket that will try to connect to a socket in a
+   * non-blocking manner.
+   *
+   * @param aConnector Connector object for socket type specific functions
+   * @param aAddress Address to connect to.
+   * @param aDelayMs Time delay in milli-seconds.
+   *
+   * @return true on connect task started, false otherwise.
+   */
+  bool Connect(UnixSocketConnector* aConnector,
+               const char* aAddress,
+               int aDelayMs = 0);
+
+  /**
+   * Queues the internal representation of socket for deletion. Can be called
+   * from main thread.
+   */
+  void Close();
+
+  /**
+   * Get the current sockaddr for the socket
+   */
+  void GetSocketAddr(nsAString& aAddrStr);
+
+protected:
+  virtual ~StreamSocket();
+
+  // Prepares an instance of |StreamSocket| in DISCONNECTED state
+  // for accepting a connection. Subclasses implementing |GetIO|
+  // need to call this method.
+  ConnectionOrientedSocketIO* PrepareAccept(UnixSocketConnector* aConnector);
+
+private:
+
+  // Legacy interface from |SocketBase|; should be replaced by |Close|.
+  void CloseSocket() MOZ_OVERRIDE
+  {
+    Close();
+  }
+
+  StreamSocketIO* mIO;
+};
+
+} // namespace ipc
+} // namepsace mozilla
+
+#endif // mozilla_ipc_streamsocket_h
--- a/ipc/unixsocket/moz.build
+++ b/ipc/unixsocket/moz.build
@@ -3,24 +3,26 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 EXPORTS.mozilla.ipc += [
     'ConnectionOrientedSocket.h',
     'ListenSocket.h',
     'SocketBase.h',
+    'StreamSocket.h',
     'UnixSocket.h',
     'UnixSocketConnector.h'
 ]
 
 SOURCES += [
     'ConnectionOrientedSocket.cpp',
     'ListenSocket.cpp',
     'SocketBase.cpp',
+    'StreamSocket.cpp',
     'UnixSocket.cpp',
     'UnixSocketConnector.cpp'
 ]
 
 FAIL_ON_WARNINGS = True
 
 include('/ipc/chromium/chromium-config.mozbuild')