--- a/xpcom/io/nsPipe3.cpp
+++ b/xpcom/io/nsPipe3.cpp
@@ -50,17 +50,17 @@ enum MonitorAction
{
DoNotNotifyMonitor,
NotifyMonitor
};
enum SegmentChangeResult
{
SegmentNotChanged,
- SegmentDeleted
+ SegmentAdvanceBufferRead
};
} // namespace
//-----------------------------------------------------------------------------
// this class is used to delay notifications until the end of a particular
// scope. it helps avoid the complexity of issuing callbacks while inside
@@ -304,36 +304,41 @@ public:
// nsPipe methods:
nsPipe();
private:
~nsPipe();
//
- // methods below may only be called while inside the pipe's monitor
+ // Methods below may only be called while inside the pipe's monitor. Some
+ // of these methods require passing a ReentrantMonitorAutoEnter to prove the
+ // monitor is held.
//
void PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
char*& aCursor, char*& aLimit);
- SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState);
+ SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState,
+ const ReentrantMonitorAutoEnter &ev);
bool ReadSegmentBeingWritten(nsPipeReadState& aReadState);
uint32_t CountSegmentReferences(int32_t aSegment);
void SetAllNullReadCursors();
bool AllReadCursorsMatchWriteCursor();
void RollBackAllReadCursors(char* aWriteCursor);
void UpdateAllReadCursors(char* aWriteCursor);
void ValidateAllReadCursors();
+ uint32_t GetBufferSegmentCount(const nsPipeReadState& aReadState,
+ const ReentrantMonitorAutoEnter& ev) const;
+ bool IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const;
//
// methods below may be called while outside the pipe's monitor
//
void DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents);
-
nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen);
void AdvanceWriteCursor(uint32_t aCount);
void OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason);
void OnPipeException(nsresult aReason, bool aOutputOnly = false);
nsresult CloneInputStream(nsPipeInputStream* aOriginal,
nsIInputStream** aCloneOut);
@@ -360,16 +365,21 @@ private:
// compatibility we need to be able to consistently return this same
// object from GetInputStream(). Note, mOriginalInput is also stored
// in mInputList as a weak ref.
RefPtr<nsPipeInputStream> mOriginalInput;
ReentrantMonitor mReentrantMonitor;
nsSegmentedBuffer mBuffer;
+ // The maximum number of segments to allow to be buffered in advance
+ // of the fastest reader. This is collection of segments is called
+ // the "advance buffer".
+ uint32_t mMaxAdvanceBufferSegmentCount;
+
int32_t mWriteSegment;
char* mWriteCursor;
char* mWriteLimit;
// |mStatus| is protected by |mReentrantMonitor|.
nsresult mStatus;
bool mInited;
};
@@ -489,31 +499,39 @@ private:
//
// (shaded region contains data)
//
// NOTE: Each input stream produced by the nsPipe contains its own, separate
// nsPipeReadState. This means there are multiple mReadCursor and
// mReadLimit values in play. The pipe cannot discard old data until
// all mReadCursors have moved beyond that point in the stream.
//
+// Likewise, each input stream reader will have it's own amount of
+// buffered data. The pipe size threshold, however, is only applied
+// to the input stream that is being read fastest. We call this
+// the "advance buffer" in that its in advance of all readers. We
+// allow slower input streams to buffer more data so that we don't
+// stall processing of the faster input stream.
+//
// NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
// small allocations (e.g., 64 byte allocations). this means that buffers may
// be allocated back-to-back. in the diagram above, for example, mReadLimit
// would actually be pointing at the beginning of the next segment. when
// making changes to this file, please keep this fact in mind.
//
//-----------------------------------------------------------------------------
// nsPipe methods:
//-----------------------------------------------------------------------------
nsPipe::nsPipe()
: mOutput(this)
, mOriginalInput(new nsPipeInputStream(this))
, mReentrantMonitor("nsPipe.mReentrantMonitor")
+ , mMaxAdvanceBufferSegmentCount(0)
, mWriteSegment(-1)
, mWriteCursor(nullptr)
, mWriteLimit(nullptr)
, mStatus(NS_OK)
, mInited(false)
{
mInputList.AppendElement(mOriginalInput);
}
@@ -561,21 +579,27 @@ nsPipe::Init(bool aNonBlockingIn,
}
// protect against overflow
uint32_t maxCount = uint32_t(-1) / aSegmentSize;
if (aSegmentCount > maxCount) {
aSegmentCount = maxCount;
}
- nsresult rv = mBuffer.Init(aSegmentSize, aSegmentSize * aSegmentCount);
+ // The internal buffer is always "infinite" so that we can allow
+ // the size to expand when cloned streams are read at different
+ // rates. We enforce a limit on how much data can be buffered
+ // ahead of the fastest reader in GetWriteSegment().
+ nsresult rv = mBuffer.Init(aSegmentSize, UINT32_MAX);
if (NS_FAILED(rv)) {
return rv;
}
+ mMaxAdvanceBufferSegmentCount = aSegmentCount;
+
mOutput.SetNonBlocking(aNonBlockingOut);
mOriginalInput->SetNonBlocking(aNonBlockingIn);
return NS_OK;
}
NS_IMETHODIMP
nsPipe::GetInputStream(nsIAsyncInputStream** aInputStream)
@@ -685,40 +709,40 @@ nsPipe::AdvanceReadCursor(nsPipeReadStat
aReadState.mAvailable -= aBytesRead;
// Check to see if we're at the end of the available read data. If we
// are, and this segment is not still being written, then we can possibly
// free up the segment.
if (aReadState.mReadCursor == aReadState.mReadLimit &&
!ReadSegmentBeingWritten(aReadState)) {
- // Check to see if we can free up any segments. If we can, then notify
- // the output stream that the pipe has room for a new segment.
- if (AdvanceReadSegment(aReadState) == SegmentDeleted &&
+ // Advance the segment position. If we have read any segments from the
+ // advance buffer then we can potentially notify blocked writers.
+ if (AdvanceReadSegment(aReadState, mon) == SegmentAdvanceBufferRead &&
mOutput.OnOutputWritable(events) == NotifyMonitor) {
mon.NotifyAll();
}
}
ReleaseReadSegment(aReadState, events);
}
}
SegmentChangeResult
-nsPipe::AdvanceReadSegment(nsPipeReadState& aReadState)
+nsPipe::AdvanceReadSegment(nsPipeReadState& aReadState,
+ const ReentrantMonitorAutoEnter &ev)
{
- mReentrantMonitor.AssertCurrentThreadIn();
+ // Calculate how many segments are buffered for this stream to start.
+ uint32_t startBufferSegments = GetBufferSegmentCount(aReadState, ev);
int32_t currentSegment = aReadState.mSegment;
// Move to the next segment to read
aReadState.mSegment += 1;
- SegmentChangeResult result = SegmentNotChanged;
-
// If this was the last reference to the first segment, then remove it.
if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) {
// shift write and read segment index (-1 indicates an empty buffer).
mWriteSegment -= 1;
// Directly modify the current read state. If the associated input
// stream is closed simultaneous with reading, then it may not be
@@ -732,18 +756,16 @@ nsPipe::AdvanceReadSegment(nsPipeReadSta
continue;
}
mInputList[i]->ReadState().mSegment -= 1;
}
// done with this segment
mBuffer.DeleteFirstSegment();
LOG(("III deleting first segment\n"));
-
- result = SegmentDeleted;
}
if (mWriteSegment < aReadState.mSegment) {
// read cursor has hit the end of written data, so reset it
MOZ_ASSERT(mWriteSegment == (aReadState.mSegment - 1));
aReadState.mReadCursor = nullptr;
aReadState.mReadLimit = nullptr;
// also, the buffer is completely empty, so reset the write cursor
@@ -756,17 +778,29 @@ nsPipe::AdvanceReadSegment(nsPipeReadSta
aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment);
if (mWriteSegment == aReadState.mSegment) {
aReadState.mReadLimit = mWriteCursor;
} else {
aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize();
}
}
- return result;
+ // Calculate how many segments are buffered for the stream after
+ // reading.
+ uint32_t endBufferSegments = GetBufferSegmentCount(aReadState, ev);
+
+ // If the stream has read a segment out of the set of advanced buffer
+ // segments, then the writer may advance.
+ if (startBufferSegments >= mMaxAdvanceBufferSegmentCount &&
+ endBufferSegments < mMaxAdvanceBufferSegmentCount) {
+ return SegmentAdvanceBufferRead;
+ }
+
+ // Otherwise there are no significant changes to the segment structure.
+ return SegmentNotChanged;
}
void
nsPipe::DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents)
{
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
// If a segment is actively being read in ReadSegments() for this input
@@ -777,33 +811,33 @@ nsPipe::DrainInputStream(nsPipeReadState
if (aReadState.mActiveRead) {
MOZ_ASSERT(!aReadState.mNeedDrain);
aReadState.mNeedDrain = true;
return;
}
aReadState.mAvailable = 0;
- SegmentChangeResult result = SegmentNotChanged;
while(mWriteSegment >= aReadState.mSegment) {
// If the last segment to free is still being written to, we're done
// draining. We can't free any more.
if (ReadSegmentBeingWritten(aReadState)) {
break;
}
- if (AdvanceReadSegment(aReadState) == SegmentDeleted) {
- result = SegmentDeleted;
- }
+ // Don't bother checking if this results in an advance buffer segment
+ // read. Since we are draining the entire stream we will read an
+ // advance buffer segment no matter what.
+ AdvanceReadSegment(aReadState, mon);
}
- // if we've free'd up a segment, notify output stream that pipe has
- // room for a new segment.
- if (result == SegmentDeleted &&
+ // If we have read any segments from the advance buffer then we can
+ // potentially notify blocked writers.
+ if (!IsAdvanceBufferFull(mon) &&
mOutput.OnOutputWritable(aEvents) == NotifyMonitor) {
mon.NotifyAll();
}
}
bool
nsPipe::ReadSegmentBeingWritten(nsPipeReadState& aReadState)
{
@@ -821,21 +855,28 @@ nsPipe::GetWriteSegment(char*& aSegment,
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
if (NS_FAILED(mStatus)) {
return mStatus;
}
// write cursor and limit may both be null indicating an empty buffer.
if (mWriteCursor == mWriteLimit) {
- char* seg = mBuffer.AppendNewSegment();
- // pipe is full
- if (!seg) {
+ // The pipe is full if we have hit our limit on advance data buffering.
+ // This means the fastest reader is still reading slower than data is
+ // being written into the pipe.
+ if (IsAdvanceBufferFull(mon)) {
return NS_BASE_STREAM_WOULD_BLOCK;
}
+
+ // The nsSegmentedBuffer is configured to be "infinite", so this
+ // should never return nullptr here.
+ char* seg = mBuffer.AppendNewSegment();
+ MOZ_DIAGNOSTIC_ASSERT(seg);
+
LOG(("OOO appended new segment\n"));
mWriteCursor = seg;
mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize();
++mWriteSegment;
}
// make sure read cursor is initialized
SetAllNullReadCursors();
@@ -872,19 +913,17 @@ nsPipe::AdvanceWriteCursor(uint32_t aByt
UpdateAllReadCursors(newWriteCursor);
mWriteCursor = newWriteCursor;
ValidateAllReadCursors();
// update the writable flag on the output stream
if (mWriteCursor == mWriteLimit) {
- if (mBuffer.GetSize() >= mBuffer.GetMaxSize()) {
- mOutput.SetWritable(false);
- }
+ mOutput.SetWritable(!IsAdvanceBufferFull(mon));
}
// notify input stream that pipe now contains additional data
bool needNotify = false;
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
if (mInputList[i]->OnInputReadable(aBytesWritten, events, mon)
== NotifyMonitor) {
needNotify = true;
@@ -1096,16 +1135,78 @@ nsPipe::ValidateAllReadCursors()
NS_ASSERTION(state.mReadCursor != mWriteCursor ||
(mBuffer.GetSegment(state.mSegment) == state.mReadCursor &&
mWriteCursor == mWriteLimit),
"read cursor is bad");
}
#endif
}
+uint32_t
+nsPipe::GetBufferSegmentCount(const nsPipeReadState& aReadState,
+ const ReentrantMonitorAutoEnter& ev) const
+{
+ // The write segment can be smaller than the current reader position
+ // in some cases. For example, when the first write segment has not
+ // been allocated yet mWriteSegment is negative. In these cases
+ // the stream is effectively using zero segments.
+ if (mWriteSegment < aReadState.mSegment) {
+ return 0;
+ }
+
+ MOZ_ASSERT(mWriteSegment >= 0);
+ MOZ_ASSERT(aReadState.mSegment >= 0);
+
+ // Otherwise at least one segment is being used. We add one here
+ // since a single segment is being used when the write and read
+ // segment indices are the same.
+ return 1 + mWriteSegment - aReadState.mSegment;
+}
+
+bool
+nsPipe::IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const
+{
+ // If we have fewer total segments than the limit we can immediately
+ // determine we are not full. Note, we must add one to mWriteSegment
+ // to convert from a index to a count.
+ MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= -1);
+ MOZ_DIAGNOSTIC_ASSERT(mWriteSegment < INT32_MAX);
+ uint32_t totalWriteSegments = mWriteSegment + 1;
+ if (totalWriteSegments < mMaxAdvanceBufferSegmentCount) {
+ return false;
+ }
+
+ // Otherwise we must inspect all of our reader streams. We need
+ // to determine the buffer depth of the fastest reader.
+ uint32_t minBufferSegments = UINT32_MAX;
+ for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+ // Only count buffer segments from input streams that are open.
+ if (NS_FAILED(mInputList[i]->Status(ev))) {
+ continue;
+ }
+ const nsPipeReadState& state = mInputList[i]->ReadState();
+ uint32_t bufferSegments = GetBufferSegmentCount(state, ev);
+ minBufferSegments = std::min(minBufferSegments, bufferSegments);
+ // We only care if any reader has fewer segments buffered than
+ // our threshold. We can stop once we hit that threshold.
+ if (minBufferSegments < mMaxAdvanceBufferSegmentCount) {
+ return false;
+ }
+ }
+
+ // Note, its possible for minBufferSegments to exceed our
+ // mMaxAdvanceBufferSegmentCount here. This happens when a cloned
+ // reader gets far behind, but then the fastest reader stream is
+ // closed. This leaves us with a single stream that is buffered
+ // beyond our max. Naturally we continue to indicate the pipe
+ // is full at this point.
+
+ return true;
+}
+
//-----------------------------------------------------------------------------
// nsPipeEvents methods:
//-----------------------------------------------------------------------------
nsPipeEvents::~nsPipeEvents()
{
// dispatch any pending events