Bug 1328642 Protect multi-thread stream access with a mutex. r=asuth
authorBen Kelly <ben@wanderview.com>
Wed, 18 Jan 2017 06:30:24 -0800
changeset 463135 c6e104aef85f2ee6831ce1e6b38c4127ef08d125
parent 463134 2d9cff6d389ac6ece80e6621b2a6d029bcb81908
child 463136 a26555efc6e55329fd02217f5585658db3ed91df
push id41967
push userbmo:miket@mozilla.com
push dateWed, 18 Jan 2017 15:17:32 +0000
reviewersasuth
bugs1328642
milestone53.0a1
Bug 1328642 Protect multi-thread stream access with a mutex. r=asuth
dom/cache/ReadStream.cpp
--- a/dom/cache/ReadStream.cpp
+++ b/dom/cache/ReadStream.cpp
@@ -94,29 +94,36 @@ private:
 
   // Weak ref to the stream control actor.  The actor will always call either
   // CloseStream() or CloseStreamWithoutReporting() before it's destroyed.  The
   // weak ref is cleared in the resulting NoteClosedOnOwningThread() or
   // ForgetOnOwningThread() method call.
   StreamControl* mControl;
 
   const nsID mId;
-  nsCOMPtr<nsIInputStream> mStream;
-  nsCOMPtr<nsIInputStream> mSnappyStream;
   nsCOMPtr<nsIThread> mOwningThread;
 
   enum State
   {
     Open,
     Closed,
     NumStates
   };
   Atomic<State> mState;
   Atomic<bool> mHasEverBeenRead;
 
+
+  // The wrapped stream objects may not be threadsafe.  We need to be able
+  // to close a stream on our owning thread while an IO thread is simultaneously
+  // reading the same stream.  Therefore, protect all access to these stream
+  // objects with a mutex.
+  Mutex mMutex;
+  nsCOMPtr<nsIInputStream> mStream;
+  nsCOMPtr<nsIInputStream> mSnappyStream;
+
   NS_INLINE_DECL_THREADSAFE_REFCOUNTING(cache::ReadStream::Inner, override)
 };
 
 // ----------------------------------------------------------------------------
 
 // Runnable to notify actors that the ReadStream has closed.  This must
 // be done on the thread associated with the PBackground actor.  Must be
 // cancelable to execute on Worker threads (which can occur when the
@@ -185,20 +192,22 @@ private:
 };
 
 // ----------------------------------------------------------------------------
 
 ReadStream::Inner::Inner(StreamControl* aControl, const nsID& aId,
                          nsIInputStream* aStream)
   : mControl(aControl)
   , mId(aId)
+  , mOwningThread(NS_GetCurrentThread())
+  , mState(Open)
+  , mHasEverBeenRead(false)
+  , mMutex("dom::cache::ReadStream")
   , mStream(aStream)
   , mSnappyStream(new SnappyUncompressInputStream(aStream))
-  , mOwningThread(NS_GetCurrentThread())
-  , mState(Open)
 {
   MOZ_DIAGNOSTIC_ASSERT(mStream);
   MOZ_DIAGNOSTIC_ASSERT(mControl);
   mControl->AddReadStream(this);
 }
 
 void
 ReadStream::Inner::Serialize(CacheReadStreamOrVoid* aReadStreamOut,
@@ -223,17 +232,21 @@ ReadStream::Inner::Serialize(CacheReadSt
     aRv.ThrowTypeError<MSG_CACHE_STREAM_CLOSED>();
     return;
   }
 
   MOZ_DIAGNOSTIC_ASSERT(mControl);
 
   aReadStreamOut->id() = mId;
   mControl->SerializeControl(aReadStreamOut);
-  mControl->SerializeStream(aReadStreamOut, mStream, aStreamCleanupList);
+
+  {
+    MutexAutoLock lock(mMutex);
+    mControl->SerializeStream(aReadStreamOut, mStream, aStreamCleanupList);
+  }
 
   MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut->stream().type() ==
                         IPCStream::TInputStreamParamsWithFds);
 
   // We're passing ownership across the IPC barrier with the control, so
   // do not signal that the stream is closed here.
   Forget();
 }
@@ -265,41 +278,53 @@ ReadStream::Inner::HasEverBeenRead() con
   MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
   return mHasEverBeenRead;
 }
 
 nsresult
 ReadStream::Inner::Close()
 {
   // stream ops can happen on any thread
-  nsresult rv = mStream->Close();
+  nsresult rv = NS_OK;
+  {
+    MutexAutoLock lock(mMutex);
+    rv = mSnappyStream->Close();
+  }
   NoteClosed();
   return rv;
 }
 
 nsresult
 ReadStream::Inner::Available(uint64_t* aNumAvailableOut)
 {
   // stream ops can happen on any thread
-  nsresult rv = mSnappyStream->Available(aNumAvailableOut);
+  nsresult rv = NS_OK;
+  {
+    MutexAutoLock lock(mMutex);
+    rv = mSnappyStream->Available(aNumAvailableOut);
+  }
 
   if (NS_FAILED(rv)) {
     Close();
   }
 
   return rv;
 }
 
 nsresult
 ReadStream::Inner::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut)
 {
   // stream ops can happen on any thread
   MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);
 
-  nsresult rv = mSnappyStream->Read(aBuf, aCount, aNumReadOut);
+  nsresult rv = NS_OK;
+  {
+    MutexAutoLock lock(mMutex);
+    rv = mSnappyStream->Read(aBuf, aCount, aNumReadOut);
+  }
 
   if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) ||
       *aNumReadOut == 0) {
     Close();
   }
 
   mHasEverBeenRead = true;
 
@@ -312,18 +337,22 @@ ReadStream::Inner::ReadSegments(nsWriteS
 {
   // stream ops can happen on any thread
   MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);
 
   if (aCount) {
     mHasEverBeenRead = true;
   }
 
-  nsresult rv = mSnappyStream->ReadSegments(aWriter, aClosure, aCount,
-                                            aNumReadOut);
+
+  nsresult rv = NS_OK;
+  {
+    MutexAutoLock lock(mMutex);
+    rv = mSnappyStream->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
+  }
 
   if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK &&
                         rv != NS_ERROR_NOT_IMPLEMENTED) || *aNumReadOut == 0) {
     Close();
   }
 
   // Verify bytes were actually read before marking as being ever read.  For
   // example, code can test if the stream supports ReadSegments() by calling
@@ -335,16 +364,17 @@ ReadStream::Inner::ReadSegments(nsWriteS
 
   return rv;
 }
 
 nsresult
 ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut)
 {
   // stream ops can happen on any thread
+  MutexAutoLock lock(mMutex);
   return mSnappyStream->IsNonBlocking(aNonBlockingOut);
 }
 
 ReadStream::Inner::~Inner()
 {
   // Any thread
   MOZ_DIAGNOSTIC_ASSERT(mState == Closed);
   MOZ_DIAGNOSTIC_ASSERT(!mControl);