Bug 915905 - Prevent multiple concurrent calls to OnInputStreamReady r=jduell
authorSteve Workman <sworkman@mozilla.com>
Tue, 24 Sep 2013 10:53:35 -0700
changeset 162286 67a56c884bfe5600dce84c43a1df422aa0994933
parent 162285 2ea5da604a19b8fe7c77831016ad81d8dc524c1e
child 162287 ac09e1a3fa46889569c043aa7c12f6bef071880a
push id3066
push userakeybl@mozilla.com
push dateMon, 09 Dec 2013 19:58:46 +0000
treeherdermozilla-beta@a31a0dce83aa [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersjduell
bugs915905
milestone27.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 915905 - Prevent multiple concurrent calls to OnInputStreamReady r=jduell
netwerk/base/src/nsInputStreamPump.cpp
netwerk/base/src/nsInputStreamPump.h
--- a/netwerk/base/src/nsInputStreamPump.cpp
+++ b/netwerk/base/src/nsInputStreamPump.cpp
@@ -35,19 +35,21 @@ static PRLogModuleInfo *gStreamPumpLog =
 
 nsInputStreamPump::nsInputStreamPump()
     : mState(STATE_IDLE)
     , mStreamOffset(0)
     , mStreamLength(UINT64_MAX)
     , mStatus(NS_OK)
     , mSuspendCount(0)
     , mLoadFlags(LOAD_NORMAL)
-    , mWaiting(false)
+    , mProcessingCallbacks(false)
+    , mWaitingForInputStreamReady(false)
     , mCloseWhenDone(false)
     , mRetargeting(false)
+    , mMonitor("nsInputStreamPump")
 {
 #if defined(PR_LOGGING)
     if (!gStreamPumpLog)
         gStreamPumpLog = PR_NewLogModule("nsStreamPump");
 #endif
 }
 
 nsInputStreamPump::~nsInputStreamPump()
@@ -96,16 +98,18 @@ CallPeekFunc(nsIInputStream *aInStream, 
   data->mFunc(data->mClosure,
               reinterpret_cast<const uint8_t*>(aFromSegment), aCount);
   return NS_BINDING_ABORTED;
 }
 
 nsresult
 nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure)
 {
+  ReentrantMonitorAutoEnter mon(mMonitor);
+
   NS_ASSERTION(mAsyncStream, "PeekStream called without stream");
 
   // See if the pipe is closed by checking the return of Available.
   uint64_t dummy64;
   nsresult rv = mAsyncStream->Available(&dummy64);
   if (NS_FAILED(rv))
     return rv;
   uint32_t dummy = (uint32_t)std::min(dummy64, (uint64_t)UINT32_MAX);
@@ -115,37 +119,39 @@ nsInputStreamPump::PeekStream(PeekSegmen
                                     &data,
                                     nsIOService::gDefaultSegmentSize,
                                     &dummy);
 }
 
 nsresult
 nsInputStreamPump::EnsureWaiting()
 {
+    mMonitor.AssertCurrentThreadIn();
+
     // no need to worry about multiple threads... an input stream pump lives
     // on only one thread at a time.
     MOZ_ASSERT(mAsyncStream);
-    if (!mWaiting) {
+    if (!mWaitingForInputStreamReady && !mProcessingCallbacks) {
         // Ensure OnStateStop is called on the main thread.
         if (mState == STATE_STOP) {
             nsCOMPtr<nsIThread> mainThread = do_GetMainThread();
             if (mTargetThread != mainThread) {
                 mTargetThread = do_QueryInterface(mainThread);
             }
         }
         MOZ_ASSERT(mTargetThread);
         nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread);
         if (NS_FAILED(rv)) {
             NS_ERROR("AsyncWait failed");
             return rv;
         }
         // Any retargeting during STATE_START or START_TRANSFER is complete
         // after the call to AsyncWait; next callback wil be on mTargetThread.
         mRetargeting = false;
-        mWaiting = true;
+        mWaitingForInputStreamReady = true;
     }
     return NS_OK;
 }
 
 //-----------------------------------------------------------------------------
 // nsInputStreamPump::nsISupports
 //-----------------------------------------------------------------------------
 
@@ -160,37 +166,45 @@ NS_IMPL_ISUPPORTS4(nsInputStreamPump,
 
 //-----------------------------------------------------------------------------
 // nsInputStreamPump::nsIRequest
 //-----------------------------------------------------------------------------
 
 NS_IMETHODIMP
 nsInputStreamPump::GetName(nsACString &result)
 {
+    ReentrantMonitorAutoEnter mon(mMonitor);
+
     result.Truncate();
     return NS_OK;
 }
 
 NS_IMETHODIMP
 nsInputStreamPump::IsPending(bool *result)
 {
+    ReentrantMonitorAutoEnter mon(mMonitor);
+
     *result = (mState != STATE_IDLE);
     return NS_OK;
 }
 
 NS_IMETHODIMP
 nsInputStreamPump::GetStatus(nsresult *status)
 {
+    ReentrantMonitorAutoEnter mon(mMonitor);
+
     *status = mStatus;
     return NS_OK;
 }
 
 NS_IMETHODIMP
 nsInputStreamPump::Cancel(nsresult status)
 {
+    ReentrantMonitorAutoEnter mon(mMonitor);
+
     LOG(("nsInputStreamPump::Cancel [this=%p status=%x]\n",
         this, status));
 
     if (NS_FAILED(mStatus)) {
         LOG(("  already canceled\n"));
         return NS_OK;
     }
 
@@ -208,58 +222,70 @@ nsInputStreamPump::Cancel(nsresult statu
         // on a closed stream works and will dispatch an event immediately.
     }
     return NS_OK;
 }
 
 NS_IMETHODIMP
 nsInputStreamPump::Suspend()
 {
+    ReentrantMonitorAutoEnter mon(mMonitor);
+
     LOG(("nsInputStreamPump::Suspend [this=%p]\n", this));
     NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
     ++mSuspendCount;
     return NS_OK;
 }
 
 NS_IMETHODIMP
 nsInputStreamPump::Resume()
 {
+    ReentrantMonitorAutoEnter mon(mMonitor);
+
     LOG(("nsInputStreamPump::Resume [this=%p]\n", this));
     NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED);
     NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
 
     if (--mSuspendCount == 0)
         EnsureWaiting();
     return NS_OK;
 }
 
 NS_IMETHODIMP
 nsInputStreamPump::GetLoadFlags(nsLoadFlags *aLoadFlags)
 {
+    ReentrantMonitorAutoEnter mon(mMonitor);
+
     *aLoadFlags = mLoadFlags;
     return NS_OK;
 }
 
 NS_IMETHODIMP
 nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags)
 {
+    ReentrantMonitorAutoEnter mon(mMonitor);
+
     mLoadFlags = aLoadFlags;
     return NS_OK;
 }
 
 NS_IMETHODIMP
 nsInputStreamPump::GetLoadGroup(nsILoadGroup **aLoadGroup)
 {
+    ReentrantMonitorAutoEnter mon(mMonitor);
+
     NS_IF_ADDREF(*aLoadGroup = mLoadGroup);
     return NS_OK;
 }
 
 NS_IMETHODIMP
 nsInputStreamPump::SetLoadGroup(nsILoadGroup *aLoadGroup)
 {
+    ReentrantMonitorAutoEnter mon(mMonitor);
+
     mLoadGroup = aLoadGroup;
     return NS_OK;
 }
 
 //-----------------------------------------------------------------------------
 // nsInputStreamPump::nsIInputStreamPump implementation
 //-----------------------------------------------------------------------------
 
@@ -280,16 +306,18 @@ nsInputStreamPump::Init(nsIInputStream *
     mCloseWhenDone = closeWhenDone;
 
     return NS_OK;
 }
 
 NS_IMETHODIMP
 nsInputStreamPump::AsyncRead(nsIStreamListener *listener, nsISupports *ctxt)
 {
+    ReentrantMonitorAutoEnter mon(mMonitor);
+
     NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
     NS_ENSURE_ARG_POINTER(listener);
     MOZ_ASSERT(NS_IsMainThread(), "nsInputStreamPump should be read from the "
                                   "main thread only.");
 
     //
     // OK, we need to use the stream transport service if
     //
@@ -369,18 +397,33 @@ nsInputStreamPump::OnInputStreamReady(ns
 {
     LOG(("nsInputStreamPump::OnInputStreamReady [this=%p]\n", this));
 
     PROFILER_LABEL("Input", "nsInputStreamPump::OnInputStreamReady");
     // this function has been called from a PLEvent, so we can safely call
     // any listener or progress sink methods directly from here.
 
     for (;;) {
+        // There should only be one iteration of this loop happening at a time. 
+        // To prevent AsyncWait() (called during callbacks or on other threads)
+        // from creating a parallel OnInputStreamReady(), we use:
+        // -- a monitor; and
+        // -- a boolean mProcessingCallbacks to detect parallel loops
+        //    when exiting the monitor for callbacks.
+        ReentrantMonitorAutoEnter lock(mMonitor);
+
+        // Prevent parallel execution during callbacks, while out of monitor.
+        if (mProcessingCallbacks) {
+            MOZ_ASSERT(!mProcessingCallbacks);
+            break;
+        }
+        mProcessingCallbacks = true;
         if (mSuspendCount || mState == STATE_IDLE) {
-            mWaiting = false;
+            mWaitingForInputStreamReady = false;
+            mProcessingCallbacks = false;
             break;
         }
 
         uint32_t nextState;
         switch (mState) {
         case STATE_START:
             nextState = OnStateStart();
             break;
@@ -412,21 +455,25 @@ nsInputStreamPump::OnInputStreamReady(ns
         }
 
         // Set mRetargeting so EnsureWaiting will be called. It ensures that
         // OnStateStop is called on the main thread. 
         if (nextState == STATE_STOP && !NS_IsMainThread()) {
             mRetargeting = true;
         }
 
+        // Unset mProcessingCallbacks here (while we have lock) so our own call to
+        // EnsureWaiting isn't blocked by it.
+        mProcessingCallbacks = false;
+
         // Wait asynchronously if there is still data to transfer, or we're
         // switching event delivery to another thread.
         if (!mSuspendCount && (stillTransferring || mRetargeting)) {
             mState = nextState;
-            mWaiting = false;
+            mWaitingForInputStreamReady = false;
             nsresult rv = EnsureWaiting();
             if (NS_SUCCEEDED(rv))
                 break;
             
             // Failure to start asynchronous wait: stop transfer.
             // Do not set mStatus if it was previously set to report a failure.
             if (NS_SUCCEEDED(mStatus)) {
                 mStatus = rv;
@@ -437,44 +484,55 @@ nsInputStreamPump::OnInputStreamReady(ns
         mState = nextState;
     }
     return NS_OK;
 }
 
 uint32_t
 nsInputStreamPump::OnStateStart()
 {
+    mMonitor.AssertCurrentThreadIn();
+
     PROFILER_LABEL("nsInputStreamPump", "OnStateStart");
     LOG(("  OnStateStart [this=%p]\n", this));
 
     nsresult rv;
 
     // need to check the reason why the stream is ready.  this is required
     // so our listener can check our status from OnStartRequest.
     // XXX async streams should have a GetStatus method!
     if (NS_SUCCEEDED(mStatus)) {
         uint64_t avail;
         rv = mAsyncStream->Available(&avail);
         if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED)
             mStatus = rv;
     }
 
-    rv = mListener->OnStartRequest(this, mListenerContext);
+    {
+        // Note: Must exit monitor for call to OnStartRequest to avoid
+        // deadlocks when calls to RetargetDeliveryTo for multiple
+        // nsInputStreamPumps are needed (e.g. nsHttpChannel).
+        mMonitor.Exit();
+        rv = mListener->OnStartRequest(this, mListenerContext);
+        mMonitor.Enter();
+    }
 
     // an error returned from OnStartRequest should cause us to abort; however,
     // we must not stomp on mStatus if already canceled.
     if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus))
         mStatus = rv;
 
     return NS_SUCCEEDED(mStatus) ? STATE_TRANSFER : STATE_STOP;
 }
 
 uint32_t
 nsInputStreamPump::OnStateTransfer()
 {
+    mMonitor.AssertCurrentThreadIn();
+
     PROFILER_LABEL("Input", "nsInputStreamPump::OnStateTransfer");
     LOG(("  OnStateTransfer [this=%p]\n", this));
 
     // if canceled, go directly to STATE_STOP...
     if (NS_FAILED(mStatus))
         return STATE_STOP;
 
     nsresult rv;
@@ -517,18 +575,26 @@ nsInputStreamPump::OnStateTransfer()
 
             uint32_t odaAvail =
                 avail > UINT32_MAX ?
                 UINT32_MAX : uint32_t(avail);
 
             LOG(("  calling OnDataAvailable [offset=%llu count=%llu(%u)]\n",
                 mStreamOffset, avail, odaAvail));
 
-            rv = mListener->OnDataAvailable(this, mListenerContext, mAsyncStream,
-                                            mStreamOffset, odaAvail);
+            {
+                // Note: Must exit monitor for call to OnStartRequest to avoid
+                // deadlocks when calls to RetargetDeliveryTo for multiple
+                // nsInputStreamPumps are needed (e.g. nsHttpChannel).
+                mMonitor.Exit();
+                rv = mListener->OnDataAvailable(this, mListenerContext,
+                                                mAsyncStream, mStreamOffset,
+                                                odaAvail);
+                mMonitor.Enter();
+            }
 
             // don't enter this code if ODA failed or called Cancel
             if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) {
                 // test to see if this ODA failed to consume data
                 if (seekable) {
                     // NOTE: if Tell fails, which can happen if the stream is
                     // now closed, then we assume that everything was read.
                     int64_t offsetAfter;
@@ -573,26 +639,30 @@ nsInputStreamPump::OnStateTransfer()
         }
     }
     return STATE_STOP;
 }
 
 nsresult
 nsInputStreamPump::CallOnStateStop()
 {
+    ReentrantMonitorAutoEnter mon(mMonitor);
+
     MOZ_ASSERT(NS_IsMainThread(),
                "CallOnStateStop should only be called on the main thread.");
 
     mState = OnStateStop();
     return NS_OK;
 }
 
 uint32_t
 nsInputStreamPump::OnStateStop()
 {
+    mMonitor.AssertCurrentThreadIn();
+
     if (!NS_IsMainThread()) {
         // Hopefully temporary hack: OnStateStop should only run on the main
         // thread, but we're seeing some rare off-main-thread calls. For now
         // just redispatch to the main thread in release builds, and crash in
         // debug builds.
         MOZ_ASSERT(NS_IsMainThread(),
                    "OnStateStop should only be called on the main thread.");
         nsresult rv = NS_DispatchToMainThread(
@@ -617,33 +687,42 @@ nsInputStreamPump::OnStateStop()
     if (NS_FAILED(mStatus))
         mAsyncStream->CloseWithStatus(mStatus);
     else if (mCloseWhenDone)
         mAsyncStream->Close();
 
     mAsyncStream = 0;
     mTargetThread = 0;
     mIsPending = false;
-    mListener->OnStopRequest(this, mListenerContext, mStatus);
+    {
+        // Note: Must exit monitor for call to OnStartRequest to avoid
+        // deadlocks when calls to RetargetDeliveryTo for multiple
+        // nsInputStreamPumps are needed (e.g. nsHttpChannel).
+        mMonitor.Exit();
+        mListener->OnStopRequest(this, mListenerContext, mStatus);
+        mMonitor.Enter();
+    }
     mListener = 0;
     mListenerContext = 0;
 
     if (mLoadGroup)
         mLoadGroup->RemoveRequest(this, nullptr, mStatus);
 
     return STATE_IDLE;
 }
 
 //-----------------------------------------------------------------------------
 // nsIThreadRetargetableRequest
 //-----------------------------------------------------------------------------
 
 NS_IMETHODIMP
 nsInputStreamPump::RetargetDeliveryTo(nsIEventTarget* aNewTarget)
 {
+    ReentrantMonitorAutoEnter mon(mMonitor);
+
     NS_ENSURE_ARG(aNewTarget);
     NS_ENSURE_TRUE(mState == STATE_START || mState == STATE_TRANSFER,
                    NS_ERROR_UNEXPECTED);
 
     // If canceled, do not retarget. Return with canceled status.
     if (NS_FAILED(mStatus)) {
         return mStatus;
     }
--- a/netwerk/base/src/nsInputStreamPump.h
+++ b/netwerk/base/src/nsInputStreamPump.h
@@ -6,26 +6,28 @@
 #ifndef nsInputStreamPump_h__
 #define nsInputStreamPump_h__
 
 #include "nsIInputStreamPump.h"
 #include "nsIAsyncInputStream.h"
 #include "nsIThreadRetargetableRequest.h"
 #include "nsCOMPtr.h"
 #include "mozilla/Attributes.h"
+#include "mozilla/ReentrantMonitor.h"
 
 class nsIInputStream;
 class nsILoadGroup;
 class nsIStreamListener;
 
 class nsInputStreamPump MOZ_FINAL : public nsIInputStreamPump
                                   , public nsIInputStreamCallback
                                   , public nsIThreadRetargetableRequest
 {
 public:
+    typedef mozilla::ReentrantMonitorAutoEnter ReentrantMonitorAutoEnter;
     NS_DECL_THREADSAFE_ISUPPORTS
     NS_DECL_NSIREQUEST
     NS_DECL_NSIINPUTSTREAMPUMP
     NS_DECL_NSIINPUTSTREAMCALLBACK
     NS_DECL_NSITHREADRETARGETABLEREQUEST
 
     nsInputStreamPump(); 
     ~nsInputStreamPump();
@@ -83,14 +85,20 @@ protected:
     uint64_t                      mStreamOffset;
     uint64_t                      mStreamLength;
     uint32_t                      mSegSize;
     uint32_t                      mSegCount;
     nsresult                      mStatus;
     uint32_t                      mSuspendCount;
     uint32_t                      mLoadFlags;
     bool                          mIsPending;
-    bool                          mWaiting; // true if waiting on async source
+    // True while in OnInputStreamReady, calling OnStateStart, OnStateTransfer
+    // and OnStateStop. Used to prevent calls to AsyncWait during callbacks.
+    bool                          mProcessingCallbacks;
+    // True if waiting on the "input stream ready" callback.
+    bool                          mWaitingForInputStreamReady;
     bool                          mCloseWhenDone;
     bool                          mRetargeting;
+    // Protects state/member var accesses across multiple threads.
+    mozilla::ReentrantMonitor     mMonitor;
 };
 
 #endif // !nsInputStreamChannel_h__