Bug 1328642 Protect multi-thread stream access with a mutex. r=asuth
authorBen Kelly <ben@wanderview.com>
Wed, 18 Jan 2017 06:30:24 -0800
changeset 377234 c6e104aef85f2ee6831ce1e6b38c4127ef08d125
parent 377233 2d9cff6d389ac6ece80e6621b2a6d029bcb81908
child 377235 a26555efc6e55329fd02217f5585658db3ed91df
push id1419
push userjlund@mozilla.com
push dateMon, 10 Apr 2017 20:44:07 +0000
treeherdermozilla-release@5e6801b73ef6 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersasuth
bugs1328642
milestone53.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1328642 Protect multi-thread stream access with a mutex. r=asuth
dom/cache/ReadStream.cpp
--- a/dom/cache/ReadStream.cpp
+++ b/dom/cache/ReadStream.cpp
@@ -94,29 +94,36 @@ private:
 
   // Weak ref to the stream control actor.  The actor will always call either
   // CloseStream() or CloseStreamWithoutReporting() before it's destroyed.  The
   // weak ref is cleared in the resulting NoteClosedOnOwningThread() or
   // ForgetOnOwningThread() method call.
   StreamControl* mControl;
 
   const nsID mId;
-  nsCOMPtr<nsIInputStream> mStream;
-  nsCOMPtr<nsIInputStream> mSnappyStream;
   nsCOMPtr<nsIThread> mOwningThread;
 
   enum State
   {
     Open,
     Closed,
     NumStates
   };
   Atomic<State> mState;
   Atomic<bool> mHasEverBeenRead;
 
+
+  // The wrapped stream objects may not be threadsafe.  We need to be able
+  // to close a stream on our owning thread while an IO thread is simultaneously
+  // reading the same stream.  Therefore, protect all access to these stream
+  // objects with a mutex.
+  Mutex mMutex;
+  nsCOMPtr<nsIInputStream> mStream;
+  nsCOMPtr<nsIInputStream> mSnappyStream;
+
   NS_INLINE_DECL_THREADSAFE_REFCOUNTING(cache::ReadStream::Inner, override)
 };
 
 // ----------------------------------------------------------------------------
 
 // Runnable to notify actors that the ReadStream has closed.  This must
 // be done on the thread associated with the PBackground actor.  Must be
 // cancelable to execute on Worker threads (which can occur when the
@@ -185,20 +192,22 @@ private:
 };
 
 // ----------------------------------------------------------------------------
 
 ReadStream::Inner::Inner(StreamControl* aControl, const nsID& aId,
                          nsIInputStream* aStream)
   : mControl(aControl)
   , mId(aId)
+  , mOwningThread(NS_GetCurrentThread())
+  , mState(Open)
+  , mHasEverBeenRead(false)
+  , mMutex("dom::cache::ReadStream")
   , mStream(aStream)
   , mSnappyStream(new SnappyUncompressInputStream(aStream))
-  , mOwningThread(NS_GetCurrentThread())
-  , mState(Open)
 {
   MOZ_DIAGNOSTIC_ASSERT(mStream);
   MOZ_DIAGNOSTIC_ASSERT(mControl);
   mControl->AddReadStream(this);
 }
 
 void
 ReadStream::Inner::Serialize(CacheReadStreamOrVoid* aReadStreamOut,
@@ -223,17 +232,21 @@ ReadStream::Inner::Serialize(CacheReadSt
     aRv.ThrowTypeError<MSG_CACHE_STREAM_CLOSED>();
     return;
   }
 
   MOZ_DIAGNOSTIC_ASSERT(mControl);
 
   aReadStreamOut->id() = mId;
   mControl->SerializeControl(aReadStreamOut);
-  mControl->SerializeStream(aReadStreamOut, mStream, aStreamCleanupList);
+
+  {
+    MutexAutoLock lock(mMutex);
+    mControl->SerializeStream(aReadStreamOut, mStream, aStreamCleanupList);
+  }
 
   MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut->stream().type() ==
                         IPCStream::TInputStreamParamsWithFds);
 
   // We're passing ownership across the IPC barrier with the control, so
   // do not signal that the stream is closed here.
   Forget();
 }
@@ -265,41 +278,53 @@ ReadStream::Inner::HasEverBeenRead() con
   MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
   return mHasEverBeenRead;
 }
 
 nsresult
 ReadStream::Inner::Close()
 {
   // stream ops can happen on any thread
-  nsresult rv = mStream->Close();
+  nsresult rv = NS_OK;
+  {
+    MutexAutoLock lock(mMutex);
+    rv = mSnappyStream->Close();
+  }
   NoteClosed();
   return rv;
 }
 
 nsresult
 ReadStream::Inner::Available(uint64_t* aNumAvailableOut)
 {
   // stream ops can happen on any thread
-  nsresult rv = mSnappyStream->Available(aNumAvailableOut);
+  nsresult rv = NS_OK;
+  {
+    MutexAutoLock lock(mMutex);
+    rv = mSnappyStream->Available(aNumAvailableOut);
+  }
 
   if (NS_FAILED(rv)) {
     Close();
   }
 
   return rv;
 }
 
 nsresult
 ReadStream::Inner::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut)
 {
   // stream ops can happen on any thread
   MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);
 
-  nsresult rv = mSnappyStream->Read(aBuf, aCount, aNumReadOut);
+  nsresult rv = NS_OK;
+  {
+    MutexAutoLock lock(mMutex);
+    rv = mSnappyStream->Read(aBuf, aCount, aNumReadOut);
+  }
 
   if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) ||
       *aNumReadOut == 0) {
     Close();
   }
 
   mHasEverBeenRead = true;
 
@@ -312,18 +337,22 @@ ReadStream::Inner::ReadSegments(nsWriteS
 {
   // stream ops can happen on any thread
   MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);
 
   if (aCount) {
     mHasEverBeenRead = true;
   }
 
-  nsresult rv = mSnappyStream->ReadSegments(aWriter, aClosure, aCount,
-                                            aNumReadOut);
+
+  nsresult rv = NS_OK;
+  {
+    MutexAutoLock lock(mMutex);
+    rv = mSnappyStream->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
+  }
 
   if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK &&
                         rv != NS_ERROR_NOT_IMPLEMENTED) || *aNumReadOut == 0) {
     Close();
   }
 
   // Verify bytes were actually read before marking as being ever read.  For
   // example, code can test if the stream supports ReadSegments() by calling
@@ -335,16 +364,17 @@ ReadStream::Inner::ReadSegments(nsWriteS
 
   return rv;
 }
 
 nsresult
 ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut)
 {
   // stream ops can happen on any thread
+  MutexAutoLock lock(mMutex);
   return mSnappyStream->IsNonBlocking(aNonBlockingOut);
 }
 
 ReadStream::Inner::~Inner()
 {
   // Any thread
   MOZ_DIAGNOSTIC_ASSERT(mState == Closed);
   MOZ_DIAGNOSTIC_ASSERT(!mControl);