--- a/dom/cache/ReadStream.cpp
+++ b/dom/cache/ReadStream.cpp
@@ -94,29 +94,36 @@ private:
// Weak ref to the stream control actor. The actor will always call either
// CloseStream() or CloseStreamWithoutReporting() before it's destroyed. The
// weak ref is cleared in the resulting NoteClosedOnOwningThread() or
// ForgetOnOwningThread() method call.
StreamControl* mControl;
const nsID mId;
- nsCOMPtr<nsIInputStream> mStream;
- nsCOMPtr<nsIInputStream> mSnappyStream;
nsCOMPtr<nsIThread> mOwningThread;
enum State
{
Open,
Closed,
NumStates
};
Atomic<State> mState;
Atomic<bool> mHasEverBeenRead;
+
+ // The wrapped stream objects may not be threadsafe. We need to be able
+ // to close a stream on our owning thread while an IO thread is simultaneously
+ // reading the same stream. Therefore, protect all access to these stream
+ // objects with a mutex.
+ Mutex mMutex;
+ nsCOMPtr<nsIInputStream> mStream;
+ nsCOMPtr<nsIInputStream> mSnappyStream;
+
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(cache::ReadStream::Inner, override)
};
// ----------------------------------------------------------------------------
// Runnable to notify actors that the ReadStream has closed. This must
// be done on the thread associated with the PBackground actor. Must be
// cancelable to execute on Worker threads (which can occur when the
@@ -185,20 +192,22 @@ private:
};
// ----------------------------------------------------------------------------
ReadStream::Inner::Inner(StreamControl* aControl, const nsID& aId,
nsIInputStream* aStream)
: mControl(aControl)
, mId(aId)
+ , mOwningThread(NS_GetCurrentThread())
+ , mState(Open)
+ , mHasEverBeenRead(false)
+ , mMutex("dom::cache::ReadStream")
, mStream(aStream)
, mSnappyStream(new SnappyUncompressInputStream(aStream))
- , mOwningThread(NS_GetCurrentThread())
- , mState(Open)
{
MOZ_DIAGNOSTIC_ASSERT(mStream);
MOZ_DIAGNOSTIC_ASSERT(mControl);
mControl->AddReadStream(this);
}
void
ReadStream::Inner::Serialize(CacheReadStreamOrVoid* aReadStreamOut,
@@ -223,17 +232,21 @@ ReadStream::Inner::Serialize(CacheReadSt
aRv.ThrowTypeError<MSG_CACHE_STREAM_CLOSED>();
return;
}
MOZ_DIAGNOSTIC_ASSERT(mControl);
aReadStreamOut->id() = mId;
mControl->SerializeControl(aReadStreamOut);
- mControl->SerializeStream(aReadStreamOut, mStream, aStreamCleanupList);
+
+ {
+ MutexAutoLock lock(mMutex);
+ mControl->SerializeStream(aReadStreamOut, mStream, aStreamCleanupList);
+ }
MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut->stream().type() ==
IPCStream::TInputStreamParamsWithFds);
// We're passing ownership across the IPC barrier with the control, so
// do not signal that the stream is closed here.
Forget();
}
@@ -265,41 +278,53 @@ ReadStream::Inner::HasEverBeenRead() con
MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
return mHasEverBeenRead;
}
nsresult
ReadStream::Inner::Close()
{
// stream ops can happen on any thread
- nsresult rv = mStream->Close();
+ nsresult rv = NS_OK;
+ {
+ MutexAutoLock lock(mMutex);
+ rv = mSnappyStream->Close();
+ }
NoteClosed();
return rv;
}
nsresult
ReadStream::Inner::Available(uint64_t* aNumAvailableOut)
{
// stream ops can happen on any thread
- nsresult rv = mSnappyStream->Available(aNumAvailableOut);
+ nsresult rv = NS_OK;
+ {
+ MutexAutoLock lock(mMutex);
+ rv = mSnappyStream->Available(aNumAvailableOut);
+ }
if (NS_FAILED(rv)) {
Close();
}
return rv;
}
nsresult
ReadStream::Inner::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut)
{
// stream ops can happen on any thread
MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);
- nsresult rv = mSnappyStream->Read(aBuf, aCount, aNumReadOut);
+ nsresult rv = NS_OK;
+ {
+ MutexAutoLock lock(mMutex);
+ rv = mSnappyStream->Read(aBuf, aCount, aNumReadOut);
+ }
if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) ||
*aNumReadOut == 0) {
Close();
}
mHasEverBeenRead = true;
@@ -312,18 +337,22 @@ ReadStream::Inner::ReadSegments(nsWriteS
{
// stream ops can happen on any thread
MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);
if (aCount) {
mHasEverBeenRead = true;
}
- nsresult rv = mSnappyStream->ReadSegments(aWriter, aClosure, aCount,
- aNumReadOut);
+
+ nsresult rv = NS_OK;
+ {
+ MutexAutoLock lock(mMutex);
+ rv = mSnappyStream->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
+ }
if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK &&
rv != NS_ERROR_NOT_IMPLEMENTED) || *aNumReadOut == 0) {
Close();
}
// Verify bytes were actually read before marking as being ever read. For
// example, code can test if the stream supports ReadSegments() by calling
@@ -335,16 +364,17 @@ ReadStream::Inner::ReadSegments(nsWriteS
return rv;
}
nsresult
ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut)
{
// stream ops can happen on any thread
+ MutexAutoLock lock(mMutex);
return mSnappyStream->IsNonBlocking(aNonBlockingOut);
}
ReadStream::Inner::~Inner()
{
// Any thread
MOZ_DIAGNOSTIC_ASSERT(mState == Closed);
MOZ_DIAGNOSTIC_ASSERT(!mControl);