Bug 1134372 P1 Allow pipe cloned streams to be read at different rates. r=froydnj a=jcristau+gchang
authorBen Kelly <ben@wanderview.com>
Fri, 02 Dec 2016 10:41:33 -0800
changeset 352940 dec6908254a5ff7a6917827c7cf85e47072483ea
parent 352939 df1cd1f34259b3674eaa22b6040f699da784ef7f
child 352941 b512f512ba1fd814530bd77a048194ba0ed8399c
push id6795
push userjlund@mozilla.com
push dateMon, 23 Jan 2017 14:19:46 +0000
treeherdermozilla-esr52@76101b503191 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersfroydnj, jcristau
bugs1134372
milestone52.0a2
Bug 1134372 P1 Allow pipe cloned streams to be read at different rates. r=froydnj a=jcristau+gchang
xpcom/io/nsPipe3.cpp
--- a/xpcom/io/nsPipe3.cpp
+++ b/xpcom/io/nsPipe3.cpp
@@ -50,17 +50,17 @@ enum MonitorAction
 {
   DoNotNotifyMonitor,
   NotifyMonitor
 };
 
 enum SegmentChangeResult
 {
   SegmentNotChanged,
-  SegmentDeleted
+  SegmentAdvanceBufferRead
 };
 
 } // namespace
 
 //-----------------------------------------------------------------------------
 
 // this class is used to delay notifications until the end of a particular
 // scope.  it helps avoid the complexity of issuing callbacks while inside
@@ -304,36 +304,41 @@ public:
 
   // nsPipe methods:
   nsPipe();
 
 private:
   ~nsPipe();
 
   //
-  // methods below may only be called while inside the pipe's monitor
+  // Methods below may only be called while inside the pipe's monitor.  Some
+  // of these methods require passing a ReentrantMonitorAutoEnter to prove the
+  // monitor is held.
   //
 
   void PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
                    char*& aCursor, char*& aLimit);
-  SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState);
+  SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState,
+                                         const ReentrantMonitorAutoEnter &ev);
   bool ReadSegmentBeingWritten(nsPipeReadState& aReadState);
   uint32_t CountSegmentReferences(int32_t aSegment);
   void SetAllNullReadCursors();
   bool AllReadCursorsMatchWriteCursor();
   void RollBackAllReadCursors(char* aWriteCursor);
   void UpdateAllReadCursors(char* aWriteCursor);
   void ValidateAllReadCursors();
+  uint32_t GetBufferSegmentCount(const nsPipeReadState& aReadState,
+                                 const ReentrantMonitorAutoEnter& ev) const;
+  bool IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const;
 
   //
   // methods below may be called while outside the pipe's monitor
   //
 
   void     DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents);
-
   nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen);
   void     AdvanceWriteCursor(uint32_t aCount);
 
   void     OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason);
   void     OnPipeException(nsresult aReason, bool aOutputOnly = false);
 
   nsresult CloneInputStream(nsPipeInputStream* aOriginal,
                             nsIInputStream** aCloneOut);
@@ -360,16 +365,21 @@ private:
   // compatibility we need to be able to consistently return this same
   // object from GetInputStream().  Note, mOriginalInput is also stored
   // in mInputList as a weak ref.
   RefPtr<nsPipeInputStream> mOriginalInput;
 
   ReentrantMonitor    mReentrantMonitor;
   nsSegmentedBuffer   mBuffer;
 
+  // The maximum number of segments to allow to be buffered in advance
+  // of the fastest reader.  This is collection of segments is called
+  // the "advance buffer".
+  uint32_t            mMaxAdvanceBufferSegmentCount;
+
   int32_t             mWriteSegment;
   char*               mWriteCursor;
   char*               mWriteLimit;
 
   // |mStatus| is protected by |mReentrantMonitor|.
   nsresult            mStatus;
   bool                mInited;
 };
@@ -489,31 +499,39 @@ private:
 //
 // (shaded region contains data)
 //
 // NOTE: Each input stream produced by the nsPipe contains its own, separate
 //       nsPipeReadState.  This means there are multiple mReadCursor and
 //       mReadLimit values in play.  The pipe cannot discard old data until
 //       all mReadCursors have moved beyond that point in the stream.
 //
+//       Likewise, each input stream reader will have it's own amount of
+//       buffered data.  The pipe size threshold, however, is only applied
+//       to the input stream that is being read fastest.  We call this
+//       the "advance buffer" in that its in advance of all readers.  We
+//       allow slower input streams to buffer more data so that we don't
+//       stall processing of the faster input stream.
+//
 // NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
 // small allocations (e.g., 64 byte allocations).  this means that buffers may
 // be allocated back-to-back.  in the diagram above, for example, mReadLimit
 // would actually be pointing at the beginning of the next segment.  when
 // making changes to this file, please keep this fact in mind.
 //
 
 //-----------------------------------------------------------------------------
 // nsPipe methods:
 //-----------------------------------------------------------------------------
 
 nsPipe::nsPipe()
   : mOutput(this)
   , mOriginalInput(new nsPipeInputStream(this))
   , mReentrantMonitor("nsPipe.mReentrantMonitor")
+  , mMaxAdvanceBufferSegmentCount(0)
   , mWriteSegment(-1)
   , mWriteCursor(nullptr)
   , mWriteLimit(nullptr)
   , mStatus(NS_OK)
   , mInited(false)
 {
   mInputList.AppendElement(mOriginalInput);
 }
@@ -561,21 +579,27 @@ nsPipe::Init(bool aNonBlockingIn,
   }
 
   // protect against overflow
   uint32_t maxCount = uint32_t(-1) / aSegmentSize;
   if (aSegmentCount > maxCount) {
     aSegmentCount = maxCount;
   }
 
-  nsresult rv = mBuffer.Init(aSegmentSize, aSegmentSize * aSegmentCount);
+  // The internal buffer is always "infinite" so that we can allow
+  // the size to expand when cloned streams are read at different
+  // rates.  We enforce a limit on how much data can be buffered
+  // ahead of the fastest reader in GetWriteSegment().
+  nsresult rv = mBuffer.Init(aSegmentSize, UINT32_MAX);
   if (NS_FAILED(rv)) {
     return rv;
   }
 
+  mMaxAdvanceBufferSegmentCount = aSegmentCount;
+
   mOutput.SetNonBlocking(aNonBlockingOut);
   mOriginalInput->SetNonBlocking(aNonBlockingIn);
 
   return NS_OK;
 }
 
 NS_IMETHODIMP
 nsPipe::GetInputStream(nsIAsyncInputStream** aInputStream)
@@ -685,40 +709,40 @@ nsPipe::AdvanceReadCursor(nsPipeReadStat
     aReadState.mAvailable -= aBytesRead;
 
     // Check to see if we're at the end of the available read data.  If we
     // are, and this segment is not still being written, then we can possibly
     // free up the segment.
     if (aReadState.mReadCursor == aReadState.mReadLimit &&
         !ReadSegmentBeingWritten(aReadState)) {
 
-      // Check to see if we can free up any segments.  If we can, then notify
-      // the output stream that the pipe has room for a new segment.
-      if (AdvanceReadSegment(aReadState) == SegmentDeleted &&
+      // Advance the segment position.  If we have read any segments from the
+      // advance buffer then we can potentially notify blocked writers.
+      if (AdvanceReadSegment(aReadState, mon) == SegmentAdvanceBufferRead &&
           mOutput.OnOutputWritable(events) == NotifyMonitor) {
         mon.NotifyAll();
       }
     }
 
     ReleaseReadSegment(aReadState, events);
   }
 }
 
 SegmentChangeResult
-nsPipe::AdvanceReadSegment(nsPipeReadState& aReadState)
+nsPipe::AdvanceReadSegment(nsPipeReadState& aReadState,
+                           const ReentrantMonitorAutoEnter &ev)
 {
-  mReentrantMonitor.AssertCurrentThreadIn();
+  // Calculate how many segments are buffered for this stream to start.
+  uint32_t startBufferSegments = GetBufferSegmentCount(aReadState, ev);
 
   int32_t currentSegment = aReadState.mSegment;
 
   // Move to the next segment to read
   aReadState.mSegment += 1;
 
-  SegmentChangeResult result = SegmentNotChanged;
-
   // If this was the last reference to the first segment, then remove it.
   if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) {
 
     // shift write and read segment index (-1 indicates an empty buffer).
     mWriteSegment -= 1;
 
     // Directly modify the current read state.  If the associated input
     // stream is closed simultaneous with reading, then it may not be
@@ -732,18 +756,16 @@ nsPipe::AdvanceReadSegment(nsPipeReadSta
         continue;
       }
       mInputList[i]->ReadState().mSegment -= 1;
     }
 
     // done with this segment
     mBuffer.DeleteFirstSegment();
     LOG(("III deleting first segment\n"));
-
-    result = SegmentDeleted;
   }
 
   if (mWriteSegment < aReadState.mSegment) {
     // read cursor has hit the end of written data, so reset it
     MOZ_ASSERT(mWriteSegment == (aReadState.mSegment - 1));
     aReadState.mReadCursor = nullptr;
     aReadState.mReadLimit = nullptr;
     // also, the buffer is completely empty, so reset the write cursor
@@ -756,17 +778,29 @@ nsPipe::AdvanceReadSegment(nsPipeReadSta
     aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment);
     if (mWriteSegment == aReadState.mSegment) {
       aReadState.mReadLimit = mWriteCursor;
     } else {
       aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize();
     }
   }
 
-  return result;
+  // Calculate how many segments are buffered for the stream after
+  // reading.
+  uint32_t endBufferSegments = GetBufferSegmentCount(aReadState, ev);
+
+  // If the stream has read a segment out of the set of advanced buffer
+  // segments, then the writer may advance.
+  if (startBufferSegments >= mMaxAdvanceBufferSegmentCount &&
+      endBufferSegments < mMaxAdvanceBufferSegmentCount) {
+    return SegmentAdvanceBufferRead;
+  }
+
+  // Otherwise there are no significant changes to the segment structure.
+  return SegmentNotChanged;
 }
 
 void
 nsPipe::DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents)
 {
   ReentrantMonitorAutoEnter mon(mReentrantMonitor);
 
   // If a segment is actively being read in ReadSegments() for this input
@@ -777,33 +811,33 @@ nsPipe::DrainInputStream(nsPipeReadState
   if (aReadState.mActiveRead) {
     MOZ_ASSERT(!aReadState.mNeedDrain);
     aReadState.mNeedDrain = true;
     return;
   }
 
   aReadState.mAvailable = 0;
 
-  SegmentChangeResult result = SegmentNotChanged;
   while(mWriteSegment >= aReadState.mSegment) {
 
     // If the last segment to free is still being written to, we're done
     // draining.  We can't free any more.
     if (ReadSegmentBeingWritten(aReadState)) {
       break;
     }
 
-    if (AdvanceReadSegment(aReadState) == SegmentDeleted) {
-      result = SegmentDeleted;
-    }
+    // Don't bother checking if this results in an advance buffer segment
+    // read.  Since we are draining the entire stream we will read an
+    // advance buffer segment no matter what.
+    AdvanceReadSegment(aReadState, mon);
   }
 
-  // if we've free'd up a segment, notify output stream that pipe has
-  // room for a new segment.
-  if (result == SegmentDeleted &&
+  // If we have read any segments from the advance buffer then we can
+  // potentially notify blocked writers.
+  if (!IsAdvanceBufferFull(mon) &&
       mOutput.OnOutputWritable(aEvents) == NotifyMonitor) {
     mon.NotifyAll();
   }
 }
 
 bool
 nsPipe::ReadSegmentBeingWritten(nsPipeReadState& aReadState)
 {
@@ -821,21 +855,28 @@ nsPipe::GetWriteSegment(char*& aSegment,
   ReentrantMonitorAutoEnter mon(mReentrantMonitor);
 
   if (NS_FAILED(mStatus)) {
     return mStatus;
   }
 
   // write cursor and limit may both be null indicating an empty buffer.
   if (mWriteCursor == mWriteLimit) {
-    char* seg = mBuffer.AppendNewSegment();
-    // pipe is full
-    if (!seg) {
+    // The pipe is full if we have hit our limit on advance data buffering.
+    // This means the fastest reader is still reading slower than data is
+    // being written into the pipe.
+    if (IsAdvanceBufferFull(mon)) {
       return NS_BASE_STREAM_WOULD_BLOCK;
     }
+
+    // The nsSegmentedBuffer is configured to be "infinite", so this
+    // should never return nullptr here.
+    char* seg = mBuffer.AppendNewSegment();
+    MOZ_DIAGNOSTIC_ASSERT(seg);
+
     LOG(("OOO appended new segment\n"));
     mWriteCursor = seg;
     mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize();
     ++mWriteSegment;
   }
 
   // make sure read cursor is initialized
   SetAllNullReadCursors();
@@ -872,19 +913,17 @@ nsPipe::AdvanceWriteCursor(uint32_t aByt
     UpdateAllReadCursors(newWriteCursor);
 
     mWriteCursor = newWriteCursor;
 
     ValidateAllReadCursors();
 
     // update the writable flag on the output stream
     if (mWriteCursor == mWriteLimit) {
-      if (mBuffer.GetSize() >= mBuffer.GetMaxSize()) {
-        mOutput.SetWritable(false);
-      }
+      mOutput.SetWritable(!IsAdvanceBufferFull(mon));
     }
 
     // notify input stream that pipe now contains additional data
     bool needNotify = false;
     for (uint32_t i = 0; i < mInputList.Length(); ++i) {
       if (mInputList[i]->OnInputReadable(aBytesWritten, events, mon)
           == NotifyMonitor) {
         needNotify = true;
@@ -1096,16 +1135,78 @@ nsPipe::ValidateAllReadCursors()
     NS_ASSERTION(state.mReadCursor != mWriteCursor ||
                  (mBuffer.GetSegment(state.mSegment) == state.mReadCursor &&
                   mWriteCursor == mWriteLimit),
                  "read cursor is bad");
   }
 #endif
 }
 
+uint32_t
+nsPipe::GetBufferSegmentCount(const nsPipeReadState& aReadState,
+                              const ReentrantMonitorAutoEnter& ev) const
+{
+  // The write segment can be smaller than the current reader position
+  // in some cases.  For example, when the first write segment has not
+  // been allocated yet mWriteSegment is negative.  In these cases
+  // the stream is effectively using zero segments.
+  if (mWriteSegment < aReadState.mSegment) {
+    return 0;
+  }
+
+  MOZ_ASSERT(mWriteSegment >= 0);
+  MOZ_ASSERT(aReadState.mSegment >= 0);
+
+  // Otherwise at least one segment is being used.  We add one here
+  // since a single segment is being used when the write and read
+  // segment indices are the same.
+  return 1 + mWriteSegment - aReadState.mSegment;
+}
+
+bool
+nsPipe::IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const
+{
+  // If we have fewer total segments than the limit we can immediately
+  // determine we are not full.  Note, we must add one to mWriteSegment
+  // to convert from a index to a count.
+  MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= -1);
+  MOZ_DIAGNOSTIC_ASSERT(mWriteSegment < INT32_MAX);
+  uint32_t totalWriteSegments = mWriteSegment + 1;
+  if (totalWriteSegments < mMaxAdvanceBufferSegmentCount) {
+    return false;
+  }
+
+  // Otherwise we must inspect all of our reader streams.  We need
+  // to determine the buffer depth of the fastest reader.
+  uint32_t minBufferSegments = UINT32_MAX;
+  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+    // Only count buffer segments from input streams that are open.
+    if (NS_FAILED(mInputList[i]->Status(ev))) {
+      continue;
+    }
+    const nsPipeReadState& state = mInputList[i]->ReadState();
+    uint32_t bufferSegments = GetBufferSegmentCount(state, ev);
+    minBufferSegments = std::min(minBufferSegments, bufferSegments);
+    // We only care if any reader has fewer segments buffered than
+    // our threshold.  We can stop once we hit that threshold.
+    if (minBufferSegments < mMaxAdvanceBufferSegmentCount) {
+      return false;
+    }
+  }
+
+  // Note, its possible for minBufferSegments to exceed our
+  // mMaxAdvanceBufferSegmentCount here.  This happens when a cloned
+  // reader gets far behind, but then the fastest reader stream is
+  // closed.  This leaves us with a single stream that is buffered
+  // beyond our max.  Naturally we continue to indicate the pipe
+  // is full at this point.
+
+  return true;
+}
+
 //-----------------------------------------------------------------------------
 // nsPipeEvents methods:
 //-----------------------------------------------------------------------------
 
 nsPipeEvents::~nsPipeEvents()
 {
   // dispatch any pending events