Bug 1100398 P4 Make nsPipeInputStream cloneable. r=froydnj
authorBen Kelly <ben@wanderview.com>
Tue, 10 Feb 2015 23:55:43 -0500
changeset 228678 ef97268b8d6a5b55bb3965fe4f96a3b5b995855a
parent 228677 db0d65c0b7a7cecdeb2b8612729f19d03f49dffd
child 228679 48906d15a28eafe9a84d7607f17bcd6661a540fb
push id55493
push userbkelly@mozilla.com
push dateWed, 11 Feb 2015 19:57:53 +0000
treeherdermozilla-inbound@48906d15a28e [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersfroydnj
bugs1100398
milestone38.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1100398 P4 Make nsPipeInputStream cloneable. r=froydnj
xpcom/io/nsPipe3.cpp
xpcom/tests/gtest/TestPipes.cpp
--- a/xpcom/io/nsPipe3.cpp
+++ b/xpcom/io/nsPipe3.cpp
@@ -1,20 +1,22 @@
 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "mozilla/Attributes.h"
 #include "mozilla/ReentrantMonitor.h"
+#include "nsICloneableInputStream.h"
 #include "nsIPipe.h"
 #include "nsIEventTarget.h"
 #include "nsISeekableStream.h"
 #include "nsIProgrammingLanguage.h"
+#include "nsRefPtr.h"
 #include "nsSegmentedBuffer.h"
 #include "nsStreamUtils.h"
 #include "nsCOMPtr.h"
 #include "nsCRT.h"
 #include "prlog.h"
 #include "nsIClassInfoImpl.h"
 #include "nsAlgorithm.h"
 #include "nsMemory.h"
@@ -83,83 +85,122 @@ private:
   nsCOMPtr<nsIAsyncInputStream>     mInputStream;
   nsCOMPtr<nsIInputStreamCallback>  mInputCallback;
   nsCOMPtr<nsIAsyncOutputStream>    mOutputStream;
   nsCOMPtr<nsIOutputStreamCallback> mOutputCallback;
 };
 
 //-----------------------------------------------------------------------------
 
-// the input end of a pipe (allocated as a member of the pipe).
+// This class is used to maintain input stream state.  Its broken out from the
+// nsPipeInputStream class because generally the nsPipe should be modifying
+// this state and not the input stream itself.
+struct nsPipeReadState
+{
+  nsPipeReadState()
+    : mReadCursor(nullptr)
+    , mReadLimit(nullptr)
+    , mSegment(0)
+  { }
+
+  char*    mReadCursor;
+  char*    mReadLimit;
+  int32_t  mSegment;
+};
+
+//-----------------------------------------------------------------------------
+
+// an input end of a pipe (maintained as a list of refs within the pipe)
 class nsPipeInputStream
   : public nsIAsyncInputStream
   , public nsISeekableStream
   , public nsISearchableInputStream
+  , public nsICloneableInputStream
   , public nsIClassInfo
 {
 public:
-  // since this class will be allocated as a member of the pipe, we do not
-  // need our own ref count.  instead, we share the lifetime (the ref count)
-  // of the entire pipe.  this macro is just convenience since it does not
-  // declare a mRefCount variable; however, don't let the name fool you...
-  // we are not inheriting from nsPipe ;-)
-  NS_DECL_ISUPPORTS_INHERITED
-
+  NS_DECL_THREADSAFE_ISUPPORTS
   NS_DECL_NSIINPUTSTREAM
   NS_DECL_NSIASYNCINPUTSTREAM
   NS_DECL_NSISEEKABLESTREAM
   NS_DECL_NSISEARCHABLEINPUTSTREAM
+  NS_DECL_NSICLONEABLEINPUTSTREAM
   NS_DECL_NSICLASSINFO
 
   explicit nsPipeInputStream(nsPipe* aPipe)
     : mPipe(aPipe)
-    , mReaderRefCnt(0)
     , mLogicalOffset(0)
+    , mInputStatus(NS_OK)
     , mBlocking(true)
     , mBlocked(false)
     , mAvailable(0)
     , mCallbackFlags(0)
   { }
 
+  explicit nsPipeInputStream(const nsPipeInputStream& aOther)
+    : mPipe(aOther.mPipe)
+    , mLogicalOffset(aOther.mLogicalOffset)
+    , mInputStatus(aOther.mInputStatus)
+    , mBlocking(aOther.mBlocking)
+    , mBlocked(false)
+    , mAvailable(aOther.mAvailable)
+    , mCallbackFlags(0)
+    , mReadState(aOther.mReadState)
+  { }
+
   nsresult Fill();
   void SetNonBlocking(bool aNonBlocking)
   {
     mBlocking = !aNonBlocking;
   }
 
   uint32_t Available()
   {
     return mAvailable;
   }
-  void ReduceAvailable(uint32_t aAvail)
-  {
-    mAvailable -= aAvail;
-  }
 
   // synchronously wait for the pipe to become readable.
   nsresult Wait();
 
   // these functions return true to indicate that the pipe's monitor should
   // be notified, to wake up a blocked reader if any.
   bool OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&);
   bool OnInputException(nsresult, nsPipeEvents&);
 
-private:
-  nsPipe*                        mPipe;
+  nsPipeReadState& ReadState()
+  {
+    return mReadState;
+  }
+
+  const nsPipeReadState& ReadState() const
+  {
+    return mReadState;
+  }
 
-  // separate refcnt so that we know when to close the consumer
-  mozilla::ThreadSafeAutoRefCnt  mReaderRefCnt;
+  nsresult Status() const;
+
+private:
+  virtual ~nsPipeInputStream();
+
+  nsRefPtr<nsPipe>               mPipe;
+
   int64_t                        mLogicalOffset;
+  // Individual input streams can be closed without effecting the rest of the
+  // pipe.  So track individual input stream status separately.
+  nsresult                       mInputStatus;
   bool                           mBlocking;
 
   // these variables can only be accessed while inside the pipe's monitor
   bool                           mBlocked;
   uint32_t                       mAvailable;
   nsCOMPtr<nsIInputStreamCallback> mCallback;
   uint32_t                       mCallbackFlags;
+
+  // treat as an opaque token to pass to nsPipe
+  nsPipeReadState                mReadState;
 };
 
 //-----------------------------------------------------------------------------
 
 // the output end of a pipe (allocated as a member of the pipe).
 class nsPipeOutputStream
   : public nsIAsyncOutputStream
   , public nsIClassInfo
@@ -235,62 +276,85 @@ public:
 private:
   ~nsPipe();
 
 public:
   //
   // methods below may only be called while inside the pipe's monitor
   //
 
-  void PeekSegment(uint32_t aIndex, char*& aCursor, char*& aLimit);
+  void PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
+                   char*& aCursor, char*& aLimit);
 
   //
   // methods below may be called while outside the pipe's monitor
   //
 
-  nsresult GetReadSegment(const char*& aSegment, uint32_t& aSegmentLen);
-  void     AdvanceReadCursor(uint32_t aCount);
+  nsresult GetReadSegment(const nsPipeReadState& aReadState,
+                          const char*& aSegment, uint32_t& aSegmentLen);
+  void     AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aCount,
+                             uint32_t* aAvailableOut);
 
   nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen);
   void     AdvanceWriteCursor(uint32_t aCount);
 
+  void     OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason);
   void     OnPipeException(nsresult aReason, bool aOutputOnly = false);
 
+  nsresult CloneInputStream(nsPipeInputStream* aOriginal,
+                            nsIInputStream** aCloneOut);
+
 protected:
+  // Only callable with mReetrantMonitor locked
+  uint32_t CountSegmentReferences(int32_t aSegment);
+  void SetAllNullReadCursors();
+  bool AllReadCursorsMatchWriteCursor();
+  void RollBackAllReadCursors(char* aWriteCursor);
+  void UpdateAllReadCursors(char* aWriteCursor);
+  void ValidateAllReadCursors();
+
   // We can't inherit from both nsIInputStream and nsIOutputStream
   // because they collide on their Close method. Consequently we nest their
   // implementations to avoid the extra object allocation.
-  nsPipeInputStream   mInput;
   nsPipeOutputStream  mOutput;
 
+  // Since the input stream can be cloned, we may have more than one.  Use
+  // a weak reference as the streams will clear their entry here in their
+  // destructor.  Using a strong reference would create a reference cycle.
+  // Only usable while mReentrantMonitor is locked.
+  nsTArray<nsPipeInputStream*> mInputList;
+
+  // But hold a strong ref to our original input stream.  For backward
+  // compatibility we need to be able to consistently return this same
+  // object from GetInputStream().  Note, mOriginalInput is also stored
+  // in mInputList as a weak ref.
+  nsRefPtr<nsPipeInputStream> mOriginalInput;
+
   ReentrantMonitor    mReentrantMonitor;
   nsSegmentedBuffer   mBuffer;
 
-  char*               mReadCursor;
-  char*               mReadLimit;
-
   int32_t             mWriteSegment;
   char*               mWriteCursor;
   char*               mWriteLimit;
 
   nsresult            mStatus;
   bool                mInited;
 };
 
 //
 // NOTES on buffer architecture:
 //
 //       +-----------------+ - - mBuffer.GetSegment(0)
 //       |                 |
-//       + - - - - - - - - + - - mReadCursor
+//       + - - - - - - - - + - - nsPipeReadState.mReadCursor
 //       |/////////////////|
 //       |/////////////////|
 //       |/////////////////|
 //       |/////////////////|
-//       +-----------------+ - - mReadLimit
+//       +-----------------+ - - nsPipeReadState.mReadLimit
 //                |
 //       +-----------------+
 //       |/////////////////|
 //       |/////////////////|
 //       |/////////////////|
 //       |/////////////////|
 //       |/////////////////|
 //       |/////////////////|
@@ -302,46 +366,68 @@ protected:
 //       |/////////////////|
 //       + - - - - - - - - + - - mWriteCursor
 //       |                 |
 //       |                 |
 //       +-----------------+ - - mWriteLimit
 //
 // (shaded region contains data)
 //
+// NOTE: Each input stream produced by the nsPipe contains its own, separate
+//       nsPipeReadState.  This means there are multiple mReadCursor and
+//       mReadLimit values in play.  The pipe cannot discard old data until
+//       all mReadCursors have moved beyond that point in the stream.
+//
 // NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
 // small allocations (e.g., 64 byte allocations).  this means that buffers may
 // be allocated back-to-back.  in the diagram above, for example, mReadLimit
 // would actually be pointing at the beginning of the next segment.  when
 // making changes to this file, please keep this fact in mind.
 //
 
 //-----------------------------------------------------------------------------
 // nsPipe methods:
 //-----------------------------------------------------------------------------
 
 nsPipe::nsPipe()
-  : mInput(this)
-  , mOutput(this)
+  : mOutput(this)
+  , mOriginalInput(new nsPipeInputStream(this))
   , mReentrantMonitor("nsPipe.mReentrantMonitor")
-  , mReadCursor(nullptr)
-  , mReadLimit(nullptr)
   , mWriteSegment(-1)
   , mWriteCursor(nullptr)
   , mWriteLimit(nullptr)
   , mStatus(NS_OK)
   , mInited(false)
 {
+  mInputList.AppendElement(mOriginalInput);
 }
 
 nsPipe::~nsPipe()
 {
 }
 
-NS_IMPL_ISUPPORTS(nsPipe, nsIPipe)
+NS_IMPL_ADDREF(nsPipe)
+NS_IMPL_QUERY_INTERFACE(nsPipe, nsIPipe)
+
+NS_IMETHODIMP_(MozExternalRefCountType)
+nsPipe::Release()
+{
+  MOZ_ASSERT(int32_t(mRefCnt) > 0, "dup release");
+  nsrefcnt count = --mRefCnt;
+  NS_LOG_RELEASE(this, count, "nsPipe");
+  if (count == 0) {
+    delete (this);
+    return 0;
+  }
+  if (mOriginalInput && count == 1) {
+    mOriginalInput = nullptr;
+    return 1;
+  }
+  return count;
+}
 
 NS_IMETHODIMP
 nsPipe::Init(bool aNonBlockingIn,
              bool aNonBlockingOut,
              uint32_t aSegmentSize,
              uint32_t aSegmentCount)
 {
   mInited = true;
@@ -359,122 +445,148 @@ nsPipe::Init(bool aNonBlockingIn,
     aSegmentCount = maxCount;
   }
 
   nsresult rv = mBuffer.Init(aSegmentSize, aSegmentSize * aSegmentCount);
   if (NS_FAILED(rv)) {
     return rv;
   }
 
-  mInput.SetNonBlocking(aNonBlockingIn);
   mOutput.SetNonBlocking(aNonBlockingOut);
+  mOriginalInput->SetNonBlocking(aNonBlockingIn);
+
   return NS_OK;
 }
 
 NS_IMETHODIMP
 nsPipe::GetInputStream(nsIAsyncInputStream** aInputStream)
 {
-  NS_ADDREF(*aInputStream = &mInput);
+  nsRefPtr<nsPipeInputStream> ref = mOriginalInput;
+  ref.forget(aInputStream);
   return NS_OK;
 }
 
 NS_IMETHODIMP
 nsPipe::GetOutputStream(nsIAsyncOutputStream** aOutputStream)
 {
   if (NS_WARN_IF(!mInited)) {
     return NS_ERROR_NOT_INITIALIZED;
   }
   NS_ADDREF(*aOutputStream = &mOutput);
   return NS_OK;
 }
 
 void
-nsPipe::PeekSegment(uint32_t aIndex, char*& aCursor, char*& aLimit)
+nsPipe::PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
+                    char*& aCursor, char*& aLimit)
 {
   if (aIndex == 0) {
-    NS_ASSERTION(!mReadCursor || mBuffer.GetSegmentCount(), "unexpected state");
-    aCursor = mReadCursor;
-    aLimit = mReadLimit;
+    NS_ASSERTION(!aReadState.mReadCursor || mBuffer.GetSegmentCount(),
+                 "unexpected state");
+    aCursor = aReadState.mReadCursor;
+    aLimit = aReadState.mReadLimit;
   } else {
+    uint32_t absoluteIndex = aReadState.mSegment + aIndex;
     uint32_t numSegments = mBuffer.GetSegmentCount();
-    if (aIndex >= numSegments) {
+    if (absoluteIndex >= numSegments) {
       aCursor = aLimit = nullptr;
     } else {
-      aCursor = mBuffer.GetSegment(aIndex);
-      if (mWriteSegment == (int32_t)aIndex) {
+      aCursor = mBuffer.GetSegment(absoluteIndex);
+      if (mWriteSegment == (int32_t)absoluteIndex) {
         aLimit = mWriteCursor;
       } else {
         aLimit = aCursor + mBuffer.GetSegmentSize();
       }
     }
   }
 }
 
 nsresult
-nsPipe::GetReadSegment(const char*& aSegment, uint32_t& aSegmentLen)
+nsPipe::GetReadSegment(const nsPipeReadState& aReadState, const char*& aSegment,
+                       uint32_t& aSegmentLen)
 {
   ReentrantMonitorAutoEnter mon(mReentrantMonitor);
 
-  if (mReadCursor == mReadLimit) {
+  if (aReadState.mReadCursor == aReadState.mReadLimit) {
     return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK;
   }
 
-  aSegment    = mReadCursor;
-  aSegmentLen = mReadLimit - mReadCursor;
+  aSegment    = aReadState.mReadCursor;
+  aSegmentLen = aReadState.mReadLimit - aReadState.mReadCursor;
   return NS_OK;
 }
 
 void
-nsPipe::AdvanceReadCursor(uint32_t aBytesRead)
+nsPipe::AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aBytesRead,
+                          uint32_t* aAvailableOut)
 {
   NS_ASSERTION(aBytesRead, "don't call if no bytes read");
 
   nsPipeEvents events;
   {
     ReentrantMonitorAutoEnter mon(mReentrantMonitor);
 
     LOG(("III advancing read cursor by %u\n", aBytesRead));
     NS_ASSERTION(aBytesRead <= mBuffer.GetSegmentSize(), "read too much");
 
-    mReadCursor += aBytesRead;
-    NS_ASSERTION(mReadCursor <= mReadLimit, "read cursor exceeds limit");
+    aReadState.mReadCursor += aBytesRead;
+    NS_ASSERTION(aReadState.mReadCursor <= aReadState.mReadLimit,
+                 "read cursor exceeds limit");
 
-    mInput.ReduceAvailable(aBytesRead);
+    MOZ_ASSERT(*aAvailableOut >= aBytesRead);
+    *aAvailableOut -= aBytesRead;
 
-    if (mReadCursor == mReadLimit) {
+    if (aReadState.mReadCursor == aReadState.mReadLimit) {
       // we've reached the limit of how much we can read from this segment.
       // if at the end of this segment, then we must discard this segment.
 
       // if still writing in this segment then bail because we're not done
       // with the segment and have to wait for now...
-      if (mWriteSegment == 0 && mWriteLimit > mWriteCursor) {
-        NS_ASSERTION(mReadLimit == mWriteCursor, "unexpected state");
+      if (mWriteSegment == aReadState.mSegment && mWriteLimit > mWriteCursor) {
+        NS_ASSERTION(aReadState.mReadLimit == mWriteCursor, "unexpected state");
         return;
       }
 
-      // shift write segment index (-1 indicates an empty buffer).
-      --mWriteSegment;
+      uint32_t currentSegment = aReadState.mSegment;
+
+      // Move to the next segment to read
+      aReadState.mSegment += 1;
 
-      // done with this segment
-      mBuffer.DeleteFirstSegment();
-      LOG(("III deleting first segment\n"));
+      // If this was the last reference to the first segment, then remove it.
+      if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) {
+
+        // shift write and read segment index (-1 indicates an empty buffer).
+        mWriteSegment -= 1;
+
+        for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+          mInputList[i]->ReadState().mSegment -= 1;
+        }
 
-      if (mWriteSegment == -1) {
-        // buffer is completely empty
-        mReadCursor = nullptr;
-        mReadLimit = nullptr;
-        mWriteCursor = nullptr;
-        mWriteLimit = nullptr;
+        // done with this segment
+        mBuffer.DeleteFirstSegment();
+        LOG(("III deleting first segment\n"));
+      }
+
+      if (mWriteSegment < aReadState.mSegment) {
+        // read cursor has hit the end of written data, so reset it
+        MOZ_ASSERT(mWriteSegment == (aReadState.mSegment - 1));
+        aReadState.mReadCursor = nullptr;
+        aReadState.mReadLimit = nullptr;
+        // also, the buffer is completely empty, so reset the write cursor
+        if (mWriteSegment == -1) {
+          mWriteCursor = nullptr;
+          mWriteLimit = nullptr;
+        }
       } else {
         // advance read cursor and limit to next buffer segment
-        mReadCursor = mBuffer.GetSegment(0);
-        if (mWriteSegment == 0) {
-          mReadLimit = mWriteCursor;
+        aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment);
+        if (mWriteSegment == aReadState.mSegment) {
+          aReadState.mReadLimit = mWriteCursor;
         } else {
-          mReadLimit = mReadCursor + mBuffer.GetSegmentSize();
+          aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize();
         }
       }
 
       // we've free'd up a segment, so notify output stream that pipe has
       // room for a new segment.
       if (mOutput.OnOutputWritable(events)) {
         mon.Notify();
       }
@@ -500,27 +612,25 @@ nsPipe::GetWriteSegment(char*& aSegment,
     }
     LOG(("OOO appended new segment\n"));
     mWriteCursor = seg;
     mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize();
     ++mWriteSegment;
   }
 
   // make sure read cursor is initialized
-  if (!mReadCursor) {
-    NS_ASSERTION(mWriteSegment == 0, "unexpected null read cursor");
-    mReadCursor = mReadLimit = mWriteCursor;
-  }
+  SetAllNullReadCursors();
 
   // check to see if we can roll-back our read and write cursors to the
   // beginning of the current/first segment.  this is purely an optimization.
-  if (mReadCursor == mWriteCursor && mWriteSegment == 0) {
+  if (mWriteSegment == 0 && AllReadCursorsMatchWriteCursor()) {
     char* head = mBuffer.GetSegment(0);
     LOG(("OOO rolling back write cursor %u bytes\n", mWriteCursor - head));
-    mWriteCursor = mReadCursor = mReadLimit = head;
+    RollBackAllReadCursors(head);
+    mWriteCursor = head;
   }
 
   aSegment    = mWriteCursor;
   aSegmentLen = mWriteLimit - mWriteCursor;
   return NS_OK;
 }
 
 void
@@ -533,56 +643,82 @@ nsPipe::AdvanceWriteCursor(uint32_t aByt
     ReentrantMonitorAutoEnter mon(mReentrantMonitor);
 
     LOG(("OOO advancing write cursor by %u\n", aBytesWritten));
 
     char* newWriteCursor = mWriteCursor + aBytesWritten;
     NS_ASSERTION(newWriteCursor <= mWriteLimit, "write cursor exceeds limit");
 
     // update read limit if reading in the same segment
-    if (mWriteSegment == 0 && mReadLimit == mWriteCursor) {
-      mReadLimit = newWriteCursor;
-    }
+    UpdateAllReadCursors(newWriteCursor);
 
     mWriteCursor = newWriteCursor;
 
-    // The only way mReadCursor == mWriteCursor is if:
-    //
-    // - mReadCursor is at the start of a segment (which, based on how
-    //   nsSegmentedBuffer works, means that this segment is the "first"
-    //   segment)
-    // - mWriteCursor points at the location past the end of the current
-    //   write segment (so the current write filled the current write
-    //   segment, so we've incremented mWriteCursor to point past the end
-    //   of it)
-    // - the segment to which data has just been written is located
-    //   exactly one segment's worth of bytes before the first segment
-    //   where mReadCursor is located
-    //
-    // Consequently, the byte immediately after the end of the current
-    // write segment is the first byte of the first segment, so
-    // mReadCursor == mWriteCursor.  (Another way to think about this is
-    // to consider the buffer architecture diagram above, but consider it
-    // with an arena allocator which allocates from the *end* of the
-    // arena to the *beginning* of the arena.)
-    NS_ASSERTION(mReadCursor != mWriteCursor ||
-                 (mBuffer.GetSegment(0) == mReadCursor &&
-                  mWriteCursor == mWriteLimit),
-                 "read cursor is bad");
+    ValidateAllReadCursors();
 
     // update the writable flag on the output stream
     if (mWriteCursor == mWriteLimit) {
       if (mBuffer.GetSize() >= mBuffer.GetMaxSize()) {
         mOutput.SetWritable(false);
       }
     }
 
     // notify input stream that pipe now contains additional data
-    if (mInput.OnInputReadable(aBytesWritten, events)) {
-      mon.Notify();
+    bool needNotify = false;
+    for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+      if (mInputList[i]->OnInputReadable(aBytesWritten, events)) {
+        needNotify = true;
+      }
+    }
+
+    if (needNotify) {
+      mon.NotifyAll();
+    }
+  }
+}
+
+void
+nsPipe::OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason)
+{
+  MOZ_ASSERT(NS_FAILED(aReason));
+
+  nsPipeEvents events;
+  {
+    ReentrantMonitorAutoEnter mon(mReentrantMonitor);
+
+    // Its possible to re-enter this method when we call OnPipeException() or
+    // OnInputExection() below.  If there is a caller stuck in our synchronous
+    // Wait() method, then they will get woken up with a failure code which
+    // re-enters this method.  Therefore, gracefully handle unknown streams
+    // here.
+
+    // If we only have one stream open and it is the given stream, then shut
+    // down the entire pipe.
+    if (mInputList.Length() == 1) {
+      if (mInputList[0] == aStream) {
+        OnPipeException(aReason);
+      }
+      return;
+    }
+
+    // Otherwise just close the particular stream that hit an exception.
+    for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+      if (mInputList[i] != aStream) {
+        continue;
+      }
+
+      bool needNotify = mInputList[i]->OnInputException(aReason, events);
+      mInputList.RemoveElementAt(i);
+
+      // Notify after element is removed in case we re-enter as a result.
+      if (needNotify) {
+        mon.Notify();
+      }
+
+      return;
     }
   }
 }
 
 void
 nsPipe::OnPipeException(nsresult aReason, bool aOutputOnly)
 {
   LOG(("PPP nsPipe::OnPipeException [reason=%x output-only=%d]\n",
@@ -594,31 +730,155 @@ nsPipe::OnPipeException(nsresult aReason
 
     // if we've already hit an exception, then ignore this one.
     if (NS_FAILED(mStatus)) {
       return;
     }
 
     mStatus = aReason;
 
-    // an output-only exception applies to the input end if the pipe has
-    // zero bytes available.
-    if (aOutputOnly && !mInput.Available()) {
-      aOutputOnly = false;
+    bool needNotify = false;
+
+    nsTArray<nsPipeInputStream*> tmpInputList;
+    for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+      // an output-only exception applies to the input end if the pipe has
+      // zero bytes available.
+      if (aOutputOnly && mInputList[i]->Available()) {
+        tmpInputList.AppendElement(mInputList[i]);
+        continue;
+      }
+
+      if (mInputList[i]->OnInputException(aReason, events)) {
+        needNotify = true;
+      }
+    }
+    mInputList = tmpInputList;
+
+    if (mOutput.OnOutputException(aReason, events)) {
+      needNotify = true;
     }
 
-    if (!aOutputOnly)
-      if (mInput.OnInputException(aReason, events)) {
-        mon.Notify();
-      }
+    // Notify after we have removed any input streams from mInputList
+    if (needNotify) {
+      mon.NotifyAll();
+    }
+  }
+}
+
+nsresult
+nsPipe::CloneInputStream(nsPipeInputStream* aOriginal,
+                         nsIInputStream** aCloneOut)
+{
+  ReentrantMonitorAutoEnter mon(mReentrantMonitor);
+  nsRefPtr<nsPipeInputStream> ref = new nsPipeInputStream(*aOriginal);
+  mInputList.AppendElement(ref);
+  ref.forget(aCloneOut);
+  return NS_OK;
+}
 
-    if (mOutput.OnOutputException(aReason, events)) {
-      mon.Notify();
+uint32_t
+nsPipe::CountSegmentReferences(int32_t aSegment)
+{
+  mReentrantMonitor.AssertCurrentThreadIn();
+  uint32_t count = 0;
+  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+    if (aSegment >= mInputList[i]->ReadState().mSegment) {
+      count += 1;
+    }
+  }
+  return count;
+}
+
+void
+nsPipe::SetAllNullReadCursors()
+{
+  mReentrantMonitor.AssertCurrentThreadIn();
+  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+    nsPipeReadState& readState = mInputList[i]->ReadState();
+    if (!readState.mReadCursor) {
+      NS_ASSERTION(mWriteSegment == readState.mSegment,
+                   "unexpected null read cursor");
+      readState.mReadCursor = readState.mReadLimit = mWriteCursor;
+    }
+  }
+}
+
+bool
+nsPipe::AllReadCursorsMatchWriteCursor()
+{
+  mReentrantMonitor.AssertCurrentThreadIn();
+  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+    const nsPipeReadState& readState = mInputList[i]->ReadState();
+    if (readState.mSegment != mWriteSegment ||
+        readState.mReadCursor != mWriteCursor) {
+      return false;
     }
   }
+  return true;
+}
+
+void
+nsPipe::RollBackAllReadCursors(char* aWriteCursor)
+{
+  mReentrantMonitor.AssertCurrentThreadIn();
+  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+    nsPipeReadState& readState = mInputList[i]->ReadState();
+    MOZ_ASSERT(mWriteSegment == readState.mSegment);
+    MOZ_ASSERT(mWriteCursor == readState.mReadCursor);
+    MOZ_ASSERT(mWriteCursor == readState.mReadLimit);
+    readState.mReadCursor = aWriteCursor;
+    readState.mReadLimit = aWriteCursor;
+  }
+}
+
+void
+nsPipe::UpdateAllReadCursors(char* aWriteCursor)
+{
+  mReentrantMonitor.AssertCurrentThreadIn();
+  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+    nsPipeReadState& readState = mInputList[i]->ReadState();
+    if (mWriteSegment == readState.mSegment &&
+        readState.mReadLimit == mWriteCursor) {
+      readState.mReadLimit = aWriteCursor;
+    }
+  }
+}
+
+void
+nsPipe::ValidateAllReadCursors()
+{
+  mReentrantMonitor.AssertCurrentThreadIn();
+  // The only way mReadCursor == mWriteCursor is if:
+  //
+  // - mReadCursor is at the start of a segment (which, based on how
+  //   nsSegmentedBuffer works, means that this segment is the "first"
+  //   segment)
+  // - mWriteCursor points at the location past the end of the current
+  //   write segment (so the current write filled the current write
+  //   segment, so we've incremented mWriteCursor to point past the end
+  //   of it)
+  // - the segment to which data has just been written is located
+  //   exactly one segment's worth of bytes before the first segment
+  //   where mReadCursor is located
+  //
+  // Consequently, the byte immediately after the end of the current
+  // write segment is the first byte of the first segment, so
+  // mReadCursor == mWriteCursor.  (Another way to think about this is
+  // to consider the buffer architecture diagram above, but consider it
+  // with an arena allocator which allocates from the *end* of the
+  // arena to the *beginning* of the arena.)
+#ifdef DEBUG
+  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+    const nsPipeReadState& state = mInputList[i]->ReadState();
+    NS_ASSERTION(state.mReadCursor != mWriteCursor ||
+                 (mBuffer.GetSegment(state.mSegment) == state.mReadCursor &&
+                  mWriteCursor == mWriteLimit),
+                 "read cursor is bad");
+  }
+#endif
 }
 
 //-----------------------------------------------------------------------------
 // nsPipeEvents methods:
 //-----------------------------------------------------------------------------
 
 nsPipeEvents::~nsPipeEvents()
 {
@@ -635,50 +895,55 @@ nsPipeEvents::~nsPipeEvents()
     mOutputStream = 0;
   }
 }
 
 //-----------------------------------------------------------------------------
 // nsPipeInputStream methods:
 //-----------------------------------------------------------------------------
 
+NS_IMPL_ADDREF(nsPipeInputStream);
+NS_IMPL_RELEASE(nsPipeInputStream);
+
 NS_IMPL_QUERY_INTERFACE(nsPipeInputStream,
                         nsIInputStream,
                         nsIAsyncInputStream,
                         nsISeekableStream,
                         nsISearchableInputStream,
+                        nsICloneableInputStream,
                         nsIClassInfo)
 
 NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream,
                             nsIInputStream,
                             nsIAsyncInputStream,
                             nsISeekableStream,
-                            nsISearchableInputStream)
+                            nsISearchableInputStream,
+                            nsICloneableInputStream)
 
 NS_IMPL_THREADSAFE_CI(nsPipeInputStream)
 
 nsresult
 nsPipeInputStream::Wait()
 {
   NS_ASSERTION(mBlocking, "wait on non-blocking pipe input stream");
 
   ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
 
-  while (NS_SUCCEEDED(mPipe->mStatus) && (mAvailable == 0)) {
+  while (NS_SUCCEEDED(Status()) && (mAvailable == 0)) {
     LOG(("III pipe input: waiting for data\n"));
 
     mBlocked = true;
     mon.Wait();
     mBlocked = false;
 
-    LOG(("III pipe input: woke up [pipe-status=%x available=%u]\n",
-         mPipe->mStatus, mAvailable));
+    LOG(("III pipe input: woke up [status=%x available=%u]\n",
+         Status(), mAvailable));
   }
 
-  return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
+  return Status() == NS_BASE_STREAM_CLOSED ? NS_OK : Status();
 }
 
 bool
 nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten, nsPipeEvents& aEvents)
 {
   bool result = false;
 
   mAvailable += aBytesWritten;
@@ -699,74 +964,66 @@ nsPipeInputStream::OnInputException(nsre
 {
   LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n",
        this, aReason));
 
   bool result = false;
 
   NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception");
 
+  if (NS_SUCCEEDED(mInputStatus)) {
+    mInputStatus = aReason;
+  }
+
   // force count of available bytes to zero.
   mAvailable = 0;
 
   if (mCallback) {
     aEvents.NotifyInputReady(this, mCallback);
     mCallback = 0;
     mCallbackFlags = 0;
   } else if (mBlocked) {
     result = true;
   }
 
   return result;
 }
 
-NS_IMETHODIMP_(MozExternalRefCountType)
-nsPipeInputStream::AddRef(void)
-{
-  ++mReaderRefCnt;
-  return mPipe->AddRef();
-}
-
-NS_IMETHODIMP_(MozExternalRefCountType)
-nsPipeInputStream::Release(void)
-{
-  if (--mReaderRefCnt == 0) {
-    Close();
-  }
-  return mPipe->Release();
-}
-
 NS_IMETHODIMP
 nsPipeInputStream::CloseWithStatus(nsresult aReason)
 {
   LOG(("III CloseWithStatus [this=%x reason=%x]\n", this, aReason));
 
+  if (NS_FAILED(mInputStatus)) {
+    return NS_OK;
+  }
+
   if (NS_SUCCEEDED(aReason)) {
     aReason = NS_BASE_STREAM_CLOSED;
   }
 
-  mPipe->OnPipeException(aReason);
+  mPipe->OnInputStreamException(this, aReason);
   return NS_OK;
 }
 
 NS_IMETHODIMP
 nsPipeInputStream::Close()
 {
   return CloseWithStatus(NS_BASE_STREAM_CLOSED);
 }
 
 NS_IMETHODIMP
 nsPipeInputStream::Available(uint64_t* aResult)
 {
   // nsPipeInputStream supports under 4GB stream only
   ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
 
-  // return error if pipe closed
-  if (!mAvailable && NS_FAILED(mPipe->mStatus)) {
-    return mPipe->mStatus;
+  // return error if closed
+  if (!mAvailable && NS_FAILED(Status())) {
+    return Status();
   }
 
   *aResult = (uint64_t)mAvailable;
   return NS_OK;
 }
 
 NS_IMETHODIMP
 nsPipeInputStream::ReadSegments(nsWriteSegmentFun aWriter,
@@ -778,17 +1035,17 @@ nsPipeInputStream::ReadSegments(nsWriteS
 
   nsresult rv = NS_OK;
 
   const char* segment;
   uint32_t segmentLen;
 
   *aReadCount = 0;
   while (aCount) {
-    rv = mPipe->GetReadSegment(segment, segmentLen);
+    rv = mPipe->GetReadSegment(mReadState, segment, segmentLen);
     if (NS_FAILED(rv)) {
       // ignore this error if we've already read something.
       if (*aReadCount > 0) {
         rv = NS_OK;
         break;
       }
       if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
         // pipe is empty
@@ -801,17 +1058,17 @@ nsPipeInputStream::ReadSegments(nsWriteS
           continue;
         }
       }
       // ignore this error, just return.
       if (rv == NS_BASE_STREAM_CLOSED) {
         rv = NS_OK;
         break;
       }
-      mPipe->OnPipeException(rv);
+      mPipe->OnInputStreamException(this, rv);
       break;
     }
 
     // read no more than aCount
     if (segmentLen > aCount) {
       segmentLen = aCount;
     }
 
@@ -833,17 +1090,17 @@ nsPipeInputStream::ReadSegments(nsWriteS
       segment += writeCount;
       segmentLen -= writeCount;
       aCount -= writeCount;
       *aReadCount += writeCount;
       mLogicalOffset += writeCount;
     }
 
     if (segmentLen < originalLen) {
-      mPipe->AdvanceReadCursor(originalLen - segmentLen);
+      mPipe->AdvanceReadCursor(mReadState, originalLen - segmentLen, &mAvailable);
     }
   }
 
   return rv;
 }
 
 NS_IMETHODIMP
 nsPipeInputStream::Read(char* aToBuf, uint32_t aBufLen, uint32_t* aReadCount)
@@ -879,18 +1136,17 @@ nsPipeInputStream::AsyncWait(nsIInputStr
     }
 
     nsCOMPtr<nsIInputStreamCallback> proxy;
     if (aTarget) {
       proxy = NS_NewInputStreamReadyEvent(aCallback, aTarget);
       aCallback = proxy;
     }
 
-    if (NS_FAILED(mPipe->mStatus) ||
-        (mAvailable && !(aFlags & WAIT_CLOSURE_ONLY))) {
+    if (NS_FAILED(Status()) || (mAvailable && !(aFlags & WAIT_CLOSURE_ONLY))) {
       // stream is already closed or readable; post event.
       pipeEvents.NotifyInputReady(this, aCallback);
     } else {
       // queue up callback object to be notified when data becomes available
       mCallback = aCallback;
       mCallbackFlags = aFlags;
     }
   }
@@ -904,19 +1160,19 @@ nsPipeInputStream::Seek(int32_t aWhence,
   return NS_ERROR_NOT_IMPLEMENTED;
 }
 
 NS_IMETHODIMP
 nsPipeInputStream::Tell(int64_t* aOffset)
 {
   ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
 
-  // return error if pipe closed
-  if (!mAvailable && NS_FAILED(mPipe->mStatus)) {
-    return mPipe->mStatus;
+  // return error if closed
+  if (!mAvailable && NS_FAILED(Status())) {
+    return Status();
   }
 
   *aOffset = mLogicalOffset;
   return NS_OK;
 }
 
 NS_IMETHODIMP
 nsPipeInputStream::SetEOF()
@@ -940,17 +1196,17 @@ nsPipeInputStream::Search(const char* aF
 
   ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
 
   char* cursor1;
   char* limit1;
   uint32_t index = 0, offset = 0;
   uint32_t strLen = strlen(aForString);
 
-  mPipe->PeekSegment(0, cursor1, limit1);
+  mPipe->PeekSegment(mReadState, 0, cursor1, limit1);
   if (cursor1 == limit1) {
     *aFound = false;
     *aOffsetSearchedTo = 0;
     LOG(("  result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
     return NS_OK;
   }
 
   while (true) {
@@ -969,17 +1225,17 @@ nsPipeInputStream::Search(const char* aF
     // get the next segment
     char* cursor2;
     char* limit2;
     uint32_t len2;
 
     index++;
     offset += len1;
 
-    mPipe->PeekSegment(index, cursor2, limit2);
+    mPipe->PeekSegment(mReadState, index, cursor2, limit2);
     if (cursor2 == limit2) {
       *aFound = false;
       *aOffsetSearchedTo = offset - strLen + 1;
       LOG(("  result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
       return NS_OK;
     }
     len2 = limit2 - cursor2;
 
@@ -1003,16 +1259,40 @@ nsPipeInputStream::Search(const char* aF
     cursor1 = cursor2;
     limit1 = limit2;
   }
 
   NS_NOTREACHED("can't get here");
   return NS_ERROR_UNEXPECTED;    // keep compiler happy
 }
 
+NS_IMETHODIMP
+nsPipeInputStream::GetCloneable(bool* aCloneableOut)
+{
+  *aCloneableOut = true;
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+nsPipeInputStream::Clone(nsIInputStream** aCloneOut)
+{
+  return mPipe->CloneInputStream(this, aCloneOut);
+}
+
+nsresult
+nsPipeInputStream::Status() const
+{
+  return NS_FAILED(mInputStatus) ? mInputStatus : mPipe->mStatus;
+}
+
+nsPipeInputStream::~nsPipeInputStream()
+{
+  Close();
+}
+
 //-----------------------------------------------------------------------------
 // nsPipeOutputStream methods:
 //-----------------------------------------------------------------------------
 
 NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream,
                         nsIOutputStream,
                         nsIAsyncOutputStream,
                         nsIClassInfo)
--- a/xpcom/tests/gtest/TestPipes.cpp
+++ b/xpcom/tests/gtest/TestPipes.cpp
@@ -1,27 +1,33 @@
 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include <algorithm>
+#include "gtest/gtest.h"
+#include "Helpers.h"
+#include "mozilla/ReentrantMonitor.h"
+#include "nsCOMPtr.h"
+#include "nsCRT.h"
 #include "nsIAsyncInputStream.h"
 #include "nsIAsyncOutputStream.h"
+#include "nsICloneableInputStream.h"
+#include "nsIInputStream.h"
+#include "nsIOutputStream.h"
+#include "nsIPipe.h"
 #include "nsIThread.h"
 #include "nsIRunnable.h"
+#include "nsStreamUtils.h"
+#include "nsString.h"
 #include "nsThreadUtils.h"
 #include "prprf.h"
 #include "prinrval.h"
-#include "nsCRT.h"
-#include "nsIPipe.h"    // new implementation
 
-#include "mozilla/ReentrantMonitor.h"
-
-#include "gtest/gtest.h"
 using namespace mozilla;
 
 #define ITERATIONS      33333
 char kTestPattern[] = "My hovercraft is full of eels.\n";
 
 bool gTrace = false;
 
 static nsresult
@@ -384,8 +390,273 @@ RunTests(uint32_t segSize, uint32_t segC
     EXPECT_TRUE(NS_SUCCEEDED(rv));
 }
 
 TEST(Pipes, Main)
 {
     RunTests(16, 1);
     RunTests(4096, 16);
 }
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace {
+
+static const uint32_t DEFAULT_SEGMENT_SIZE = 4 * 1024;
+
+// An alternate pipe testing routing that uses NS_ConsumeStream() instead of
+// manual read loop.
+static void TestPipe2(uint32_t aNumBytes,
+                      uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE)
+{
+  nsCOMPtr<nsIInputStream> reader;
+  nsCOMPtr<nsIOutputStream> writer;
+
+  uint32_t maxSize = std::max(aNumBytes, aSegmentSize);
+
+  nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
+                           aSegmentSize, maxSize);
+  ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+  nsTArray<char> inputData;
+  testing::CreateData(aNumBytes, inputData);
+  testing::WriteAllAndClose(writer, inputData);
+  testing::ConsumeAndValidateStream(reader, inputData);
+}
+
+} // anonymous namespace
+
+TEST(Pipes, Blocking_32k)
+{
+  TestPipe2(32 * 1024);
+}
+
+TEST(Pipes, Blocking_64k)
+{
+  TestPipe2(64 * 1024);
+}
+
+TEST(Pipes, Blocking_128k)
+{
+  TestPipe2(128 * 1024);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace {
+
+// Utility routine to validate pipe clone before.  There are many knobs.
+//
+// aTotalBytes              Total number of bytes to write to the pipe.
+// aNumWrites               How many separate write calls should be made.  Bytes
+//                          are evenly distributed over these write calls.
+// aNumInitialClones        How many clones of the pipe input stream should be
+//                          made before writing begins.
+// aNumToCloseAfterWrite    How many streams should be closed after each write.
+//                          One stream is always kept open.  This verifies that
+//                          closing one stream does not effect other open
+//                          streams.
+// aNumToCloneAfterWrite    How many clones to create after each write.  Occurs
+//                          after closing any streams.  This tests cloning
+//                          active streams on a pipe that is being written to.
+// aNumStreamToReadPerWrite How many streams to read fully after each write.
+//                          This tests reading cloned streams at different rates
+//                          while the pipe is being written to.
+static void TestPipeClone(uint32_t aTotalBytes,
+                          uint32_t aNumWrites,
+                          uint32_t aNumInitialClones,
+                          uint32_t aNumToCloseAfterWrite,
+                          uint32_t aNumToCloneAfterWrite,
+                          uint32_t aNumStreamsToReadPerWrite,
+                          uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE)
+{
+  nsCOMPtr<nsIInputStream> reader;
+  nsCOMPtr<nsIOutputStream> writer;
+
+  uint32_t maxSize = std::max(aTotalBytes, aSegmentSize);
+
+  // Use async input streams so we can NS_ConsumeStream() the current data
+  // while the pipe is still being written to.
+  nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
+                           aSegmentSize, maxSize,
+                           true, false); // non-blocking - reader, writer
+  ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+  nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(reader);
+  ASSERT_TRUE(cloneable);
+  ASSERT_TRUE(cloneable->GetCloneable());
+
+  nsTArray<nsCString> outputDataList;
+
+  nsTArray<nsCOMPtr<nsIInputStream>> streamList;
+
+  // first stream is our original reader from the pipe
+  streamList.AppendElement(reader);
+  outputDataList.AppendElement();
+
+  // Clone the initial input stream the specified number of times
+  // before performing any writes.
+  for (uint32_t i = 0; i < aNumInitialClones; ++i) {
+    nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
+    rv = cloneable->Clone(getter_AddRefs(*clone));
+    ASSERT_TRUE(NS_SUCCEEDED(rv));
+    ASSERT_TRUE(*clone);
+
+    outputDataList.AppendElement();
+  }
+
+  nsTArray<char> inputData;
+  testing::CreateData(aTotalBytes, inputData);
+
+  const uint32_t bytesPerWrite = ((aTotalBytes - 1)/ aNumWrites) + 1;
+  uint32_t offset = 0;
+  uint32_t remaining = aTotalBytes;
+  uint32_t nextStreamToRead = 0;
+
+  while (remaining) {
+    uint32_t numToWrite = std::min(bytesPerWrite, remaining);
+    testing::Write(writer, inputData, offset, numToWrite);
+    offset += numToWrite;
+    remaining -= numToWrite;
+
+    // Close the specified number of streams.  This allows us to
+    // test that one closed clone does not break other open clones.
+    for (uint32_t i = 0; i < aNumToCloseAfterWrite &&
+                         streamList.Length() > 1; ++i) {
+
+      uint32_t lastIndex = streamList.Length() - 1;
+      streamList[lastIndex]->Close();
+      streamList.RemoveElementAt(lastIndex);
+      outputDataList.RemoveElementAt(lastIndex);
+
+      if (nextStreamToRead >= streamList.Length()) {
+        nextStreamToRead = 0;
+      }
+    }
+
+    // Create the specified number of clones.  This lets us verify
+    // that we can create clones in the middle of pipe reading and
+    // writing.
+    for (uint32_t i = 0; i < aNumToCloneAfterWrite; ++i) {
+      nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
+      rv = cloneable->Clone(getter_AddRefs(*clone));
+      ASSERT_TRUE(NS_SUCCEEDED(rv));
+      ASSERT_TRUE(*clone);
+
+      // Initialize the new output data to make whats been read to data for
+      // the original stream.  First stream is always the original stream.
+      nsCString* outputData = outputDataList.AppendElement();
+      *outputData = outputDataList[0];
+    }
+
+    // Read the specified number of streams.  This lets us verify that we
+    // can read from the clones at different rates while the pipe is being
+    // written to.
+    for (uint32_t i = 0; i < aNumStreamsToReadPerWrite; ++i) {
+      nsCOMPtr<nsIInputStream>& stream = streamList[nextStreamToRead];
+      nsCString& outputData = outputDataList[nextStreamToRead];
+
+      // Can't use ConsumeAndValidateStream() here because we're not
+      // guaranteed the exact amount read.  It should just be at least
+      // as many as numToWrite.
+      nsAutoCString tmpOutputData;
+      rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
+      ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
+      ASSERT_GE(tmpOutputData.Length(), numToWrite);
+
+      outputData += tmpOutputData;
+
+      nextStreamToRead += 1;
+      if (nextStreamToRead >= streamList.Length()) {
+        // Note: When we wrap around on the streams being read, its possible
+        //       we will trigger a segment to be deleted from the pipe.  It
+        //       would be nice to validate this here, but we don't have any
+        //       QI'able interface that would let us check easily.
+
+        nextStreamToRead = 0;
+      }
+    }
+  }
+
+  rv = writer->Close();
+  ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+  nsDependentCSubstring inputString(inputData.Elements(), inputData.Length());
+
+  // Finally, read the remaining bytes from each stream.  This may be
+  // different amounts of data depending on how much reading we did while
+  // writing.  Verify that the end result matches the input data.
+  for (uint32_t i = 0; i < streamList.Length(); ++i) {
+    nsCOMPtr<nsIInputStream>& stream = streamList[i];
+    nsCString& outputData = outputDataList[i];
+
+    nsAutoCString tmpOutputData;
+    rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
+    ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
+    stream->Close();
+
+    // Append to total amount read from the stream
+    outputData += tmpOutputData;
+
+    ASSERT_EQ(inputString.Length(), outputData.Length());
+    ASSERT_TRUE(inputString.Equals(outputData));
+  }
+}
+
+} // anonymous namespace
+
+TEST(Pipes, Clone_BeforeWrite_ReadAtEnd)
+{
+  TestPipeClone(32 * 1024, // total bytes
+                16,        // num writes
+                3,         // num initial clones
+                0,         // num streams to close after each write
+                0,         // num clones to add after each write
+                0);        // num streams to read after each write
+}
+
+TEST(Pipes, Clone_BeforeWrite_ReadDuringWrite)
+{
+  // Since this reads all streams on every write, it should trigger the
+  // pipe cursor roll back optimization.  Currently we can only verify
+  // this with logging.
+
+  TestPipeClone(32 * 1024, // total bytes
+                16,        // num writes
+                3,         // num initial clones
+                0,         // num streams to close after each write
+                0,         // num clones to add after each write
+                4);        // num streams to read after each write
+}
+
+TEST(Pipes, Clone_DuringWrite_ReadAtEnd)
+{
+  TestPipeClone(32 * 1024, // total bytes
+                16,        // num writes
+                0,         // num initial clones
+                0,         // num streams to close after each write
+                1,         // num clones to add after each write
+                0);        // num streams to read after each write
+}
+
+TEST(Pipes, Clone_DuringWrite_ReadDuringWrite)
+{
+  TestPipeClone(32 * 1024, // total bytes
+                16,        // num writes
+                0,         // num initial clones
+                0,         // num streams to close after each write
+                1,         // num clones to add after each write
+                1);        // num streams to read after each write
+}
+
+TEST(Pipes, Clone_DuringWrite_ReadDuringWrite_CloseDuringWrite)
+{
+  // Since this reads streams faster than we clone new ones, it should
+  // trigger pipe segment deletion periodically.  Currently we can
+  // only verify this with logging.
+
+  TestPipeClone(32 * 1024, // total bytes
+                16,        // num writes
+                1,         // num initial clones
+                1,         // num streams to close after each write
+                2,         // num clones to add after each write
+                3);        // num streams to read after each write
+}