--- a/ipc/unixsocket/UnixSocket.cpp
+++ b/ipc/unixsocket/UnixSocket.cpp
@@ -49,16 +49,17 @@ public:
, mConnector(aConnector)
, mCurrentTaskIsCanceled(false)
, mAddress(aAddress)
{
}
~UnixSocketImpl()
{
+ StopTask();
mReadWatcher.StopWatchingFileDescriptor();
mWriteWatcher.StopWatchingFileDescriptor();
}
void QueueWriteData(UnixSocketRawData* aData)
{
mOutgoingQ.AppendElement(aData);
OnFileCanWriteWithoutBlocking(mFd);
@@ -111,22 +112,16 @@ public:
mIOLoop = MessageLoopForIO::current();
mIOLoop->WatchFileDescriptor(mFd,
true,
MessageLoopForIO::WATCH_READ,
&mReadWatcher,
this);
}
- void PrepareRemoval()
- {
- mTask = nullptr;
- mCurrentTaskIsCanceled = true;
- }
-
/**
* Connect to a socket
*/
void Connect();
/**
* Run bind/listen to prepare for further runs of accept()
*/
@@ -135,17 +130,24 @@ public:
/**
* Accept an incoming connection
*/
void Accept();
/**
* Stop whatever connect/accept task is running
*/
- void Stop();
+ void StopTask()
+ {
+ if (mTask) {
+ mTask->Cancel();
+ mTask = nullptr;
+ }
+ mCurrentTaskIsCanceled = true;
+ }
/**
* Set up nonblocking flags on whatever our current file descriptor is.
*
* @return true if successful, false otherwise
*/
bool SetNonblockFlags();
@@ -229,16 +231,53 @@ private:
static void
DestroyImpl(UnixSocketImpl* impl)
{
MOZ_ASSERT(impl);
delete impl;
}
+class OnSocketEventTask : public nsRunnable
+{
+public:
+ enum SocketEvent {
+ CONNECT_SUCCESS,
+ CONNECT_ERROR,
+ DISCONNECT
+ };
+
+ OnSocketEventTask(UnixSocketImpl* aImpl, SocketEvent e) :
+ mImpl(aImpl),
+ mEvent(e)
+ {
+ MOZ_ASSERT(aImpl);
+ }
+
+ NS_IMETHOD Run()
+ {
+ MOZ_ASSERT(NS_IsMainThread());
+ if (!mImpl->mConsumer) {
+ NS_WARNING("CloseSocket has already been called! (mConsumer is null)");
+ // Since we've already explicitly closed and the close happened before
+ // this, this isn't really an error. Since we've warned, return OK.
+ return NS_OK;
+ }
+ if (mEvent == CONNECT_SUCCESS) {
+ mImpl->mConsumer->NotifySuccess();
+ } else if (mEvent == CONNECT_ERROR) {
+ mImpl->mConsumer->NotifyError();
+ }
+ return NS_OK;
+ }
+private:
+ UnixSocketImpl* mImpl;
+ SocketEvent mEvent;
+};
+
class SocketReceiveTask : public nsRunnable
{
public:
SocketReceiveTask(UnixSocketImpl* aImpl, UnixSocketRawData* aData) :
mImpl(aImpl),
mRawData(aData)
{
MOZ_ASSERT(aImpl);
@@ -270,17 +309,18 @@ public:
mImpl(aImpl),
mData(aData)
{
MOZ_ASSERT(aConsumer);
MOZ_ASSERT(aImpl);
MOZ_ASSERT(aData);
}
- void Run()
+ void
+ Run()
{
mImpl->QueueWriteData(mData);
}
private:
nsRefPtr<UnixSocketConsumer> mConsumer;
UnixSocketImpl* mImpl;
UnixSocketRawData* mData;
@@ -289,17 +329,18 @@ private:
class StartImplReadingTask : public Task
{
public:
StartImplReadingTask(UnixSocketImpl* aImpl)
: mImpl(aImpl)
{
}
- void Run()
+ void
+ Run()
{
mImpl->SetUpIO();
}
private:
UnixSocketImpl* mImpl;
};
class SocketAcceptTask : public CancelableTask {
@@ -339,16 +380,21 @@ void SocketConnectTask::Run() {
}
void
UnixSocketImpl::Accept()
{
socklen_t addr_sz;
struct sockaddr addr;
+ if (!mConnector) {
+ NS_WARNING("No connector object available!");
+ return;
+ }
+
// This will set things we don't particularly care about, but it will hand
// back the correct structure size which is what we do care about.
mConnector->CreateAddr(true, addr_sz, &addr, nullptr);
if(mFd.get() < 0)
{
mFd = mConnector->Create();
if (mFd.get() < 0) {
@@ -373,34 +419,33 @@ UnixSocketImpl::Accept()
return;
}
}
int client_fd;
client_fd = accept(mFd.get(), &addr, &addr_sz);
if (client_fd < 0) {
-#if DEBUG
- LOG("Socket accept errno=%d\n", errno);
-#endif
EnqueueTask(SOCKET_RETRY_TIME_MS, new SocketAcceptTask(this));
return;
}
- if(client_fd < 0)
- {
- EnqueueTask(SOCKET_RETRY_TIME_MS, new SocketAcceptTask(this));
- return;
- }
-
- if (!mConnector->Setup(client_fd)) {
+ if (!mConnector->SetUp(client_fd)) {
NS_WARNING("Could not set up socket!");
return;
}
mFd.reset(client_fd);
+
+ nsRefPtr<OnSocketEventTask> t =
+ new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
+ NS_DispatchToMainThread(t);
+
+ // Due to the fact that we've dispatched our OnConnectSuccess message before
+ // starting reading, we're guaranteed that any subsequent read tasks will
+ // happen after the object has been notified of a successful connect.
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
new StartImplReadingTask(this));
}
void
UnixSocketImpl::Connect()
{
if(mFd.get() < 0)
@@ -419,24 +464,34 @@ UnixSocketImpl::Connect()
ret = connect(mFd.get(), &addr, addr_sz);
if (ret) {
#if DEBUG
LOG("Socket connect errno=%d\n", errno);
#endif
mFd.reset(-1);
+ nsRefPtr<OnSocketEventTask> t =
+ new OnSocketEventTask(this, OnSocketEventTask::CONNECT_ERROR);
+ NS_DispatchToMainThread(t);
return;
}
- if (!mConnector->Setup(mFd)) {
+ if (!mConnector->SetUp(mFd)) {
NS_WARNING("Could not set up socket!");
return;
}
+ nsRefPtr<OnSocketEventTask> t =
+ new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
+ NS_DispatchToMainThread(t);
+
+ // Due to the fact that we've dispatched our OnConnectSuccess message before
+ // starting reading, we're guaranteed that any subsequent read tasks will
+ // happen after the object has been notified of a successful connect.
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
new StartImplReadingTask(this));
}
bool
UnixSocketImpl::SetNonblockFlags()
{
// Set socket addr to be reused even if kernel is still waiting to close
@@ -457,34 +512,41 @@ UnixSocketImpl::SetNonblockFlags()
// Select non-blocking IO.
if (-1 == fcntl(mFd, F_SETFL, O_NONBLOCK)) {
return false;
}
return true;
}
+UnixSocketConsumer::UnixSocketConsumer() : mImpl(nullptr)
+ , mConnectionStatus(SOCKET_DISCONNECTED)
+{
+}
+
UnixSocketConsumer::~UnixSocketConsumer()
{
}
bool
UnixSocketConsumer::SendSocketData(UnixSocketRawData* aData)
{
+ MOZ_ASSERT(NS_IsMainThread());
if (!mImpl) {
return false;
}
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
new SocketSendTask(this, mImpl, aData));
return true;
}
bool
UnixSocketConsumer::SendSocketData(const nsACString& aStr)
{
+ MOZ_ASSERT(NS_IsMainThread());
if (!mImpl) {
return false;
}
if (aStr.Length() > UnixSocketRawData::MAX_DATA_SIZE) {
return false;
}
nsCString str(aStr);
UnixSocketRawData* d = new UnixSocketRawData(aStr.Length());
@@ -496,18 +558,19 @@ UnixSocketConsumer::SendSocketData(const
void
UnixSocketConsumer::CloseSocket()
{
if (!mImpl) {
return;
}
UnixSocketImpl* impl = mImpl;
- mImpl->mConsumer.forget();
mImpl = nullptr;
+ impl->mConsumer.forget();
+ impl->StopTask();
// To make sure the owner doesn't die on the IOThread, remove pointer here
// Line it up to be destructed on the IO Thread
// Kill our pointer to it
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
NewRunnableFunction(DestroyImpl,
impl));
}
@@ -533,20 +596,17 @@ UnixSocketImpl::OnFileCanReadWithoutBloc
}
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
mIncoming.forget();
return; // no data available: return and re-poll
}
// else fall through to error handling on other errno's
}
#ifdef DEBUG
- nsAutoString str;
- str.AssignLiteral("Cannot read from network, error ");
- str += (int)ret;
- NS_WARNING(NS_ConvertUTF16toUTF8(str).get());
+ NS_WARNING("Cannot read from network");
#endif
// At this point, assume that we can't actually access
// the socket anymore
mIncoming.forget();
mReadWatcher.StopWatchingFileDescriptor();
mWriteWatcher.StopWatchingFileDescriptor();
mConsumer->CloseSocket();
return;
@@ -603,53 +663,71 @@ UnixSocketImpl::OnFileCanWriteWithoutBlo
this);
return;
}
mOutgoingQ.RemoveElementAt(0);
delete data;
}
}
+void
+UnixSocketConsumer::NotifySuccess()
+{
+ MOZ_ASSERT(NS_IsMainThread());
+ mConnectionStatus = SOCKET_CONNECTED;
+ OnConnectSuccess();
+}
+
+void
+UnixSocketConsumer::NotifyError()
+{
+ MOZ_ASSERT(NS_IsMainThread());
+ mConnectionStatus = SOCKET_DISCONNECTED;
+ OnConnectError();
+}
bool
UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector,
const char* aAddress)
{
- MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(aConnector);
+ MOZ_ASSERT(NS_IsMainThread());
if (mImpl) {
NS_WARNING("Socket already connecting/connected!");
return false;
}
nsCString addr;
addr.Assign(aAddress);
mImpl = new UnixSocketImpl(this, aConnector, addr);
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
new SocketConnectTask(mImpl));
+ mConnectionStatus = SOCKET_CONNECTING;
return true;
}
bool
UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector)
{
- MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(aConnector);
+ MOZ_ASSERT(NS_IsMainThread());
if (mImpl) {
NS_WARNING("Socket already connecting/connected!");
return false;
}
nsCString addr;
mImpl = new UnixSocketImpl(this, aConnector, addr);
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
new SocketAcceptTask(mImpl));
+ mConnectionStatus = SOCKET_CONNECTING;
return true;
}
void
UnixSocketConsumer::CancelSocketTask()
{
+ mConnectionStatus = SOCKET_DISCONNECTED;
if(!mImpl) {
NS_WARNING("No socket implementation to cancel task on!");
return;
}
mImpl->CancelTask();
}
} // namespace ipc
--- a/ipc/unixsocket/UnixSocket.h
+++ b/ipc/unixsocket/UnixSocket.h
@@ -95,28 +95,37 @@ public:
/**
* Does any socket type specific setup that may be needed
*
* @param aFd File descriptor for opened socket
*
* @return true is successful, false otherwise
*/
- virtual bool Setup(int aFd) = 0;
+ virtual bool SetUp(int aFd) = 0;
+};
+
+enum SocketConnectionStatus {
+ SOCKET_DISCONNECTED = 0,
+ SOCKET_CONNECTING = 1,
+ SOCKET_CONNECTED = 2
};
class UnixSocketConsumer : public RefCounted<UnixSocketConsumer>
{
public:
- UnixSocketConsumer()
- : mImpl(nullptr)
- {}
+ UnixSocketConsumer();
virtual ~UnixSocketConsumer();
+ SocketConnectionStatus GetConnectionStatus()
+ {
+ return mConnectionStatus;
+ }
+
/**
* Function to be called whenever data is received. This is only called on the
* main thread.
*
* @param aMessage Data received from the socket.
*/
virtual void ReceiveSocketData(UnixSocketRawData* aMessage) = 0;
@@ -167,16 +176,40 @@ public:
* from main thread.
*/
void CloseSocket();
/**
* Cancels connect/accept task loop, if one is currently running.
*/
void CancelSocketTask();
+
+ /**
+ * Callback for socket connect/accept success. Called after connect/accept has
+ * finished. Will be run on main thread, before any reads take place.
+ */
+ virtual void OnConnectSuccess() = 0;
+
+ /**
+ * Callback for socket connect/accept error. Will be run on main thread.
+ */
+ virtual void OnConnectError() = 0;
+
+
+ /**
+ * Called by implementation to notify consumer of success.
+ */
+ void NotifySuccess();
+
+ /**
+ * Called by implementation to notify consumer of error.
+ */
+ void NotifyError();
+
private:
UnixSocketImpl* mImpl;
+ SocketConnectionStatus mConnectionStatus;
};
} // namespace ipc
} // namepsace mozilla
#endif // mozilla_ipc_Socket_h