author | Thomas Zimmermann <tdz@users.sourceforge.net> |
Tue, 02 Jun 2015 10:01:57 +0200 | |
changeset 246783 | 075e96a57701c252935f7b8d9ecb517da459073f |
parent 246782 | ef212da7547858d3bc2c9a27ffcf75bdd7968669 |
child 246784 | fd7c97d41cf392d9e8229e0031c5742d8d586f2b |
push id | 28840 |
push user | kwierso@gmail.com |
push date | Wed, 03 Jun 2015 01:34:22 +0000 |
treeherder | mozilla-central@b0a507af2b4a [default view] [failures only] |
perfherder | [talos] [build metrics] [platform microbench] (compared to previous push) |
reviewers | kmachulis |
bugs | 1168806 |
milestone | 41.0a1 |
first release with | nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
|
last release without | nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
|
--- a/dom/bluetooth/bluedroid/BluetoothSocket.cpp +++ b/dom/bluetooth/bluedroid/BluetoothSocket.cpp @@ -69,26 +69,29 @@ public: */ enum ConnectionStatus { SOCKET_IS_DISCONNECTED = 0, SOCKET_IS_LISTENING, SOCKET_IS_CONNECTING, SOCKET_IS_CONNECTED }; - DroidSocketImpl(MessageLoop* aIOLoop, BluetoothSocket* aConsumer) + DroidSocketImpl(nsIThread* aConsumerThread, + MessageLoop* aIOLoop, + BluetoothSocket* aConsumer) : ipc::UnixFdWatcher(aIOLoop) + , DataSocketIO(aConsumerThread) , mConsumer(aConsumer) , mShuttingDownOnIOThread(false) , mConnectionStatus(SOCKET_IS_DISCONNECTED) { } ~DroidSocketImpl() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); } void Send(UnixSocketIOBuffer* aBuffer) { EnqueueData(aBuffer); AddWatchers(WRITE_WATCHER, false); } @@ -137,35 +140,36 @@ public: SocketBase* GetSocketBase() override { return GetDataSocket(); } bool IsShutdownOnMainThread() const override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); + return mConsumer == nullptr; } bool IsShutdownOnIOThread() const override { return mShuttingDownOnIOThread; } void ShutdownOnMainThread() override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(!IsShutdownOnMainThread()); mConsumer = nullptr; } void ShutdownOnIOThread() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); Close(); // will also remove fd from I/O loop mShuttingDownOnIOThread = true; } private: class ReceiveRunnable; @@ -209,17 +213,17 @@ class SocketConnectTask final : public S public: SocketConnectTask(DroidSocketImpl* aDroidSocketImpl, int aFd) : SocketIOTask<DroidSocketImpl>(aDroidSocketImpl) , mFd(aFd) { } void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); MOZ_ASSERT(!IsCanceled()); GetIO()->Connect(mFd); } private: int mFd; }; @@ -229,17 +233,17 @@ class SocketListenTask final : public So public: SocketListenTask(DroidSocketImpl* aDroidSocketImpl, int aFd) : SocketIOTask<DroidSocketImpl>(aDroidSocketImpl) , mFd(aFd) { } void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); if (!IsCanceled()) { GetIO()->Listen(mFd); } } private: int mFd; @@ -249,17 +253,17 @@ class SocketConnectClientFdTask final : public SocketIOTask<DroidSocketImpl> { SocketConnectClientFdTask(DroidSocketImpl* aImpl) : SocketIOTask<DroidSocketImpl>(aImpl) { } void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); GetIO()->ConnectClientFd(); } }; void DroidSocketImpl::Connect(int aFd) { @@ -309,18 +313,19 @@ DroidSocketImpl::Accept(int aFd) if (!(flags & O_NONBLOCK)) { int res = TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFL, flags | O_NONBLOCK)); NS_ENSURE_TRUE_VOID(!res); } SetFd(aFd); mConnectionStatus = SOCKET_IS_CONNECTED; - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); AddWatchers(READ_WATCHER, true); if (HasPendingData()) { AddWatchers(WRITE_WATCHER, false); } } void @@ -333,17 +338,17 @@ DroidSocketImpl::OnFileCanReadWithoutBlo } else { NS_NOTREACHED("invalid connection state for reading"); } } void DroidSocketImpl::OnSocketCanReceiveWithoutBlocking(int aFd) { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); ssize_t res = ReceiveData(aFd); if (res < 0) { /* I/O error */ RemoveWatchers(READ_WATCHER|WRITE_WATCHER); } else if (!res) { /* EOF or peer shutdown */ @@ -356,17 +361,17 @@ class AcceptTask final : public SocketIO public: AcceptTask(DroidSocketImpl* aDroidSocketImpl, int aFd) : SocketIOTask<DroidSocketImpl>(aDroidSocketImpl) , mFd(aFd) { } void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); MOZ_ASSERT(!IsCanceled()); GetIO()->Accept(mFd); } private: int mFd; }; @@ -378,17 +383,17 @@ public: : mImpl(aImpl) { MOZ_ASSERT(mImpl); } void Accept(int aFd, const nsAString& aBdAddress, int aConnectionStatus) override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(mImpl->IsConsumerThread()); mozilla::ScopedClose fd(aFd); // Close received socket fd on error if (mImpl->IsShutdownOnMainThread()) { BT_LOGD("mConsumer is null, aborting receive!"); return; } @@ -399,17 +404,17 @@ public: mImpl->mConsumer->SetAddress(aBdAddress); mImpl->GetIOLoop()->PostTask(FROM_HERE, new AcceptTask(mImpl, fd.forget())); } void OnError(BluetoothStatus aStatus) override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(mImpl->IsConsumerThread()); BT_LOGR("BluetoothSocketInterface::Accept failed: %d", (int)aStatus); if (!mImpl->IsShutdownOnMainThread()) { // Instead of NotifyError(), call NotifyDisconnect() to trigger // BluetoothOppManager::OnSocketDisconnect() as // DroidSocketImpl::OnFileCanReadWithoutBlocking() in Firefox OS 2.0 in // order to keep the same behavior and reduce regression risk. mImpl->mConsumer->NotifyDisconnect(); @@ -425,17 +430,17 @@ class AcceptRunnable final : public Sock public: AcceptRunnable(DroidSocketImpl* aImpl, int aFd) : SocketIORunnable<DroidSocketImpl>(aImpl) , mFd(aFd) { } NS_IMETHOD Run() override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(GetIO()->IsConsumerThread()); MOZ_ASSERT(sBluetoothSocketInterface); BluetoothSocketResultHandler* res = new AcceptResultHandler(GetIO()); GetIO()->mConsumer->SetCurrentResultHandler(res); sBluetoothSocketInterface->Accept(mFd, res); return NS_OK; @@ -443,26 +448,26 @@ public: private: int mFd; }; void DroidSocketImpl::OnSocketCanAcceptWithoutBlocking(int aFd) { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); /* When a listening socket is ready for receiving data, * we can call |Accept| on it. */ RemoveWatchers(READ_WATCHER); - nsRefPtr<AcceptRunnable> t = new AcceptRunnable(this, aFd); - NS_DispatchToMainThread(t); + GetConsumerThread()->Dispatch(new AcceptRunnable(this, aFd), + NS_DISPATCH_NORMAL); } void DroidSocketImpl::OnFileCanWriteWithoutBlocking(int aFd) { if (mConnectionStatus == SOCKET_IS_CONNECTED) { OnSocketCanSendWithoutBlocking(aFd); } else if (mConnectionStatus == SOCKET_IS_CONNECTING) { @@ -470,44 +475,45 @@ DroidSocketImpl::OnFileCanWriteWithoutBl } else { NS_NOTREACHED("invalid connection state for writing"); } } void DroidSocketImpl::OnSocketCanSendWithoutBlocking(int aFd) { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); MOZ_ASSERT(aFd >= 0); nsresult rv = SendPendingData(aFd); if (NS_FAILED(rv)) { return; } if (HasPendingData()) { AddWatchers(WRITE_WATCHER, false); } } void DroidSocketImpl::OnSocketCanConnectWithoutBlocking(int aFd) { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); /* We follow Posix behaviour here: Connect operations are * complete once we can write to the connecting socket. */ mConnectionStatus = SOCKET_IS_CONNECTED; - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); AddWatchers(READ_WATCHER, true); if (HasPendingData()) { AddWatchers(WRITE_WATCHER, false); } } // |DataSocketIO| @@ -536,19 +542,19 @@ class DroidSocketImpl::ReceiveRunnable f public: ReceiveRunnable(DroidSocketImpl* aIO, UnixSocketBuffer* aBuffer) : SocketIORunnable<DroidSocketImpl>(aIO) , mBuffer(aBuffer) { } NS_IMETHOD Run() override { - MOZ_ASSERT(NS_IsMainThread()); + DroidSocketImpl* io = SocketIORunnable<DroidSocketImpl>::GetIO(); - DroidSocketImpl* io = SocketIORunnable<DroidSocketImpl>::GetIO(); + MOZ_ASSERT(io->IsConsumerThread()); if (NS_WARN_IF(io->IsShutdownOnMainThread())) { // Since we've already explicitly closed and the close // happened before this, this isn't really an error. return NS_OK; } BluetoothSocket* bluetoothSocket = io->GetBluetoothSocket(); @@ -561,17 +567,18 @@ public: private: nsAutoPtr<UnixSocketBuffer> mBuffer; }; void DroidSocketImpl::ConsumeBuffer() { - NS_DispatchToMainThread(new ReceiveRunnable(this, mBuffer.forget())); + GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()), + NS_DISPATCH_NORMAL); } void DroidSocketImpl::DiscardBuffer() { // Nothing to do. } @@ -597,17 +604,17 @@ public: : mImpl(aImpl) { MOZ_ASSERT(mImpl); } void Connect(int aFd, const nsAString& aBdAddress, int aConnectionStatus) override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(mImpl->IsConsumerThread()); if (mImpl->IsShutdownOnMainThread()) { BT_LOGD("mConsumer is null, aborting send!"); return; } if (aConnectionStatus != 0) { mImpl->mConsumer->NotifyError(); @@ -616,17 +623,17 @@ public: mImpl->mConsumer->SetAddress(aBdAddress); mImpl->GetIOLoop()->PostTask(FROM_HERE, new SocketConnectTask(mImpl, aFd)); } void OnError(BluetoothStatus aStatus) override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(mImpl->IsConsumerThread()); BT_WARNING("Connect failed: %d", (int)aStatus); if (!mImpl->IsShutdownOnMainThread()) { // Instead of NotifyError(), call NotifyDisconnect() to trigger // BluetoothOppManager::OnSocketDisconnect() as // DroidSocketImpl::OnFileCanReadWithoutBlocking() in Firefox OS 2.0 in // order to keep the same behavior and reduce regression risk. mImpl->mConsumer->NotifyDisconnect(); @@ -638,24 +645,24 @@ private: }; nsresult BluetoothSocket::Connect(const nsAString& aDeviceAddress, const BluetoothUuid& aServiceUuid, BluetoothSocketType aType, int aChannel, bool aAuth, bool aEncrypt, + nsIThread* aConsumerThread, MessageLoop* aIOLoop) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(!mImpl); SetConnectionStatus(SOCKET_CONNECTING); - mImpl = new DroidSocketImpl(aIOLoop, this); + mImpl = new DroidSocketImpl(aConsumerThread, aIOLoop, this); BluetoothSocketResultHandler* res = new ConnectSocketResultHandler(mImpl); SetCurrentResultHandler(res); sBluetoothSocketInterface->Connect( aDeviceAddress, aType, aServiceUuid.mUuid, aChannel, aEncrypt, aAuth, res); @@ -665,61 +672,67 @@ BluetoothSocket::Connect(const nsAString nsresult BluetoothSocket::Connect(const nsAString& aDeviceAddress, const BluetoothUuid& aServiceUuid, BluetoothSocketType aType, int aChannel, bool aAuth, bool aEncrypt) { + nsIThread* consumerThread = nullptr; + nsresult rv = NS_GetCurrentThread(&consumerThread); + if (NS_FAILED(rv)) { + return rv; + } + return Connect(aDeviceAddress, aServiceUuid, aType, aChannel, aAuth, - aEncrypt, XRE_GetIOMessageLoop()); + aEncrypt, consumerThread, XRE_GetIOMessageLoop()); } class ListenResultHandler final : public BluetoothSocketResultHandler { public: ListenResultHandler(DroidSocketImpl* aImpl) : mImpl(aImpl) { MOZ_ASSERT(mImpl); } void Listen(int aFd) override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(mImpl->IsConsumerThread()); mImpl->GetIOLoop()->PostTask(FROM_HERE, new SocketListenTask(mImpl, aFd)); } void OnError(BluetoothStatus aStatus) override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(mImpl->IsConsumerThread()); BT_WARNING("Listen failed: %d", (int)aStatus); } private: DroidSocketImpl* mImpl; }; nsresult BluetoothSocket::Listen(const nsAString& aServiceName, const BluetoothUuid& aServiceUuid, BluetoothSocketType aType, int aChannel, bool aAuth, bool aEncrypt, + nsIThread* aConsumerThread, MessageLoop* aIOLoop) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(!mImpl); SetConnectionStatus(SOCKET_LISTENING); - mImpl = new DroidSocketImpl(aIOLoop, this); + mImpl = new DroidSocketImpl(aConsumerThread, aIOLoop, this); BluetoothSocketResultHandler* res = new ListenResultHandler(mImpl); SetCurrentResultHandler(res); sBluetoothSocketInterface->Listen( aType, aServiceName, aServiceUuid.mUuid, aChannel, aEncrypt, aAuth, res); @@ -729,54 +742,61 @@ BluetoothSocket::Listen(const nsAString& nsresult BluetoothSocket::Listen(const nsAString& aServiceName, const BluetoothUuid& aServiceUuid, BluetoothSocketType aType, int aChannel, bool aAuth, bool aEncrypt) { + nsIThread* consumerThread = nullptr; + nsresult rv = NS_GetCurrentThread(&consumerThread); + if (NS_FAILED(rv)) { + return rv; + } + return Listen(aServiceName, aServiceUuid, aType, aChannel, aAuth, aEncrypt, - XRE_GetIOMessageLoop()); + consumerThread, XRE_GetIOMessageLoop()); } void BluetoothSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); mObserver->ReceiveSocketData(this, aBuffer); } // |DataSocket| void BluetoothSocket::SendSocketData(UnixSocketIOBuffer* aBuffer) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mImpl); + MOZ_ASSERT(mImpl->IsConsumerThread()); MOZ_ASSERT(!mImpl->IsShutdownOnMainThread()); mImpl->GetIOLoop()->PostTask( FROM_HERE, new SocketIOSendTask<DroidSocketImpl, UnixSocketIOBuffer>(mImpl, aBuffer)); } // |SocketBase| void BluetoothSocket::Close() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(sBluetoothSocketInterface); + if (!mImpl) { return; } + MOZ_ASSERT(mImpl->IsConsumerThread()); + // Stop any watching |SocketMessageWatcher| if (mCurrentRes) { sBluetoothSocketInterface->Close(mCurrentRes); } // From this point on, we consider mImpl as being deleted. // We sever the relationship here so any future calls to listen or connect // will create a new implementation. @@ -785,32 +805,29 @@ BluetoothSocket::Close() mImpl = nullptr; NotifyDisconnect(); } void BluetoothSocket::OnConnectSuccess() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); SetCurrentResultHandler(nullptr); mObserver->OnSocketConnectSuccess(this); } void BluetoothSocket::OnConnectError() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); SetCurrentResultHandler(nullptr); mObserver->OnSocketConnectError(this); } void BluetoothSocket::OnDisconnect() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); mObserver->OnSocketDisconnect(this); }
--- a/dom/bluetooth/bluedroid/BluetoothSocket.h +++ b/dom/bluetooth/bluedroid/BluetoothSocket.h @@ -6,16 +6,17 @@ #ifndef mozilla_dom_bluetooth_BluetoothSocket_h #define mozilla_dom_bluetooth_BluetoothSocket_h #include "BluetoothCommon.h" #include "mozilla/ipc/DataSocket.h" class MessageLoop; +class nsIThread; BEGIN_BLUETOOTH_NAMESPACE class BluetoothSocketObserver; class BluetoothSocketResultHandler; class DroidSocketImpl; class BluetoothSocket final : public mozilla::ipc::DataSocket @@ -23,29 +24,31 @@ class BluetoothSocket final : public moz public: BluetoothSocket(BluetoothSocketObserver* aObserver); nsresult Connect(const nsAString& aDeviceAddress, const BluetoothUuid& aServiceUuid, BluetoothSocketType aType, int aChannel, bool aAuth, bool aEncrypt, + nsIThread* aConsumerThread, MessageLoop* aIOLoop); nsresult Connect(const nsAString& aDeviceAddress, const BluetoothUuid& aServiceUuid, BluetoothSocketType aType, int aChannel, bool aAuth, bool aEncrypt); nsresult Listen(const nsAString& aServiceName, const BluetoothUuid& aServiceUuid, BluetoothSocketType aType, int aChannel, bool aAuth, bool aEncrypt, + nsIThread* aConsumerThread, MessageLoop* aIOLoop); nsresult Listen(const nsAString& aServiceName, const BluetoothUuid& aServiceUuid, BluetoothSocketType aType, int aChannel, bool aAuth, bool aEncrypt);
--- a/dom/bluetooth/bluez/BluetoothSocket.cpp +++ b/dom/bluetooth/bluez/BluetoothSocket.cpp @@ -23,17 +23,18 @@ static const size_t MAX_READ_SIZE = 1 << // BluetoothSocketIO // class BluetoothSocket::BluetoothSocketIO final : public UnixSocketWatcher , public DataSocketIO { public: - BluetoothSocketIO(MessageLoop* mIOLoop, + BluetoothSocketIO(nsIThread* aConsumerThread, + MessageLoop* aIOLoop, BluetoothSocket* aConsumer, UnixSocketConnector* aConnector); ~BluetoothSocketIO(); void GetSocketAddr(nsAString& aAddrStr) const; BluetoothSocket* GetBluetoothSocket(); DataSocket* GetDataSocket(); @@ -127,33 +128,35 @@ private: /** * I/O buffer for received data */ nsAutoPtr<UnixSocketRawData> mBuffer; }; BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO( - MessageLoop* mIOLoop, + nsIThread* aConsumerThread, + MessageLoop* aIOLoop, BluetoothSocket* aConsumer, UnixSocketConnector* aConnector) - : UnixSocketWatcher(mIOLoop) + : UnixSocketWatcher(aIOLoop) + , DataSocketIO(aConsumerThread) , mConsumer(aConsumer) , mConnector(aConnector) , mShuttingDownOnIOThread(false) , mAddressLength(0) , mDelayedConnectTask(nullptr) { MOZ_ASSERT(mConsumer); MOZ_ASSERT(mConnector); } BluetoothSocket::BluetoothSocketIO::~BluetoothSocketIO() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(IsShutdownOnMainThread()); } void BluetoothSocket::BluetoothSocketIO::GetSocketAddr(nsAString& aAddrStr) const { if (!mConnector) { NS_WARNING("No connector to get socket address from!"); @@ -183,33 +186,33 @@ DataSocket* BluetoothSocket::BluetoothSocketIO::GetDataSocket() { return GetBluetoothSocket(); } void BluetoothSocket::BluetoothSocketIO::SetDelayedConnectTask(CancelableTask* aTask) { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); mDelayedConnectTask = aTask; } void BluetoothSocket::BluetoothSocketIO::ClearDelayedConnectTask() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); mDelayedConnectTask = nullptr; } void BluetoothSocket::BluetoothSocketIO::CancelDelayedConnectTask() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); if (!mDelayedConnectTask) { return; } mDelayedConnectTask->Cancel(); ClearDelayedConnectTask(); } @@ -272,18 +275,19 @@ BluetoothSocket::BluetoothSocketIO::Send } void BluetoothSocket::BluetoothSocketIO::OnConnected() { MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); AddWatchers(READ_WATCHER, true); if (HasPendingData()) { AddWatchers(WRITE_WATCHER, false); } } void @@ -321,18 +325,19 @@ BluetoothSocket::BluetoothSocketIO::OnSo if (NS_WARN_IF(NS_FAILED(rv))) { FireSocketError(); return; } Close(); SetSocket(fd, SOCKET_IS_CONNECTED); - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); AddWatchers(READ_WATCHER, true); if (HasPendingData()) { AddWatchers(WRITE_WATCHER, false); } } void @@ -371,18 +376,19 @@ void BluetoothSocket::BluetoothSocketIO::FireSocketError() { MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); // Clean up watchers, statuses, fds Close(); // Tell the main thread we've errored - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR), + NS_DISPATCH_NORMAL); } // |DataSocketIO| nsresult BluetoothSocket::BluetoothSocketIO::QueryReceiveBuffer( UnixSocketIOBuffer** aBuffer) @@ -407,19 +413,19 @@ class BluetoothSocket::BluetoothSocketIO public: ReceiveRunnable(BluetoothSocketIO* aIO, UnixSocketBuffer* aBuffer) : SocketIORunnable<BluetoothSocketIO>(aIO) , mBuffer(aBuffer) { } NS_IMETHOD Run() override { - MOZ_ASSERT(NS_IsMainThread()); + BluetoothSocketIO* io = SocketIORunnable<BluetoothSocketIO>::GetIO(); - BluetoothSocketIO* io = SocketIORunnable<BluetoothSocketIO>::GetIO(); + MOZ_ASSERT(io->IsConsumerThread()); if (NS_WARN_IF(io->IsShutdownOnMainThread())) { // Since we've already explicitly closed and the close // happened before this, this isn't really an error. return NS_OK; } BluetoothSocket* bluetoothSocket = io->GetBluetoothSocket(); @@ -432,17 +438,18 @@ public: private: nsAutoPtr<UnixSocketBuffer> mBuffer; }; void BluetoothSocket::BluetoothSocketIO::ConsumeBuffer() { - NS_DispatchToMainThread(new ReceiveRunnable(this, mBuffer.forget())); + GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()), + NS_DISPATCH_NORMAL); } void BluetoothSocket::BluetoothSocketIO::DiscardBuffer() { // Nothing to do. } @@ -452,40 +459,40 @@ SocketBase* BluetoothSocket::BluetoothSocketIO::GetSocketBase() { return GetDataSocket(); } bool BluetoothSocket::BluetoothSocketIO::IsShutdownOnMainThread() const { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); return mConsumer == nullptr; } void BluetoothSocket::BluetoothSocketIO::ShutdownOnMainThread() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(!IsShutdownOnMainThread()); mConsumer = nullptr; } bool BluetoothSocket::BluetoothSocketIO::IsShutdownOnIOThread() const { return mShuttingDownOnIOThread; } void BluetoothSocket::BluetoothSocketIO::ShutdownOnIOThread() { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); Close(); // will also remove fd from I/O loop mShuttingDownOnIOThread = true; } // @@ -497,17 +504,17 @@ class BluetoothSocket::ListenTask final { public: ListenTask(BluetoothSocketIO* aIO) : SocketIOTask<BluetoothSocketIO>(aIO) { } void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); if (!IsCanceled()) { GetIO()->Listen(); } } }; class BluetoothSocket::ConnectTask final @@ -515,34 +522,34 @@ class BluetoothSocket::ConnectTask final { public: ConnectTask(BluetoothSocketIO* aIO) : SocketIOTask<BluetoothSocketIO>(aIO) { } void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); MOZ_ASSERT(!IsCanceled()); GetIO()->Connect(); } }; class BluetoothSocket::DelayedConnectTask final : public SocketIOTask<BluetoothSocketIO> { public: DelayedConnectTask(BluetoothSocketIO* aIO) : SocketIOTask<BluetoothSocketIO>(aIO) { } void Run() override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(GetIO()->IsConsumerThread()); if (IsCanceled()) { return; } BluetoothSocketIO* io = GetIO(); if (io->IsShutdownOnMainThread()) { return; @@ -571,17 +578,16 @@ BluetoothSocket::~BluetoothSocket() nsresult BluetoothSocket::Connect(const nsAString& aDeviceAddress, const BluetoothUuid& aServiceUuid, BluetoothSocketType aType, int aChannel, bool aAuth, bool aEncrypt) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(!aDeviceAddress.IsEmpty()); nsAutoPtr<BluetoothUnixSocketConnector> connector( new BluetoothUnixSocketConnector(NS_ConvertUTF16toUTF8(aDeviceAddress), aType, aChannel, aAuth, aEncrypt)); nsresult rv = Connect(connector); if (NS_FAILED(rv)) { @@ -598,18 +604,16 @@ BluetoothSocket::Connect(const nsAString nsresult BluetoothSocket::Listen(const nsAString& aServiceName, const BluetoothUuid& aServiceUuid, BluetoothSocketType aType, int aChannel, bool aAuth, bool aEncrypt) { - MOZ_ASSERT(NS_IsMainThread()); - nsAutoPtr<BluetoothUnixSocketConnector> connector( new BluetoothUnixSocketConnector(NS_LITERAL_CSTRING(BLUETOOTH_ADDRESS_NONE), aType, aChannel, aAuth, aEncrypt)); nsresult rv = Listen(connector); if (NS_FAILED(rv)) { nsAutoString addr; GetAddress(addr); @@ -620,17 +624,16 @@ BluetoothSocket::Listen(const nsAString& connector.forget(); return NS_OK; } void BluetoothSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); mObserver->ReceiveSocketData(this, aBuffer); } bool BluetoothSocket::SendSocketData(const nsACString& aStr) { @@ -640,24 +643,25 @@ BluetoothSocket::SendSocketData(const ns SendSocketData(new UnixSocketRawData(aStr.BeginReading(), aStr.Length())); return true; } nsresult BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector, - int aDelayMs, MessageLoop* aIOLoop) + int aDelayMs, + nsIThread* aConsumerThread, MessageLoop* aIOLoop) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(aConnector); + MOZ_ASSERT(aConsumerThread); MOZ_ASSERT(aIOLoop); MOZ_ASSERT(!mIO); - mIO = new BluetoothSocketIO(aIOLoop, this, aConnector); + mIO = new BluetoothSocketIO(aConsumerThread, aIOLoop, this, aConnector); SetConnectionStatus(SOCKET_CONNECTING); if (aDelayMs > 0) { DelayedConnectTask* connectTask = new DelayedConnectTask(mIO); mIO->SetDelayedConnectTask(connectTask); MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs); } else { aIOLoop->PostTask(FROM_HERE, new ConnectTask(mIO)); @@ -665,40 +669,52 @@ BluetoothSocket::Connect(BluetoothUnixSo return NS_OK; } nsresult BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector, int aDelayMs) { - return Connect(aConnector, aDelayMs, XRE_GetIOMessageLoop()); + nsIThread* consumerThread = nullptr; + nsresult rv = NS_GetCurrentThread(&consumerThread); + if (NS_FAILED(rv)) { + return rv; + } + + return Connect(aConnector, aDelayMs, consumerThread, XRE_GetIOMessageLoop()); } nsresult BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector, - MessageLoop* aIOLoop) + nsIThread* aConsumerThread, MessageLoop* aIOLoop) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(aConnector); + MOZ_ASSERT(aConsumerThread); MOZ_ASSERT(aIOLoop); MOZ_ASSERT(!mIO); - mIO = new BluetoothSocketIO(aIOLoop, this, aConnector); + mIO = new BluetoothSocketIO(aConsumerThread, aIOLoop, this, aConnector); SetConnectionStatus(SOCKET_LISTENING); aIOLoop->PostTask(FROM_HERE, new ListenTask(mIO)); return NS_OK; } nsresult BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector) { - return Listen(aConnector, XRE_GetIOMessageLoop()); + nsIThread* consumerThread = nullptr; + nsresult rv = NS_GetCurrentThread(&consumerThread); + if (NS_FAILED(rv)) { + return rv; + } + + return Listen(aConnector, consumerThread, XRE_GetIOMessageLoop()); } void BluetoothSocket::GetAddress(nsAString& aAddrStr) { aAddrStr.Truncate(); if (!mIO || GetConnectionStatus() != SOCKET_CONNECTED) { NS_WARNING("No socket currently open!"); @@ -707,64 +723,62 @@ BluetoothSocket::GetAddress(nsAString& a mIO->GetSocketAddr(aAddrStr); } // |DataSocket| void BluetoothSocket::SendSocketData(UnixSocketIOBuffer* aBuffer) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mIO); + MOZ_ASSERT(mIO->IsConsumerThread()); MOZ_ASSERT(!mIO->IsShutdownOnMainThread()); mIO->GetIOLoop()->PostTask( FROM_HERE, new SocketIOSendTask<BluetoothSocketIO, UnixSocketIOBuffer>(mIO, aBuffer)); } // |SocketBase| void BluetoothSocket::Close() { - MOZ_ASSERT(NS_IsMainThread()); if (!mIO) { return; } + MOZ_ASSERT(mIO->IsConsumerThread()); + mIO->CancelDelayedConnectTask(); // From this point on, we consider mIO as being deleted. // We sever the relationship here so any future calls to listen or connect // will create a new implementation. mIO->ShutdownOnMainThread(); mIO->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO)); mIO = nullptr; NotifyDisconnect(); } void BluetoothSocket::OnConnectSuccess() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); mObserver->OnSocketConnectSuccess(this); } void BluetoothSocket::OnConnectError() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); mObserver->OnSocketConnectError(this); } void BluetoothSocket::OnDisconnect() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); mObserver->OnSocketDisconnect(this); } END_BLUETOOTH_NAMESPACE
--- a/dom/bluetooth/bluez/BluetoothSocket.h +++ b/dom/bluetooth/bluez/BluetoothSocket.h @@ -61,21 +61,22 @@ public: bool SendSocketData(const nsACString& aMessage); /** * Starts a task on the socket that will try to connect to a socket in a * non-blocking manner. * * @param aConnector Connector object for socket type specific functions * @param aDelayMs Time delay in milli-seconds. + * @param aConsumerThread The socket's consumer thread. * @param aIOLoop The socket's I/O thread. * @return NS_OK on success, or an XPCOM error code otherwise. */ - nsresult Connect(BluetoothUnixSocketConnector* aConnector, - int aDelayMs, MessageLoop* aIOLoop); + nsresult Connect(BluetoothUnixSocketConnector* aConnector, int aDelayMs, + nsIThread* aConsumerThread, MessageLoop* aIOLoop); /** * Starts a task on the socket that will try to connect to a socket in a * non-blocking manner. * * @param aConnector Connector object for socket type specific functions * @param aDelayMs Time delay in milli-seconds. * @return NS_OK on success, or an XPCOM error code otherwise. @@ -83,21 +84,22 @@ public: nsresult Connect(BluetoothUnixSocketConnector* aConnector, int aDelayMs = 0); /** * Starts a task on the socket that will try to accept a new connection in a * non-blocking manner. * * @param aConnector Connector object for socket type specific functions + * @param aConsumerThread The socket's consumer thread. * @param aIOLoop The socket's I/O thread. * @return NS_OK on success, or an XPCOM error code otherwise. */ nsresult Listen(BluetoothUnixSocketConnector* aConnector, - MessageLoop* aIOLoop); + nsIThread* aConsumerThread, MessageLoop* aIOLoop); /** * Starts a task on the socket that will try to accept a new connection in a * non-blocking manner. * * @param aConnector Connector object for socket type specific functions * @return NS_OK on success, or an XPCOM error code otherwise. */
--- a/ipc/bluetooth/BluetoothDaemonConnection.cpp +++ b/ipc/bluetooth/BluetoothDaemonConnection.cpp @@ -201,18 +201,19 @@ BluetoothDaemonPDUConsumer::~BluetoothDa // BluetoothDaemonConnectionIO // class BluetoothDaemonConnectionIO final : public UnixSocketWatcher , public ConnectionOrientedSocketIO { public: - BluetoothDaemonConnectionIO(MessageLoop* aIOLoop, int aFd, - ConnectionStatus aConnectionStatus, + BluetoothDaemonConnectionIO(nsIThread* aConsumerThread, + MessageLoop* aIOLoop, + int aFd, ConnectionStatus aConnectionStatus, BluetoothDaemonConnection* aConnection, BluetoothDaemonPDUConsumer* aConsumer); // Task callback methods // void Send(UnixSocketIOBuffer* aBuffer); @@ -250,24 +251,27 @@ public: private: BluetoothDaemonConnection* mConnection; BluetoothDaemonPDUConsumer* mConsumer; nsAutoPtr<BluetoothDaemonPDU> mPDU; bool mShuttingDownOnIOThread; }; BluetoothDaemonConnectionIO::BluetoothDaemonConnectionIO( - MessageLoop* aIOLoop, int aFd, + nsIThread* aConsumerThread, + MessageLoop* aIOLoop, + int aFd, ConnectionStatus aConnectionStatus, BluetoothDaemonConnection* aConnection, BluetoothDaemonPDUConsumer* aConsumer) -: UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus) -, mConnection(aConnection) -, mConsumer(aConsumer) -, mShuttingDownOnIOThread(false) + : UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus) + , ConnectionOrientedSocketIO(aConsumerThread) + , mConnection(aConnection) + , mConsumer(aConsumer) + , mShuttingDownOnIOThread(false) { MOZ_ASSERT(mConnection); MOZ_ASSERT(mConsumer); } void BluetoothDaemonConnectionIO::Send(UnixSocketIOBuffer* aBuffer) { @@ -303,18 +307,19 @@ BluetoothDaemonConnectionIO::OnSocketCan } void BluetoothDaemonConnectionIO::OnConnected() { MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); AddWatchers(READ_WATCHER, true); if (HasPendingData()) { AddWatchers(WRITE_WATCHER, false); } } void @@ -323,18 +328,19 @@ BluetoothDaemonConnectionIO::OnError(con MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); UnixFdWatcher::OnError(aFunction, aErrno); // Clean up watchers, status, fd Close(); // Tell the main thread we've errored - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR), + NS_DISPATCH_NORMAL); } // |ConnectionOrientedSocketIO| nsresult BluetoothDaemonConnectionIO::Accept(int aFd, const struct sockaddr* aAddress, socklen_t aAddressLength) @@ -394,40 +400,40 @@ SocketBase* BluetoothDaemonConnectionIO::GetSocketBase() { return mConnection; } bool BluetoothDaemonConnectionIO::IsShutdownOnMainThread() const { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); return mConnection == nullptr; } bool BluetoothDaemonConnectionIO::IsShutdownOnIOThread() const { return mShuttingDownOnIOThread; } void BluetoothDaemonConnectionIO::ShutdownOnMainThread() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(!IsShutdownOnMainThread()); mConnection = nullptr; } void BluetoothDaemonConnectionIO::ShutdownOnIOThread() { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); Close(); // will also remove fd from I/O loop mShuttingDownOnIOThread = true; } // // BluetoothDaemonConnection @@ -447,87 +453,82 @@ BluetoothDaemonConnection::BluetoothDaem BluetoothDaemonConnection::~BluetoothDaemonConnection() { } // |ConnectionOrientedSocket| nsresult BluetoothDaemonConnection::PrepareAccept(UnixSocketConnector* aConnector, + nsIThread* aConsumerThread, MessageLoop* aIOLoop, ConnectionOrientedSocketIO*& aIO) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(!mIO); // |BluetoothDaemonConnection| now owns the connector, but doesn't // actually use it. So the connector is stored in an auto pointer // to be deleted at the end of the method. nsAutoPtr<UnixSocketConnector> connector(aConnector); SetConnectionStatus(SOCKET_CONNECTING); mIO = new BluetoothDaemonConnectionIO( - aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING, + aConsumerThread, aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING, this, mPDUConsumer); aIO = mIO; return NS_OK; } // |DataSocket| void BluetoothDaemonConnection::SendSocketData(UnixSocketIOBuffer* aBuffer) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mIO); + MOZ_ASSERT(mIO->IsConsumerThread()); mIO->GetIOLoop()->PostTask( FROM_HERE, new SocketIOSendTask<BluetoothDaemonConnectionIO, UnixSocketIOBuffer>(mIO, aBuffer)); } // |SocketBase| void BluetoothDaemonConnection::Close() { - MOZ_ASSERT(NS_IsMainThread()); - if (!mIO) { CHROMIUM_LOG("Bluetooth daemon already disconnected!"); return; } + MOZ_ASSERT(mIO->IsConsumerThread()); + + mIO->ShutdownOnMainThread(); mIO->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO)); mIO = nullptr; NotifyDisconnect(); } void BluetoothDaemonConnection::OnConnectSuccess() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnConnectSuccess(mIndex); } void BluetoothDaemonConnection::OnConnectError() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnConnectError(mIndex); } void BluetoothDaemonConnection::OnDisconnect() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnDisconnect(mIndex); } } }
--- a/ipc/bluetooth/BluetoothDaemonConnection.h +++ b/ipc/bluetooth/BluetoothDaemonConnection.h @@ -120,16 +120,17 @@ public: BluetoothDaemonConnectionConsumer* aConsumer, int aIndex); virtual ~BluetoothDaemonConnection(); // Methods for |ConnectionOrientedSocket| // nsresult PrepareAccept(UnixSocketConnector* aConnector, + nsIThread* aConsumerThread, MessageLoop* aIOLoop, ConnectionOrientedSocketIO*& aIO) override; // Methods for |DataSocket| // void SendSocketData(UnixSocketIOBuffer* aBuffer) override;
--- a/ipc/unixsocket/ConnectionOrientedSocket.cpp +++ b/ipc/unixsocket/ConnectionOrientedSocket.cpp @@ -4,16 +4,29 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "ConnectionOrientedSocket.h" namespace mozilla { namespace ipc { +// +// ConnectionOrientedSocketIO +// + +ConnectionOrientedSocketIO::ConnectionOrientedSocketIO( + nsIThread* aConsumerThread) + : DataSocketIO(aConsumerThread) +{ } + ConnectionOrientedSocketIO::~ConnectionOrientedSocketIO() { } +// +// ConnectionOrientedSocket +// + ConnectionOrientedSocket::~ConnectionOrientedSocket() { } } }
--- a/ipc/unixsocket/ConnectionOrientedSocket.h +++ b/ipc/unixsocket/ConnectionOrientedSocket.h @@ -21,37 +21,40 @@ class UnixSocketConnector; * |ConnectionOrientedSocketIO| and |ConnectionOrientedSocket| define * interfaces for implementing stream sockets on I/O and main thread. * |ListenSocket| uses these classes to handle accepted sockets. */ class ConnectionOrientedSocketIO : public DataSocketIO { public: + ConnectionOrientedSocketIO(nsIThread* aConsumerThread); virtual ~ConnectionOrientedSocketIO(); virtual nsresult Accept(int aFd, const struct sockaddr* aAddress, socklen_t aAddressLength) = 0; }; class ConnectionOrientedSocket : public DataSocket { public: /** * Prepares an instance of |ConnectionOrientedSocket| in DISCONNECTED * state for accepting a connection. Main-thread only. * * @param aConnector The new connector object, owned by the * connection-oriented socket. + * @param aConsumerThread The socket's consumer thread. * @param aIOLoop The socket's I/O thread. * @param[out] aIO, Returns an instance of |ConnectionOrientedSocketIO|. * @return NS_OK on success, or an XPCOM error code otherwise. */ virtual nsresult PrepareAccept(UnixSocketConnector* aConnector, + nsIThread* aConsumerThread, MessageLoop* aIOLoop, ConnectionOrientedSocketIO*& aIO) = 0; protected: virtual ~ConnectionOrientedSocket(); }; }
--- a/ipc/unixsocket/DataSocket.cpp +++ b/ipc/unixsocket/DataSocket.cpp @@ -45,30 +45,33 @@ ssize_t DataSocketIO::ReceiveData(int aFd) { MOZ_ASSERT(aFd >= 0); UnixSocketIOBuffer* incoming; nsresult rv = QueryReceiveBuffer(&incoming); if (NS_FAILED(rv)) { /* an error occured */ - NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this)); + GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this), + NS_DISPATCH_NORMAL); return -1; } ssize_t res = incoming->Receive(aFd); if (res < 0) { /* an I/O error occured */ DiscardBuffer(); - NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this)); + GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this), + NS_DISPATCH_NORMAL); return -1; } else if (!res) { /* EOF or peer shut down sending */ DiscardBuffer(); - NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this)); + GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this), + NS_DISPATCH_NORMAL); return 0; } #ifdef MOZ_TASK_TRACER /* Make unix socket creation events to be the source events of TaskTracer, * and originate the rest correlation tasks from here. */ AutoSourceEvent taskTracerEvent(SourceEventType::Unixsocket); @@ -85,32 +88,34 @@ DataSocketIO::SendPendingData(int aFd) MOZ_ASSERT(aFd >= 0); while (HasPendingData()) { UnixSocketIOBuffer* outgoing = mOutgoingQ.ElementAt(0); ssize_t res = outgoing->Send(aFd); if (res < 0) { /* an I/O error occured */ - NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this)); + GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this), + NS_DISPATCH_NORMAL); return NS_ERROR_FAILURE; } else if (!res && outgoing->GetSize()) { /* I/O is currently blocked; try again later */ return NS_OK; } if (!outgoing->GetSize()) { mOutgoingQ.RemoveElementAt(0); delete outgoing; } } return NS_OK; } -DataSocketIO::DataSocketIO() +DataSocketIO::DataSocketIO(nsIThread* aConsumerThread) + : SocketIOBase(aConsumerThread) { } // // DataSocket // DataSocket::~DataSocket() { }
--- a/ipc/unixsocket/DataSocket.h +++ b/ipc/unixsocket/DataSocket.h @@ -85,17 +85,17 @@ public: void EnqueueData(UnixSocketIOBuffer* aBuffer); bool HasPendingData() const; ssize_t ReceiveData(int aFd); nsresult SendPendingData(int aFd); protected: - DataSocketIO(); + DataSocketIO(nsIThread* aConsumerThread); private: /** * Raw data queue. Must be pushed/popped from I/O thread only. */ nsTArray<UnixSocketIOBuffer*> mOutgoingQ; }; @@ -115,20 +115,20 @@ public: : SocketIOTask<Tio>(aIO) , mData(aData) { MOZ_ASSERT(aData); } void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); MOZ_ASSERT(!SocketIOTask<Tio>::IsCanceled()); Tio* io = SocketIOTask<Tio>::GetIO(); + MOZ_ASSERT(!io->IsConsumerThread()); MOZ_ASSERT(!io->IsShutdownOnIOThread()); io->Send(mData); } private: Tdata* mData; };
--- a/ipc/unixsocket/ListenSocket.cpp +++ b/ipc/unixsocket/ListenSocket.cpp @@ -22,17 +22,18 @@ namespace ipc { class ListenSocketIO final : public UnixSocketWatcher , public SocketIOBase { public: class ListenTask; - ListenSocketIO(MessageLoop* mIOLoop, + ListenSocketIO(nsIThread* aConsumerThread, + MessageLoop* aIOLoop, ListenSocket* aListenSocket, UnixSocketConnector* aConnector); ~ListenSocketIO(); UnixSocketConnector* GetConnector() const; // Task callback methods // @@ -89,34 +90,35 @@ private: /** * Address structure of the socket currently in use */ struct sockaddr_storage mAddress; ConnectionOrientedSocketIO* mCOSocketIO; }; -ListenSocketIO::ListenSocketIO(MessageLoop* mIOLoop, +ListenSocketIO::ListenSocketIO(nsIThread* aConsumerThread, + MessageLoop* aIOLoop, ListenSocket* aListenSocket, UnixSocketConnector* aConnector) - : UnixSocketWatcher(mIOLoop) - , SocketIOBase() + : UnixSocketWatcher(aIOLoop) + , SocketIOBase(aConsumerThread) , mListenSocket(aListenSocket) , mConnector(aConnector) , mShuttingDownOnIOThread(false) , mAddressLength(0) , mCOSocketIO(nullptr) { MOZ_ASSERT(mListenSocket); MOZ_ASSERT(mConnector); } ListenSocketIO::~ListenSocketIO() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(IsShutdownOnMainThread()); } UnixSocketConnector* ListenSocketIO::GetConnector() const { return mConnector; } @@ -161,18 +163,19 @@ void ListenSocketIO::OnListening() { MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING); AddWatchers(READ_WATCHER, true); /* We signal a successful 'connection' to a local address for listening. */ - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); } void ListenSocketIO::OnError(const char* aFunction, int aErrno) { MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); UnixFdWatcher::OnError(aFunction, aErrno); @@ -183,18 +186,19 @@ void ListenSocketIO::FireSocketError() { MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); // Clean up watchers, statuses, fds Close(); // Tell the main thread we've errored - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR), + NS_DISPATCH_NORMAL); } void ListenSocketIO::OnSocketCanAcceptWithoutBlocking() { MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING); MOZ_ASSERT(mCOSocketIO); @@ -225,40 +229,40 @@ SocketBase* ListenSocketIO::GetSocketBase() { return mListenSocket.get(); } bool ListenSocketIO::IsShutdownOnMainThread() const { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); return mListenSocket == nullptr; } bool ListenSocketIO::IsShutdownOnIOThread() const { return mShuttingDownOnIOThread; } void ListenSocketIO::ShutdownOnMainThread() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(!IsShutdownOnMainThread()); mListenSocket = nullptr; } void ListenSocketIO::ShutdownOnIOThread() { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); Close(); // will also remove fd from I/O loop mShuttingDownOnIOThread = true; } // // Socket tasks @@ -272,17 +276,17 @@ public: : SocketIOTask<ListenSocketIO>(aIO) , mCOSocketIO(aCOSocketIO) { MOZ_ASSERT(mCOSocketIO); } void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); if (!IsCanceled()) { GetIO()->Listen(mCOSocketIO); } } private: ConnectionOrientedSocketIO* mCOSocketIO; @@ -302,60 +306,66 @@ ListenSocket::ListenSocket(ListenSocketC ListenSocket::~ListenSocket() { MOZ_ASSERT(!mIO); } nsresult ListenSocket::Listen(UnixSocketConnector* aConnector, + nsIThread* aConsumerThread, MessageLoop* aIOLoop, ConnectionOrientedSocket* aCOSocket) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(!mIO); - mIO = new ListenSocketIO(aIOLoop, this, aConnector); + mIO = new ListenSocketIO(aConsumerThread, aIOLoop, this, aConnector); // Prepared I/O object, now start listening. nsresult rv = Listen(aCOSocket); if (NS_FAILED(rv)) { delete mIO; mIO = nullptr; return rv; } return NS_OK; } nsresult ListenSocket::Listen(UnixSocketConnector* aConnector, ConnectionOrientedSocket* aCOSocket) { - return Listen(aConnector, XRE_GetIOMessageLoop(), aCOSocket); + nsIThread* consumerThread = nullptr; + nsresult rv = NS_GetCurrentThread(&consumerThread); + if (NS_FAILED(rv)) { + return rv; + } + + return Listen(aConnector, consumerThread, XRE_GetIOMessageLoop(), aCOSocket); } nsresult ListenSocket::Listen(ConnectionOrientedSocket* aCOSocket) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(aCOSocket); MOZ_ASSERT(mIO); // We first prepare the connection-oriented socket with a // socket connector and a socket I/O class. nsAutoPtr<UnixSocketConnector> connector; nsresult rv = mIO->GetConnector()->Duplicate(*connector.StartAssignment()); if (NS_FAILED(rv)) { return rv; } nsAutoPtr<ConnectionOrientedSocketIO> io; - rv = aCOSocket->PrepareAccept(connector, mIO->GetIOLoop(), + rv = aCOSocket->PrepareAccept(connector, + mIO->GetConsumerThread(), mIO->GetIOLoop(), *io.StartAssignment()); if (NS_FAILED(rv)) { return rv; } connector.forget(); // now owned by |io| // Then we start listening for connection requests. @@ -367,50 +377,44 @@ ListenSocket::Listen(ConnectionOrientedS return NS_OK; } // |SocketBase| void ListenSocket::Close() { - MOZ_ASSERT(NS_IsMainThread()); - if (!mIO) { return; } + MOZ_ASSERT(mIO->IsConsumerThread()); + // From this point on, we consider mIO as being deleted. We sever // the relationship here so any future calls to listen or connect // will create a new implementation. mIO->ShutdownOnMainThread(); mIO->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO)); mIO = nullptr; NotifyDisconnect(); } void ListenSocket::OnConnectSuccess() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnConnectSuccess(mIndex); } void ListenSocket::OnConnectError() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnConnectError(mIndex); } void ListenSocket::OnDisconnect() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnDisconnect(mIndex); } } // namespace ipc } // namespace mozilla
--- a/ipc/unixsocket/ListenSocket.h +++ b/ipc/unixsocket/ListenSocket.h @@ -6,16 +6,17 @@ #ifndef mozilla_ipc_listensocket_h #define mozilla_ipc_listensocket_h #include "nsString.h" #include "mozilla/ipc/SocketBase.h" class MessageLoop; +class nsIThread; namespace mozilla { namespace ipc { class ConnectionOrientedSocket; class ListenSocketConsumer; class ListenSocketIO; class UnixSocketConnector; @@ -31,22 +32,24 @@ public: */ ListenSocket(ListenSocketConsumer* aConsumer, int aIndex); /** * Starts a task on the socket that will try to accept a new connection * in a non-blocking manner. * * @param aConnector Connector object for socket-type-specific functions + * @param aConsumerThread The socket's consumer thread. * @param aIOLoop The socket's I/O thread. * @param aCOSocket The connection-oriented socket for handling the * accepted connection. * @return NS_OK on success, or an XPCOM error code otherwise. */ nsresult Listen(UnixSocketConnector* aConnector, + nsIThread* aConsumerThread, MessageLoop* aIOLoop, ConnectionOrientedSocket* aCOSocket); /** * Starts a task on the socket that will try to accept a new connection * in a non-blocking manner. * * @param aConnector Connector object for socket-type-specific functions
--- a/ipc/unixsocket/SocketBase.cpp +++ b/ipc/unixsocket/SocketBase.cpp @@ -181,66 +181,54 @@ UnixSocketRawData::Send(int aFd) // // SocketBase // SocketConnectionStatus SocketBase::GetConnectionStatus() const { - MOZ_ASSERT(NS_IsMainThread()); - return mConnectionStatus; } int SocketBase::GetSuggestedConnectDelayMs() const { - MOZ_ASSERT(NS_IsMainThread()); - return mConnectDelayMs; } void SocketBase::NotifySuccess() { - MOZ_ASSERT(NS_IsMainThread()); - mConnectionStatus = SOCKET_CONNECTED; mConnectTimestamp = PR_IntervalNow(); OnConnectSuccess(); } void SocketBase::NotifyError() { - MOZ_ASSERT(NS_IsMainThread()); - mConnectionStatus = SOCKET_DISCONNECTED; mConnectDelayMs = CalculateConnectDelayMs(); mConnectTimestamp = 0; OnConnectError(); } void SocketBase::NotifyDisconnect() { - MOZ_ASSERT(NS_IsMainThread()); - mConnectionStatus = SOCKET_DISCONNECTED; mConnectDelayMs = CalculateConnectDelayMs(); mConnectTimestamp = 0; OnDisconnect(); } uint32_t SocketBase::CalculateConnectDelayMs() const { - MOZ_ASSERT(NS_IsMainThread()); - uint32_t connectDelayMs = mConnectDelayMs; if (mConnectTimestamp && (PR_IntervalNow()-mConnectTimestamp) > connectDelayMs) { // reset delay if connection has been opened for a while, or... connectDelayMs = 0; } else if (!connectDelayMs) { // ...start with a delay of ~1 sec, or... connectDelayMs = 1<<10; @@ -267,38 +255,57 @@ SocketBase::SetConnectionStatus(SocketCo { mConnectionStatus = aConnectionStatus; } // // SocketIOBase // -SocketIOBase::SocketIOBase() -{ } +SocketIOBase::SocketIOBase(nsIThread* aConsumerThread) + : mConsumerThread(aConsumerThread) +{ + MOZ_ASSERT(mConsumerThread); +} SocketIOBase::~SocketIOBase() { } +nsIThread* +SocketIOBase::GetConsumerThread() const +{ + return mConsumerThread; +} + +bool +SocketIOBase::IsConsumerThread() const +{ + nsIThread* thread = nullptr; + if (NS_FAILED(NS_GetCurrentThread(&thread))) { + return false; + } + return thread == GetConsumerThread(); +} + // // SocketIOEventRunnable // SocketIOEventRunnable::SocketIOEventRunnable(SocketIOBase* aIO, SocketEvent aEvent) : SocketIORunnable<SocketIOBase>(aIO) , mEvent(aEvent) { } NS_METHOD SocketIOEventRunnable::Run() { - MOZ_ASSERT(NS_IsMainThread()); + SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO(); - SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO(); + MOZ_ASSERT(io->IsConsumerThread()); if (NS_WARN_IF(io->IsShutdownOnMainThread())) { // Since we've already explicitly closed and the close // happened before this, this isn't really an error. return NS_OK; } SocketBase* socketBase = io->GetSocketBase(); @@ -322,19 +329,19 @@ SocketIOEventRunnable::Run() SocketIORequestClosingRunnable::SocketIORequestClosingRunnable( SocketIOBase* aIO) : SocketIORunnable<SocketIOBase>(aIO) { } NS_METHOD SocketIORequestClosingRunnable::Run() { - MOZ_ASSERT(NS_IsMainThread()); + SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO(); - SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO(); + MOZ_ASSERT(io->IsConsumerThread()); if (NS_WARN_IF(io->IsShutdownOnMainThread())) { // Since we've already explicitly closed and the close // happened before this, this isn't really an error. return NS_OK; } SocketBase* socketBase = io->GetSocketBase(); @@ -368,25 +375,25 @@ SocketIODeleteInstanceRunnable::Run() SocketIOShutdownTask::SocketIOShutdownTask(SocketIOBase* aIO) : SocketIOTask<SocketIOBase>(aIO) { } void SocketIOShutdownTask::Run() { - MOZ_ASSERT(!NS_IsMainThread()); + SocketIOBase* io = SocketIOTask<SocketIOBase>::GetIO(); - SocketIOBase* io = SocketIOTask<SocketIOBase>::GetIO(); + MOZ_ASSERT(!io->IsConsumerThread()); + MOZ_ASSERT(!io->IsShutdownOnIOThread()); // At this point, there should be no new events on the I/O thread // after this one with the possible exception of an accept task, // which ShutdownOnIOThread will cancel for us. We are now fully // shut down, so we can send a message to the main thread to delete // |io| safely knowing that it's not reference any longer. - MOZ_ASSERT(!io->IsShutdownOnIOThread()); io->ShutdownOnIOThread(); - - NS_DispatchToMainThread(new SocketIODeleteInstanceRunnable(io)); + io->GetConsumerThread()->Dispatch(new SocketIODeleteInstanceRunnable(io), + NS_DISPATCH_NORMAL); } } }
--- a/ipc/unixsocket/SocketBase.h +++ b/ipc/unixsocket/SocketBase.h @@ -348,18 +348,34 @@ public: virtual void ShutdownOnIOThread() = 0; /** * Signals to the socket I/O classes that the socket class has been * shut down. */ virtual void ShutdownOnMainThread() = 0; + /** + * Returns the consumer thread. + * + * @return A pointer to the consumer thread. + */ + nsIThread* GetConsumerThread() const; + + /** + * @return True if the current thread is thre consumer thread, or false + * otherwise. + */ + bool IsConsumerThread() const; + protected: - SocketIOBase(); + SocketIOBase(nsIThread* nsConsumerThread); + +private: + nsCOMPtr<nsIThread> mConsumerThread; }; // // Socket I/O runnables // /* |SocketIORunnable| is a runnable for sending a message from * the I/O thread to the main thread.
--- a/ipc/unixsocket/StreamSocket.cpp +++ b/ipc/unixsocket/StreamSocket.cpp @@ -24,20 +24,22 @@ class StreamSocketIO final : public UnixSocketWatcher , public ConnectionOrientedSocketIO { public: class ConnectTask; class DelayedConnectTask; class ReceiveRunnable; - StreamSocketIO(MessageLoop* mIOLoop, + StreamSocketIO(nsIThread* aConsumerThread, + MessageLoop* mIOLoop, StreamSocket* aStreamSocket, UnixSocketConnector* aConnector); - StreamSocketIO(MessageLoop* mIOLoop, int aFd, + StreamSocketIO(nsIThread* aConsumerThread, + MessageLoop* mIOLoop, int aFd, ConnectionStatus aConnectionStatus, StreamSocket* aStreamSocket, UnixSocketConnector* aConnector); ~StreamSocketIO(); StreamSocket* GetStreamSocket(); DataSocket* GetDataSocket(); @@ -128,48 +130,52 @@ private: CancelableTask* mDelayedConnectTask; /** * I/O buffer for received data */ nsAutoPtr<UnixSocketRawData> mBuffer; }; -StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, +StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread, + MessageLoop* aIOLoop, StreamSocket* aStreamSocket, UnixSocketConnector* aConnector) - : UnixSocketWatcher(mIOLoop) + : UnixSocketWatcher(aIOLoop) + , ConnectionOrientedSocketIO(aConsumerThread) , mStreamSocket(aStreamSocket) , mConnector(aConnector) , mShuttingDownOnIOThread(false) , mAddressLength(0) , mDelayedConnectTask(nullptr) { MOZ_ASSERT(mStreamSocket); MOZ_ASSERT(mConnector); } -StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, int aFd, - ConnectionStatus aConnectionStatus, +StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread, + MessageLoop* aIOLoop, + int aFd, ConnectionStatus aConnectionStatus, StreamSocket* aStreamSocket, UnixSocketConnector* aConnector) - : UnixSocketWatcher(mIOLoop, aFd, aConnectionStatus) + : UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus) + , ConnectionOrientedSocketIO(aConsumerThread) , mStreamSocket(aStreamSocket) , mConnector(aConnector) , mShuttingDownOnIOThread(false) , mAddressLength(0) , mDelayedConnectTask(nullptr) { MOZ_ASSERT(mStreamSocket); MOZ_ASSERT(mConnector); } StreamSocketIO::~StreamSocketIO() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(IsShutdownOnMainThread()); } StreamSocket* StreamSocketIO::GetStreamSocket() { return mStreamSocket.get(); } @@ -178,33 +184,33 @@ DataSocket* StreamSocketIO::GetDataSocket() { return mStreamSocket.get(); } void StreamSocketIO::SetDelayedConnectTask(CancelableTask* aTask) { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); mDelayedConnectTask = aTask; } void StreamSocketIO::ClearDelayedConnectTask() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); mDelayedConnectTask = nullptr; } void StreamSocketIO::CancelDelayedConnectTask() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); if (!mDelayedConnectTask) { return; } mDelayedConnectTask->Cancel(); ClearDelayedConnectTask(); } @@ -241,18 +247,19 @@ StreamSocketIO::Send(UnixSocketIOBuffer* } void StreamSocketIO::OnConnected() { MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); AddWatchers(READ_WATCHER, true); if (HasPendingData()) { AddWatchers(WRITE_WATCHER, false); } } void @@ -308,18 +315,19 @@ void StreamSocketIO::FireSocketError() { MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); // Clean up watchers, statuses, fds Close(); // Tell the main thread we've errored - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR), + NS_DISPATCH_NORMAL); } // |ConnectionOrientedSocketIO| nsresult StreamSocketIO::Accept(int aFd, const struct sockaddr* aAddress, socklen_t aAddressLength) @@ -329,18 +337,19 @@ StreamSocketIO::Accept(int aFd, SetSocket(aFd, SOCKET_IS_CONNECTED); // Address setup mAddressLength = aAddressLength; memcpy(&mAddress, aAddress, mAddressLength); // Signal success - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); AddWatchers(READ_WATCHER, true); if (HasPendingData()) { AddWatchers(WRITE_WATCHER, false); } return NS_OK; } @@ -370,19 +379,19 @@ class StreamSocketIO::ReceiveRunnable fi public: ReceiveRunnable(StreamSocketIO* aIO, UnixSocketBuffer* aBuffer) : SocketIORunnable<StreamSocketIO>(aIO) , mBuffer(aBuffer) { } NS_IMETHOD Run() override { - MOZ_ASSERT(NS_IsMainThread()); + StreamSocketIO* io = SocketIORunnable<StreamSocketIO>::GetIO(); - StreamSocketIO* io = SocketIORunnable<StreamSocketIO>::GetIO(); + MOZ_ASSERT(io->IsConsumerThread()); if (NS_WARN_IF(io->IsShutdownOnMainThread())) { // Since we've already explicitly closed and the close // happened before this, this isn't really an error. return NS_OK; } StreamSocket* streamSocket = io->GetStreamSocket(); @@ -395,17 +404,18 @@ public: private: nsAutoPtr<UnixSocketBuffer> mBuffer; }; void StreamSocketIO::ConsumeBuffer() { - NS_DispatchToMainThread(new ReceiveRunnable(this, mBuffer.forget())); + GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()), + NS_DISPATCH_NORMAL); } void StreamSocketIO::DiscardBuffer() { // Nothing to do. } @@ -415,40 +425,40 @@ SocketBase* StreamSocketIO::GetSocketBase() { return GetDataSocket(); } bool StreamSocketIO::IsShutdownOnMainThread() const { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); return mStreamSocket == nullptr; } bool StreamSocketIO::IsShutdownOnIOThread() const { return mShuttingDownOnIOThread; } void StreamSocketIO::ShutdownOnMainThread() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(!IsShutdownOnMainThread()); mStreamSocket = nullptr; } void StreamSocketIO::ShutdownOnIOThread() { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); Close(); // will also remove fd from I/O loop mShuttingDownOnIOThread = true; } // // Socket tasks @@ -459,34 +469,34 @@ class StreamSocketIO::ConnectTask final { public: ConnectTask(StreamSocketIO* aIO) : SocketIOTask<StreamSocketIO>(aIO) { } void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); MOZ_ASSERT(!IsCanceled()); GetIO()->Connect(); } }; class StreamSocketIO::DelayedConnectTask final : public SocketIOTask<StreamSocketIO> { public: DelayedConnectTask(StreamSocketIO* aIO) : SocketIOTask<StreamSocketIO>(aIO) { } void Run() override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(GetIO()->IsConsumerThread()); if (IsCanceled()) { return; } StreamSocketIO* io = GetIO(); if (io->IsShutdownOnMainThread()) { return; @@ -512,121 +522,119 @@ StreamSocket::StreamSocket(StreamSocketC StreamSocket::~StreamSocket() { MOZ_ASSERT(!mIO); } void StreamSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer) { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->ReceiveSocketData(mIndex, aBuffer); } nsresult StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs, - MessageLoop* aIOLoop) + nsIThread* aConsumerThread, MessageLoop* aIOLoop) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(!mIO); - mIO = new StreamSocketIO(aIOLoop, this, aConnector); + mIO = new StreamSocketIO(aConsumerThread, aIOLoop, this, aConnector); SetConnectionStatus(SOCKET_CONNECTING); if (aDelayMs > 0) { StreamSocketIO::DelayedConnectTask* connectTask = new StreamSocketIO::DelayedConnectTask(mIO); mIO->SetDelayedConnectTask(connectTask); MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs); } else { aIOLoop->PostTask(FROM_HERE, new StreamSocketIO::ConnectTask(mIO)); } + return NS_OK; } nsresult StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs) { - return Connect(aConnector, aDelayMs, XRE_GetIOMessageLoop()); + nsIThread* consumerThread = nullptr; + nsresult rv = NS_GetCurrentThread(&consumerThread); + if (NS_FAILED(rv)) { + return rv; + } + + return Connect(aConnector, aDelayMs, consumerThread, XRE_GetIOMessageLoop()); } // |ConnectionOrientedSocket| nsresult StreamSocket::PrepareAccept(UnixSocketConnector* aConnector, + nsIThread* aConsumerThread, MessageLoop* aIOLoop, ConnectionOrientedSocketIO*& aIO) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(!mIO); MOZ_ASSERT(aConnector); SetConnectionStatus(SOCKET_CONNECTING); - mIO = new StreamSocketIO(aIOLoop, + mIO = new StreamSocketIO(aConsumerThread, aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING, this, aConnector); aIO = mIO; return NS_OK; } // |DataSocket| void StreamSocket::SendSocketData(UnixSocketIOBuffer* aBuffer) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mIO); + MOZ_ASSERT(mIO->IsConsumerThread()); + MOZ_ASSERT(!mIO->IsShutdownOnMainThread()); - MOZ_ASSERT(!mIO->IsShutdownOnMainThread()); mIO->GetIOLoop()->PostTask( FROM_HERE, new SocketIOSendTask<StreamSocketIO, UnixSocketIOBuffer>(mIO, aBuffer)); } // |SocketBase| void StreamSocket::Close() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mIO); + MOZ_ASSERT(mIO->IsConsumerThread()); mIO->CancelDelayedConnectTask(); // From this point on, we consider |mIO| as being deleted. We sever // the relationship here so any future calls to |Connect| will create // a new I/O object. mIO->ShutdownOnMainThread(); mIO->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO)); mIO = nullptr; NotifyDisconnect(); } void StreamSocket::OnConnectSuccess() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnConnectSuccess(mIndex); } void StreamSocket::OnConnectError() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnConnectError(mIndex); } void StreamSocket::OnDisconnect() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnDisconnect(mIndex); } } // namespace ipc } // namespace mozilla
--- a/ipc/unixsocket/StreamSocket.h +++ b/ipc/unixsocket/StreamSocket.h @@ -37,36 +37,38 @@ public: void ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer); /** * Starts a task on the socket that will try to connect to a socket in a * non-blocking manner. * * @param aConnector Connector object for socket type specific functions * @param aDelayMs Time delay in milliseconds. + * @param aConsumerThread The socket's consumer thread. * @param aIOLoop The socket's I/O thread. * @return NS_OK on success, or an XPCOM error code otherwise. */ nsresult Connect(UnixSocketConnector* aConnector, int aDelayMs, - MessageLoop* aIOLoop); + nsIThread* aConsumerThread, MessageLoop* aIOLoop); /** * Starts a task on the socket that will try to connect to a socket in a * non-blocking manner. * * @param aConnector Connector object for socket type specific functions * @param aDelayMs Time delay in milliseconds. * @return NS_OK on success, or an XPCOM error code otherwise. */ nsresult Connect(UnixSocketConnector* aConnector, int aDelayMs = 0); // Methods for |ConnectionOrientedSocket| // nsresult PrepareAccept(UnixSocketConnector* aConnector, + nsIThread* aConsumerThread, MessageLoop* aIOLoop, ConnectionOrientedSocketIO*& aIO) override; // Methods for |DataSocket| // void SendSocketData(UnixSocketIOBuffer* aBuffer) override;