Backed out 2 changesets (bug 1133939) because of bug 1136453
authorEhsan Akhgari <ehsan@mozilla.com>
Wed, 25 Feb 2015 13:26:41 -0500
changeset 257402 0a882df01fc3dadfc369431670787fe6b998bf47
parent 257401 57f387aaa54b22623a1bd2f8f149febe51696b8b
child 257403 42d279a1e1f7c42db650ada48114e6194453c2ce
push id4610
push userjlund@mozilla.com
push dateMon, 30 Mar 2015 18:32:55 +0000
treeherdermozilla-beta@4df54044d9ef [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
bugs1133939, 1136453
milestone38.0a2
backs out212080d51fb7ad163eaf9da19315b42aa0c1af50
27de4b5539120f837604f3b1867b22b488ade6db
Backed out 2 changesets (bug 1133939) because of bug 1136453 Backed out changeset 212080d51fb7 (bug 1133939) Backed out changeset 27de4b553912 (bug 1133939)
xpcom/io/nsPipe3.cpp
xpcom/tests/gtest/Helpers.cpp
xpcom/tests/gtest/Helpers.h
xpcom/tests/gtest/TestPipes.cpp
--- a/xpcom/io/nsPipe3.cpp
+++ b/xpcom/io/nsPipe3.cpp
@@ -49,32 +49,16 @@ GetPipeLog()
 #define DEFAULT_SEGMENT_SIZE  4096
 #define DEFAULT_SEGMENT_COUNT 16
 
 class nsPipe;
 class nsPipeEvents;
 class nsPipeInputStream;
 class nsPipeOutputStream;
 
-namespace {
-
-enum MonitorAction
-{
-  DoNotNotifyMonitor,
-  NotifyMonitor
-};
-
-enum SegmentChangeResult
-{
-  SegmentNotChanged,
-  SegmentDeleted
-};
-
-} // anonymous namespace
-
 //-----------------------------------------------------------------------------
 
 // this class is used to delay notifications until the end of a particular
 // scope.  it helps avoid the complexity of issuing callbacks while inside
 // a critical section.
 class nsPipeEvents
 {
 public:
@@ -171,18 +155,20 @@ public:
   uint32_t Available()
   {
     return mAvailable;
   }
 
   // synchronously wait for the pipe to become readable.
   nsresult Wait();
 
-  MonitorAction OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&);
-  MonitorAction OnInputException(nsresult, nsPipeEvents&);
+  // these functions return true to indicate that the pipe's monitor should
+  // be notified, to wake up a blocked reader if any.
+  bool OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&);
+  bool OnInputException(nsresult, nsPipeEvents&);
 
   nsPipeReadState& ReadState()
   {
     return mReadState;
   }
 
   const nsPipeReadState& ReadState() const
   {
@@ -248,18 +234,20 @@ public:
   void SetWritable(bool aWritable)
   {
     mWritable = aWritable;
   }
 
   // synchronously wait for the pipe to become writable.
   nsresult Wait();
 
-  MonitorAction OnOutputWritable(nsPipeEvents&);
-  MonitorAction OnOutputException(nsresult, nsPipeEvents&);
+  // these functions return true to indicate that the pipe's monitor should
+  // be notified, to wake up a blocked writer if any.
+  bool OnOutputWritable(nsPipeEvents&);
+  bool OnOutputException(nsresult, nsPipeEvents&);
 
 private:
   nsPipe*                         mPipe;
 
   // separate refcnt so that we know when to close the producer
   mozilla::ThreadSafeAutoRefCnt   mWriterRefCnt;
   int64_t                         mLogicalOffset;
   bool                            mBlocking;
@@ -299,20 +287,16 @@ public:
   //
   // methods below may be called while outside the pipe's monitor
   //
 
   nsresult GetReadSegment(const nsPipeReadState& aReadState,
                           const char*& aSegment, uint32_t& aSegmentLen);
   void     AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aCount,
                              uint32_t* aAvailableOut);
-  SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState);
-  void     DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents,
-                            uint32_t* aAvailableOut);
-  bool     ReadSegmentBeingWritten(nsPipeReadState& aReadState);
 
   nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen);
   void     AdvanceWriteCursor(uint32_t aCount);
 
   void     OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason);
   void     OnPipeException(nsresult aReason, bool aOutputOnly = false);
 
   nsresult CloneInputStream(nsPipeInputStream* aOriginal,
@@ -551,120 +535,70 @@ nsPipe::AdvanceReadCursor(nsPipeReadStat
     *aAvailableOut -= aBytesRead;
 
     if (aReadState.mReadCursor == aReadState.mReadLimit) {
       // we've reached the limit of how much we can read from this segment.
       // if at the end of this segment, then we must discard this segment.
 
       // if still writing in this segment then bail because we're not done
       // with the segment and have to wait for now...
-      if (ReadSegmentBeingWritten(aReadState)) {
+      if (mWriteSegment == aReadState.mSegment && mWriteLimit > mWriteCursor) {
+        NS_ASSERTION(aReadState.mReadLimit == mWriteCursor, "unexpected state");
         return;
       }
 
-      // Check to see if we can free up any segments.  If we can, then notify
-      // the output stream that the pipe has room for a new segment.
-      if (AdvanceReadSegment(aReadState) == SegmentDeleted &&
-          mOutput.OnOutputWritable(events) == NotifyMonitor) {
-        mon.NotifyAll();
+      uint32_t currentSegment = aReadState.mSegment;
+
+      // Move to the next segment to read
+      aReadState.mSegment += 1;
+
+      // If this was the last reference to the first segment, then remove it.
+      if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) {
+
+        // shift write and read segment index (-1 indicates an empty buffer).
+        mWriteSegment -= 1;
+
+        for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+          mInputList[i]->ReadState().mSegment -= 1;
+        }
+
+        // done with this segment
+        mBuffer.DeleteFirstSegment();
+        LOG(("III deleting first segment\n"));
+      }
+
+      if (mWriteSegment < aReadState.mSegment) {
+        // read cursor has hit the end of written data, so reset it
+        MOZ_ASSERT(mWriteSegment == (aReadState.mSegment - 1));
+        aReadState.mReadCursor = nullptr;
+        aReadState.mReadLimit = nullptr;
+        // also, the buffer is completely empty, so reset the write cursor
+        if (mWriteSegment == -1) {
+          mWriteCursor = nullptr;
+          mWriteLimit = nullptr;
+        }
+      } else {
+        // advance read cursor and limit to next buffer segment
+        aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment);
+        if (mWriteSegment == aReadState.mSegment) {
+          aReadState.mReadLimit = mWriteCursor;
+        } else {
+          aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize();
+        }
+      }
+
+      // we've free'd up a segment, so notify output stream that pipe has
+      // room for a new segment.
+      if (mOutput.OnOutputWritable(events)) {
+        mon.Notify();
       }
     }
   }
 }
 
-SegmentChangeResult
-nsPipe::AdvanceReadSegment(nsPipeReadState& aReadState)
-{
-  int32_t currentSegment = aReadState.mSegment;
-
-  // Move to the next segment to read
-  aReadState.mSegment += 1;
-
-  SegmentChangeResult result = SegmentNotChanged;
-
-  // If this was the last reference to the first segment, then remove it.
-  if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) {
-
-    // shift write and read segment index (-1 indicates an empty buffer).
-    mWriteSegment -= 1;
-
-    for (uint32_t i = 0; i < mInputList.Length(); ++i) {
-      mInputList[i]->ReadState().mSegment -= 1;
-    }
-
-    // done with this segment
-    mBuffer.DeleteFirstSegment();
-    LOG(("III deleting first segment\n"));
-
-    result = SegmentDeleted;
-  }
-
-  if (mWriteSegment < aReadState.mSegment) {
-    // read cursor has hit the end of written data, so reset it
-    MOZ_ASSERT(mWriteSegment == (aReadState.mSegment - 1));
-    aReadState.mReadCursor = nullptr;
-    aReadState.mReadLimit = nullptr;
-    // also, the buffer is completely empty, so reset the write cursor
-    if (mWriteSegment == -1) {
-      mWriteCursor = nullptr;
-      mWriteLimit = nullptr;
-    }
-  } else {
-    // advance read cursor and limit to next buffer segment
-    aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment);
-    if (mWriteSegment == aReadState.mSegment) {
-      aReadState.mReadLimit = mWriteCursor;
-    } else {
-      aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize();
-    }
-  }
-
-  return result;
-}
-
-void
-nsPipe::DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents,
-                         uint32_t* aAvailableOut)
-{
-  ReentrantMonitorAutoEnter mon(mReentrantMonitor);
-
-  *aAvailableOut = 0;
-
-  SegmentChangeResult result = SegmentNotChanged;
-  while(mWriteSegment >= aReadState.mSegment) {
-
-    // If the last segment to free is still being written to, we're done
-    // draining.  We can't free any more.
-    if (ReadSegmentBeingWritten(aReadState)) {
-      break;
-    }
-
-    if (AdvanceReadSegment(aReadState) == SegmentDeleted) {
-      result = SegmentDeleted;
-    }
-  }
-
-  // if we've free'd up a segment, notify output stream that pipe has
-  // room for a new segment.
-  if (result == SegmentDeleted &&
-      mOutput.OnOutputWritable(aEvents) == NotifyMonitor) {
-    mon.NotifyAll();
-  }
-}
-
-bool
-nsPipe::ReadSegmentBeingWritten(nsPipeReadState& aReadState)
-{
-  bool beingWritten = mWriteSegment == aReadState.mSegment &&
-                      mWriteLimit > mWriteCursor;
-  NS_ASSERTION(!beingWritten || aReadState.mReadLimit == mWriteCursor,
-               "unexpected state");
-  return beingWritten;
-}
-
 nsresult
 nsPipe::GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen)
 {
   ReentrantMonitorAutoEnter mon(mReentrantMonitor);
 
   if (NS_FAILED(mStatus)) {
     return mStatus;
   }
@@ -725,17 +659,17 @@ nsPipe::AdvanceWriteCursor(uint32_t aByt
       if (mBuffer.GetSize() >= mBuffer.GetMaxSize()) {
         mOutput.SetWritable(false);
       }
     }
 
     // notify input stream that pipe now contains additional data
     bool needNotify = false;
     for (uint32_t i = 0; i < mInputList.Length(); ++i) {
-      if (mInputList[i]->OnInputReadable(aBytesWritten, events) == NotifyMonitor) {
+      if (mInputList[i]->OnInputReadable(aBytesWritten, events)) {
         needNotify = true;
       }
     }
 
     if (needNotify) {
       mon.NotifyAll();
     }
   }
@@ -766,22 +700,22 @@ nsPipe::OnInputStreamException(nsPipeInp
     }
 
     // Otherwise just close the particular stream that hit an exception.
     for (uint32_t i = 0; i < mInputList.Length(); ++i) {
       if (mInputList[i] != aStream) {
         continue;
       }
 
-      MonitorAction action = mInputList[i]->OnInputException(aReason, events);
+      bool needNotify = mInputList[i]->OnInputException(aReason, events);
       mInputList.RemoveElementAt(i);
 
       // Notify after element is removed in case we re-enter as a result.
-      if (action == NotifyMonitor) {
-        mon.NotifyAll();
+      if (needNotify) {
+        mon.Notify();
       }
 
       return;
     }
   }
 }
 
 void
@@ -807,23 +741,23 @@ nsPipe::OnPipeException(nsresult aReason
     for (uint32_t i = 0; i < mInputList.Length(); ++i) {
       // an output-only exception applies to the input end if the pipe has
       // zero bytes available.
       if (aOutputOnly && mInputList[i]->Available()) {
         tmpInputList.AppendElement(mInputList[i]);
         continue;
       }
 
-      if (mInputList[i]->OnInputException(aReason, events) == NotifyMonitor) {
+      if (mInputList[i]->OnInputException(aReason, events)) {
         needNotify = true;
       }
     }
     mInputList = tmpInputList;
 
-    if (mOutput.OnOutputException(aReason, events) == NotifyMonitor) {
+    if (mOutput.OnOutputException(aReason, events)) {
       needNotify = true;
     }
 
     // Notify after we have removed any input streams from mInputList
     if (needNotify) {
       mon.NotifyAll();
     }
   }
@@ -1002,57 +936,57 @@ nsPipeInputStream::Wait()
 
     LOG(("III pipe input: woke up [status=%x available=%u]\n",
          Status(), mAvailable));
   }
 
   return Status() == NS_BASE_STREAM_CLOSED ? NS_OK : Status();
 }
 
-MonitorAction
+bool
 nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten, nsPipeEvents& aEvents)
 {
-  MonitorAction result = DoNotNotifyMonitor;
+  bool result = false;
 
   mAvailable += aBytesWritten;
 
   if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
     aEvents.NotifyInputReady(this, mCallback);
     mCallback = 0;
     mCallbackFlags = 0;
   } else if (mBlocked) {
-    result = NotifyMonitor;
+    result = true;
   }
 
   return result;
 }
 
-MonitorAction
+bool
 nsPipeInputStream::OnInputException(nsresult aReason, nsPipeEvents& aEvents)
 {
   LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n",
        this, aReason));
 
-  MonitorAction result = DoNotNotifyMonitor;
+  bool result = false;
 
   NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception");
 
   if (NS_SUCCEEDED(mInputStatus)) {
     mInputStatus = aReason;
   }
 
   // force count of available bytes to zero.
-  mPipe->DrainInputStream(mReadState, aEvents, &mAvailable);
+  mAvailable = 0;
 
   if (mCallback) {
     aEvents.NotifyInputReady(this, mCallback);
     mCallback = 0;
     mCallbackFlags = 0;
   } else if (mBlocked) {
-    result = NotifyMonitor;
+    result = true;
   }
 
   return result;
 }
 
 NS_IMETHODIMP
 nsPipeInputStream::CloseWithStatus(nsresult aReason)
 {
@@ -1383,51 +1317,51 @@ nsPipeOutputStream::Wait()
     mBlocked = false;
     LOG(("OOO pipe output: woke up [pipe-status=%x writable=%u]\n",
          mPipe->mStatus, mWritable));
   }
 
   return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
 }
 
-MonitorAction
+bool
 nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents)
 {
-  MonitorAction result = DoNotNotifyMonitor;
+  bool result = false;
 
   mWritable = true;
 
   if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
     aEvents.NotifyOutputReady(this, mCallback);
     mCallback = 0;
     mCallbackFlags = 0;
   } else if (mBlocked) {
-    result = NotifyMonitor;
+    result = true;
   }
 
   return result;
 }
 
-MonitorAction
+bool
 nsPipeOutputStream::OnOutputException(nsresult aReason, nsPipeEvents& aEvents)
 {
   LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n",
        this, aReason));
 
-  MonitorAction result = DoNotNotifyMonitor;
+  bool result = false;
 
   NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception");
   mWritable = false;
 
   if (mCallback) {
     aEvents.NotifyOutputReady(this, mCallback);
     mCallback = 0;
     mCallbackFlags = 0;
   } else if (mBlocked) {
-    result = NotifyMonitor;
+    result = true;
   }
 
   return result;
 }
 
 
 NS_IMETHODIMP_(MozExternalRefCountType)
 nsPipeOutputStream::AddRef()
--- a/xpcom/tests/gtest/Helpers.cpp
+++ b/xpcom/tests/gtest/Helpers.cpp
@@ -89,27 +89,9 @@ ConsumeAndValidateStream(nsIInputStream*
 {
   nsAutoCString outputData;
   nsresult rv = NS_ConsumeStream(aStream, UINT32_MAX, outputData);
   ASSERT_TRUE(NS_SUCCEEDED(rv));
   ASSERT_EQ(aExpectedData.Length(), outputData.Length());
   ASSERT_TRUE(aExpectedData.Equals(outputData));
 }
 
-NS_IMPL_ISUPPORTS(OutputStreamCallback, nsIOutputStreamCallback);
-
-OutputStreamCallback::OutputStreamCallback()
-  : mCalled(false)
-{
-}
-
-OutputStreamCallback::~OutputStreamCallback()
-{
-}
-
-NS_IMETHODIMP
-OutputStreamCallback::OnOutputStreamReady(nsIAsyncOutputStream* aStream)
-{
-  mCalled = true;
-  return NS_OK;
-}
-
 } // namespace testing
--- a/xpcom/tests/gtest/Helpers.h
+++ b/xpcom/tests/gtest/Helpers.h
@@ -2,17 +2,16 @@
 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef __Helpers_h
 #define __Helpers_h
 
-#include "nsIAsyncOutputStream.h"
 #include "nsString.h"
 #include <stdint.h>
 
 class nsIInputStream;
 class nsIOutputStream;
 template <class T> class nsTArray;
 
 namespace testing {
@@ -30,27 +29,11 @@ WriteAllAndClose(nsIOutputStream* aStrea
 void
 ConsumeAndValidateStream(nsIInputStream* aStream,
                          const nsTArray<char>& aExpectedData);
 
 void
 ConsumeAndValidateStream(nsIInputStream* aStream,
                          const nsACString& aExpectedData);
 
-class OutputStreamCallback MOZ_FINAL : public nsIOutputStreamCallback
-{
-public:
-  OutputStreamCallback();
-
-  bool Called() const { return mCalled; }
-
-private:
-  ~OutputStreamCallback();
-
-  bool mCalled;
-public:
-  NS_DECL_ISUPPORTS
-  NS_DECL_NSIOUTPUTSTREAMCALLBACK
-};
-
 } // namespace testing
 
 #endif // __Helpers_h
--- a/xpcom/tests/gtest/TestPipes.cpp
+++ b/xpcom/tests/gtest/TestPipes.cpp
@@ -655,132 +655,8 @@ TEST(Pipes, Clone_DuringWrite_ReadDuring
 
   TestPipeClone(32 * 1024, // total bytes
                 16,        // num writes
                 1,         // num initial clones
                 1,         // num streams to close after each write
                 2,         // num clones to add after each write
                 3);        // num streams to read after each write
 }
-
-TEST(Pipes, Write_AsyncWait)
-{
-  nsCOMPtr<nsIAsyncInputStream> reader;
-  nsCOMPtr<nsIAsyncOutputStream> writer;
-
-  const uint32_t segmentSize = 1024;
-  const uint32_t numSegments = 1;
-
-  nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
-                            true, true,  // non-blocking - reader, writer
-                            segmentSize, numSegments);
-  ASSERT_TRUE(NS_SUCCEEDED(rv));
-
-  nsTArray<char> inputData;
-  testing::CreateData(segmentSize, inputData);
-
-  uint32_t numWritten = 0;
-  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
-  ASSERT_TRUE(NS_SUCCEEDED(rv));
-
-  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
-  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
-
-  nsRefPtr<testing::OutputStreamCallback> cb =
-    new testing::OutputStreamCallback();
-
-  rv = writer->AsyncWait(cb, 0, 0, nullptr);
-  ASSERT_TRUE(NS_SUCCEEDED(rv));
-
-  ASSERT_FALSE(cb->Called());
-
-  testing::ConsumeAndValidateStream(reader, inputData);
-
-  ASSERT_TRUE(cb->Called());
-}
-
-TEST(Pipes, Write_AsyncWait_Clone)
-{
-  nsCOMPtr<nsIAsyncInputStream> reader;
-  nsCOMPtr<nsIAsyncOutputStream> writer;
-
-  const uint32_t segmentSize = 1024;
-  const uint32_t numSegments = 1;
-
-  nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
-                            true, true,  // non-blocking - reader, writer
-                            segmentSize, numSegments);
-  ASSERT_TRUE(NS_SUCCEEDED(rv));
-
-  nsCOMPtr<nsIInputStream> clone;
-  rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
-  ASSERT_TRUE(NS_SUCCEEDED(rv));
-
-  nsTArray<char> inputData;
-  testing::CreateData(segmentSize, inputData);
-
-  uint32_t numWritten = 0;
-  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
-  ASSERT_TRUE(NS_SUCCEEDED(rv));
-
-  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
-  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
-
-  nsRefPtr<testing::OutputStreamCallback> cb =
-    new testing::OutputStreamCallback();
-
-  rv = writer->AsyncWait(cb, 0, 0, nullptr);
-  ASSERT_TRUE(NS_SUCCEEDED(rv));
-
-  ASSERT_FALSE(cb->Called());
-
-  testing::ConsumeAndValidateStream(reader, inputData);
-
-  ASSERT_FALSE(cb->Called());
-
-  testing::ConsumeAndValidateStream(clone, inputData);
-
-  ASSERT_TRUE(cb->Called());
-}
-
-TEST(Pipes, Write_AsyncWait_Clone_CloseOriginal)
-{
-  nsCOMPtr<nsIAsyncInputStream> reader;
-  nsCOMPtr<nsIAsyncOutputStream> writer;
-
-  const uint32_t segmentSize = 1024;
-  const uint32_t numSegments = 1;
-
-  nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
-                            true, true,  // non-blocking - reader, writer
-                            segmentSize, numSegments);
-  ASSERT_TRUE(NS_SUCCEEDED(rv));
-
-  nsCOMPtr<nsIInputStream> clone;
-  rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
-  ASSERT_TRUE(NS_SUCCEEDED(rv));
-
-  nsTArray<char> inputData;
-  testing::CreateData(segmentSize, inputData);
-
-  uint32_t numWritten = 0;
-  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
-  ASSERT_TRUE(NS_SUCCEEDED(rv));
-
-  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
-  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
-
-  nsRefPtr<testing::OutputStreamCallback> cb =
-    new testing::OutputStreamCallback();
-
-  rv = writer->AsyncWait(cb, 0, 0, nullptr);
-  ASSERT_TRUE(NS_SUCCEEDED(rv));
-
-  ASSERT_FALSE(cb->Called());
-
-  testing::ConsumeAndValidateStream(clone, inputData);
-
-  ASSERT_FALSE(cb->Called());
-
-  reader->Close();
-
-  ASSERT_TRUE(cb->Called());
-}