Bug 1133939 P1 Free buffer resources when an nsPipeInputStream is closed. r=froydnj
☠☠ backed out by e02960e6ee7a ☠ ☠
authorBen Kelly <ben@wanderview.com>
Fri, 20 Feb 2015 18:16:04 -0500
changeset 257154 c96050cdedd5f7b9336678499f91cf136fea7b04
parent 257153 e9a8fe34405ccb260609e18ff885921502a78327
child 257155 4c2b179b71aece02d8c241573d2e5c75321970c5
push id4610
push userjlund@mozilla.com
push dateMon, 30 Mar 2015 18:32:55 +0000
treeherdermozilla-beta@4df54044d9ef [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersfroydnj
bugs1133939
milestone38.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1133939 P1 Free buffer resources when an nsPipeInputStream is closed. r=froydnj
xpcom/io/nsPipe3.cpp
--- a/xpcom/io/nsPipe3.cpp
+++ b/xpcom/io/nsPipe3.cpp
@@ -49,16 +49,32 @@ GetPipeLog()
 #define DEFAULT_SEGMENT_SIZE  4096
 #define DEFAULT_SEGMENT_COUNT 16
 
 class nsPipe;
 class nsPipeEvents;
 class nsPipeInputStream;
 class nsPipeOutputStream;
 
+namespace {
+
+enum MonitorAction
+{
+  DoNotNotifyMonitor,
+  NotifyMonitor
+};
+
+enum SegmentChangeResult
+{
+  SegmentNotChanged,
+  SegmentDeleted
+};
+
+} // anonymous namespace
+
 //-----------------------------------------------------------------------------
 
 // this class is used to delay notifications until the end of a particular
 // scope.  it helps avoid the complexity of issuing callbacks while inside
 // a critical section.
 class nsPipeEvents
 {
 public:
@@ -155,20 +171,18 @@ public:
   uint32_t Available()
   {
     return mAvailable;
   }
 
   // synchronously wait for the pipe to become readable.
   nsresult Wait();
 
-  // these functions return true to indicate that the pipe's monitor should
-  // be notified, to wake up a blocked reader if any.
-  bool OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&);
-  bool OnInputException(nsresult, nsPipeEvents&);
+  MonitorAction OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&);
+  MonitorAction OnInputException(nsresult, nsPipeEvents&);
 
   nsPipeReadState& ReadState()
   {
     return mReadState;
   }
 
   const nsPipeReadState& ReadState() const
   {
@@ -234,20 +248,18 @@ public:
   void SetWritable(bool aWritable)
   {
     mWritable = aWritable;
   }
 
   // synchronously wait for the pipe to become writable.
   nsresult Wait();
 
-  // these functions return true to indicate that the pipe's monitor should
-  // be notified, to wake up a blocked writer if any.
-  bool OnOutputWritable(nsPipeEvents&);
-  bool OnOutputException(nsresult, nsPipeEvents&);
+  MonitorAction OnOutputWritable(nsPipeEvents&);
+  MonitorAction OnOutputException(nsresult, nsPipeEvents&);
 
 private:
   nsPipe*                         mPipe;
 
   // separate refcnt so that we know when to close the producer
   mozilla::ThreadSafeAutoRefCnt   mWriterRefCnt;
   int64_t                         mLogicalOffset;
   bool                            mBlocking;
@@ -287,16 +299,19 @@ public:
   //
   // methods below may be called while outside the pipe's monitor
   //
 
   nsresult GetReadSegment(const nsPipeReadState& aReadState,
                           const char*& aSegment, uint32_t& aSegmentLen);
   void     AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aCount,
                              uint32_t* aAvailableOut);
+  SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState);
+  void     DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents,
+                            uint32_t* aAvailableOut);
 
   nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen);
   void     AdvanceWriteCursor(uint32_t aCount);
 
   void     OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason);
   void     OnPipeException(nsresult aReason, bool aOutputOnly = false);
 
   nsresult CloneInputStream(nsPipeInputStream* aOriginal,
@@ -540,65 +555,99 @@ nsPipe::AdvanceReadCursor(nsPipeReadStat
 
       // if still writing in this segment then bail because we're not done
       // with the segment and have to wait for now...
       if (mWriteSegment == aReadState.mSegment && mWriteLimit > mWriteCursor) {
         NS_ASSERTION(aReadState.mReadLimit == mWriteCursor, "unexpected state");
         return;
       }
 
-      uint32_t currentSegment = aReadState.mSegment;
-
-      // Move to the next segment to read
-      aReadState.mSegment += 1;
-
-      // If this was the last reference to the first segment, then remove it.
-      if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) {
-
-        // shift write and read segment index (-1 indicates an empty buffer).
-        mWriteSegment -= 1;
-
-        for (uint32_t i = 0; i < mInputList.Length(); ++i) {
-          mInputList[i]->ReadState().mSegment -= 1;
-        }
-
-        // done with this segment
-        mBuffer.DeleteFirstSegment();
-        LOG(("III deleting first segment\n"));
-      }
-
-      if (mWriteSegment < aReadState.mSegment) {
-        // read cursor has hit the end of written data, so reset it
-        MOZ_ASSERT(mWriteSegment == (aReadState.mSegment - 1));
-        aReadState.mReadCursor = nullptr;
-        aReadState.mReadLimit = nullptr;
-        // also, the buffer is completely empty, so reset the write cursor
-        if (mWriteSegment == -1) {
-          mWriteCursor = nullptr;
-          mWriteLimit = nullptr;
-        }
-      } else {
-        // advance read cursor and limit to next buffer segment
-        aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment);
-        if (mWriteSegment == aReadState.mSegment) {
-          aReadState.mReadLimit = mWriteCursor;
-        } else {
-          aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize();
-        }
-      }
-
-      // we've free'd up a segment, so notify output stream that pipe has
-      // room for a new segment.
-      if (mOutput.OnOutputWritable(events)) {
-        mon.Notify();
+      // Check to see if we can free up any segments.  If we can, then notify
+      // the output stream that the pipe has room for a new segment.
+      if (AdvanceReadSegment(aReadState) == SegmentDeleted &&
+          mOutput.OnOutputWritable(events) == NotifyMonitor) {
+        mon.NotifyAll();
       }
     }
   }
 }
 
+SegmentChangeResult
+nsPipe::AdvanceReadSegment(nsPipeReadState& aReadState)
+{
+  uint32_t currentSegment = aReadState.mSegment;
+
+  // Move to the next segment to read
+  aReadState.mSegment += 1;
+
+  SegmentChangeResult result = SegmentNotChanged;
+
+  // If this was the last reference to the first segment, then remove it.
+  if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) {
+
+    // shift write and read segment index (-1 indicates an empty buffer).
+    mWriteSegment -= 1;
+
+    for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+      mInputList[i]->ReadState().mSegment -= 1;
+    }
+
+    // done with this segment
+    mBuffer.DeleteFirstSegment();
+    LOG(("III deleting first segment\n"));
+
+    result = SegmentDeleted;
+  }
+
+  if (mWriteSegment < aReadState.mSegment) {
+    // read cursor has hit the end of written data, so reset it
+    MOZ_ASSERT(mWriteSegment == (aReadState.mSegment - 1));
+    aReadState.mReadCursor = nullptr;
+    aReadState.mReadLimit = nullptr;
+    // also, the buffer is completely empty, so reset the write cursor
+    if (mWriteSegment == -1) {
+      mWriteCursor = nullptr;
+      mWriteLimit = nullptr;
+    }
+  } else {
+    // advance read cursor and limit to next buffer segment
+    aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment);
+    if (mWriteSegment == aReadState.mSegment) {
+      aReadState.mReadLimit = mWriteCursor;
+    } else {
+      aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize();
+    }
+  }
+
+  return result;
+}
+
+void
+nsPipe::DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents,
+                         uint32_t* aAvailableOut)
+{
+  ReentrantMonitorAutoEnter mon(mReentrantMonitor);
+
+  *aAvailableOut = 0;
+
+  SegmentChangeResult result = SegmentNotChanged;
+  while(mWriteSegment >= aReadState.mSegment) {
+    if (AdvanceReadSegment(aReadState) == SegmentDeleted) {
+      result = SegmentDeleted;
+    }
+  }
+
+  // if we've free'd up a segment, notify output stream that pipe has
+  // room for a new segment.
+  if (result == SegmentDeleted &&
+      mOutput.OnOutputWritable(aEvents) == NotifyMonitor) {
+    mon.NotifyAll();
+  }
+}
+
 nsresult
 nsPipe::GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen)
 {
   ReentrantMonitorAutoEnter mon(mReentrantMonitor);
 
   if (NS_FAILED(mStatus)) {
     return mStatus;
   }
@@ -659,17 +708,17 @@ nsPipe::AdvanceWriteCursor(uint32_t aByt
       if (mBuffer.GetSize() >= mBuffer.GetMaxSize()) {
         mOutput.SetWritable(false);
       }
     }
 
     // notify input stream that pipe now contains additional data
     bool needNotify = false;
     for (uint32_t i = 0; i < mInputList.Length(); ++i) {
-      if (mInputList[i]->OnInputReadable(aBytesWritten, events)) {
+      if (mInputList[i]->OnInputReadable(aBytesWritten, events) == NotifyMonitor) {
         needNotify = true;
       }
     }
 
     if (needNotify) {
       mon.NotifyAll();
     }
   }
@@ -700,22 +749,22 @@ nsPipe::OnInputStreamException(nsPipeInp
     }
 
     // Otherwise just close the particular stream that hit an exception.
     for (uint32_t i = 0; i < mInputList.Length(); ++i) {
       if (mInputList[i] != aStream) {
         continue;
       }
 
-      bool needNotify = mInputList[i]->OnInputException(aReason, events);
+      MonitorAction action = mInputList[i]->OnInputException(aReason, events);
       mInputList.RemoveElementAt(i);
 
       // Notify after element is removed in case we re-enter as a result.
-      if (needNotify) {
-        mon.Notify();
+      if (action == NotifyMonitor) {
+        mon.NotifyAll();
       }
 
       return;
     }
   }
 }
 
 void
@@ -741,23 +790,23 @@ nsPipe::OnPipeException(nsresult aReason
     for (uint32_t i = 0; i < mInputList.Length(); ++i) {
       // an output-only exception applies to the input end if the pipe has
       // zero bytes available.
       if (aOutputOnly && mInputList[i]->Available()) {
         tmpInputList.AppendElement(mInputList[i]);
         continue;
       }
 
-      if (mInputList[i]->OnInputException(aReason, events)) {
+      if (mInputList[i]->OnInputException(aReason, events) == NotifyMonitor) {
         needNotify = true;
       }
     }
     mInputList = tmpInputList;
 
-    if (mOutput.OnOutputException(aReason, events)) {
+    if (mOutput.OnOutputException(aReason, events) == NotifyMonitor) {
       needNotify = true;
     }
 
     // Notify after we have removed any input streams from mInputList
     if (needNotify) {
       mon.NotifyAll();
     }
   }
@@ -936,57 +985,57 @@ nsPipeInputStream::Wait()
 
     LOG(("III pipe input: woke up [status=%x available=%u]\n",
          Status(), mAvailable));
   }
 
   return Status() == NS_BASE_STREAM_CLOSED ? NS_OK : Status();
 }
 
-bool
+MonitorAction
 nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten, nsPipeEvents& aEvents)
 {
-  bool result = false;
+  MonitorAction result = DoNotNotifyMonitor;
 
   mAvailable += aBytesWritten;
 
   if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
     aEvents.NotifyInputReady(this, mCallback);
     mCallback = 0;
     mCallbackFlags = 0;
   } else if (mBlocked) {
-    result = true;
+    result = NotifyMonitor;
   }
 
   return result;
 }
 
-bool
+MonitorAction
 nsPipeInputStream::OnInputException(nsresult aReason, nsPipeEvents& aEvents)
 {
   LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n",
        this, aReason));
 
-  bool result = false;
+  MonitorAction result = DoNotNotifyMonitor;
 
   NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception");
 
   if (NS_SUCCEEDED(mInputStatus)) {
     mInputStatus = aReason;
   }
 
   // force count of available bytes to zero.
-  mAvailable = 0;
+  mPipe->DrainInputStream(mReadState, aEvents, &mAvailable);
 
   if (mCallback) {
     aEvents.NotifyInputReady(this, mCallback);
     mCallback = 0;
     mCallbackFlags = 0;
   } else if (mBlocked) {
-    result = true;
+    result = NotifyMonitor;
   }
 
   return result;
 }
 
 NS_IMETHODIMP
 nsPipeInputStream::CloseWithStatus(nsresult aReason)
 {
@@ -1317,51 +1366,51 @@ nsPipeOutputStream::Wait()
     mBlocked = false;
     LOG(("OOO pipe output: woke up [pipe-status=%x writable=%u]\n",
          mPipe->mStatus, mWritable));
   }
 
   return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
 }
 
-bool
+MonitorAction
 nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents)
 {
-  bool result = false;
+  MonitorAction result = DoNotNotifyMonitor;
 
   mWritable = true;
 
   if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
     aEvents.NotifyOutputReady(this, mCallback);
     mCallback = 0;
     mCallbackFlags = 0;
   } else if (mBlocked) {
-    result = true;
+    result = NotifyMonitor;
   }
 
   return result;
 }
 
-bool
+MonitorAction
 nsPipeOutputStream::OnOutputException(nsresult aReason, nsPipeEvents& aEvents)
 {
   LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n",
        this, aReason));
 
-  bool result = false;
+  MonitorAction result = DoNotNotifyMonitor;
 
   NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception");
   mWritable = false;
 
   if (mCallback) {
     aEvents.NotifyOutputReady(this, mCallback);
     mCallback = 0;
     mCallbackFlags = 0;
   } else if (mBlocked) {
-    result = true;
+    result = NotifyMonitor;
   }
 
   return result;
 }
 
 
 NS_IMETHODIMP_(MozExternalRefCountType)
 nsPipeOutputStream::AddRef()