Bug 1361443 - nsMultiplexInputStream should implement nsIAsyncInputStream, r=smaug
☠☠ backed out by 285b0cbc973e ☠ ☠
authorAndrea Marchesini <amarchesini@mozilla.com>
Thu, 04 May 2017 14:44:35 +0200
changeset 572793 c0e3f3edf36a8486c22d88e0d28f337690e439ff
parent 572792 816f2cd29ba688e49c9f23ae6ccfd80946d71b34
child 572794 1ab58ab887c642c9c1d6068bc565a9151f0b3106
push id57195
push userbmo:rbarker@mozilla.com
push dateThu, 04 May 2017 20:08:56 +0000
reviewerssmaug
bugs1361443
milestone55.0a1
Bug 1361443 - nsMultiplexInputStream should implement nsIAsyncInputStream, r=smaug
xpcom/io/nsMultiplexInputStream.cpp
--- a/xpcom/io/nsMultiplexInputStream.cpp
+++ b/xpcom/io/nsMultiplexInputStream.cpp
@@ -7,52 +7,58 @@
 /**
  * The multiplex stream concatenates a list of input streams into a single
  * stream.
  */
 
 #include "mozilla/Attributes.h"
 #include "mozilla/MathAlgorithms.h"
 #include "mozilla/Mutex.h"
+#include "mozilla/SystemGroup.h"
 
 #include "base/basictypes.h"
 
 #include "nsMultiplexInputStream.h"
 #include "nsICloneableInputStream.h"
 #include "nsIMultiplexInputStream.h"
 #include "nsISeekableStream.h"
 #include "nsCOMPtr.h"
 #include "nsCOMArray.h"
 #include "nsIClassInfoImpl.h"
 #include "nsIIPCSerializableInputStream.h"
 #include "mozilla/ipc/InputStreamUtils.h"
+#include "nsIAsyncInputStream.h"
 
 using namespace mozilla;
 using namespace mozilla::ipc;
 
 using mozilla::DeprecatedAbs;
 using mozilla::Maybe;
 using mozilla::Nothing;
 using mozilla::Some;
 
 class nsMultiplexInputStream final
   : public nsIMultiplexInputStream
   , public nsISeekableStream
   , public nsIIPCSerializableInputStream
   , public nsICloneableInputStream
+  , public nsIAsyncInputStream
 {
 public:
   nsMultiplexInputStream();
 
   NS_DECL_THREADSAFE_ISUPPORTS
   NS_DECL_NSIINPUTSTREAM
   NS_DECL_NSIMULTIPLEXINPUTSTREAM
   NS_DECL_NSISEEKABLESTREAM
   NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
   NS_DECL_NSICLONEABLEINPUTSTREAM
+  NS_DECL_NSIASYNCINPUTSTREAM
+
+  void AsyncWaitCompleted();
 
 private:
   ~nsMultiplexInputStream()
   {
   }
 
   struct MOZ_STACK_CLASS ReadSegmentsState
   {
@@ -65,38 +71,42 @@ private:
 
   static nsresult ReadSegCb(nsIInputStream* aIn, void* aClosure,
                             const char* aFromRawSegment, uint32_t aToOffset,
                             uint32_t aCount, uint32_t* aWriteCount);
 
   bool IsSeekable() const;
   bool IsIPCSerializable() const;
   bool IsCloneable() const;
+  bool IsAsyncInputStream() const;
 
   Mutex mLock; // Protects access to all data members.
   nsTArray<nsCOMPtr<nsIInputStream>> mStreams;
   uint32_t mCurrentStream;
   bool mStartedReadingCurrent;
   nsresult mStatus;
+  nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback;
 };
 
 NS_IMPL_ADDREF(nsMultiplexInputStream)
 NS_IMPL_RELEASE(nsMultiplexInputStream)
 
 NS_IMPL_CLASSINFO(nsMultiplexInputStream, nullptr, nsIClassInfo::THREADSAFE,
                   NS_MULTIPLEXINPUTSTREAM_CID)
 
 NS_INTERFACE_MAP_BEGIN(nsMultiplexInputStream)
   NS_INTERFACE_MAP_ENTRY(nsIMultiplexInputStream)
-  NS_INTERFACE_MAP_ENTRY(nsIInputStream)
+  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsIInputStream, nsIMultiplexInputStream)
   NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream, IsSeekable())
   NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream,
                                      IsIPCSerializable())
   NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream,
                                      IsCloneable())
+  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream,
+                                     IsAsyncInputStream())
   NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIMultiplexInputStream)
   NS_IMPL_QUERY_CLASSINFO(nsMultiplexInputStream)
 NS_INTERFACE_MAP_END
 
 NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream,
                             nsIMultiplexInputStream,
                             nsIInputStream,
                             nsISeekableStream)
@@ -211,16 +221,19 @@ nsMultiplexInputStream::Close()
   uint32_t len = mStreams.Length();
   for (uint32_t i = 0; i < len; ++i) {
     nsresult rv2 = mStreams[i]->Close();
     // We still want to close all streams, but we should return an error
     if (NS_FAILED(rv2)) {
       rv = rv2;
     }
   }
+
+  mAsyncWaitCallback = nullptr;
+
   return rv;
 }
 
 NS_IMETHODIMP
 nsMultiplexInputStream::Available(uint64_t* aResult)
 {
   MutexAutoLock lock(mLock);
   if (NS_FAILED(mStatus)) {
@@ -303,17 +316,17 @@ nsMultiplexInputStream::ReadSegments(nsW
   if (NS_FAILED(mStatus)) {
     return mStatus;
   }
 
   NS_ASSERTION(aWriter, "missing aWriter");
 
   nsresult rv = NS_OK;
   ReadSegmentsState state;
-  state.mThisStream = this;
+  state.mThisStream = static_cast<nsIMultiplexInputStream*>(this);
   state.mOffset = 0;
   state.mWriter = aWriter;
   state.mClosure = aClosure;
   state.mDone = false;
 
   uint32_t len = mStreams.Length();
   while (mCurrentStream < len && aCount) {
     uint32_t read;
@@ -672,16 +685,199 @@ nsMultiplexInputStream::Tell(int64_t* aR
 }
 
 NS_IMETHODIMP
 nsMultiplexInputStream::SetEOF()
 {
   return NS_ERROR_NOT_IMPLEMENTED;
 }
 
+NS_IMETHODIMP
+nsMultiplexInputStream::CloseWithStatus(nsresult aStatus)
+{
+  return Close();
+}
+
+// This class is used to inform nsMultiplexInputStream that it's time to execute
+// the asyncWait callback.
+class AsyncWaitRunnable final : public Runnable
+{
+  RefPtr<nsMultiplexInputStream> mStream;
+
+public:
+  explicit AsyncWaitRunnable(nsMultiplexInputStream* aStream)
+    : Runnable("AsyncWaitRunnable")
+    , mStream(aStream)
+  {
+    MOZ_ASSERT(aStream);
+  }
+
+  NS_IMETHOD
+  Run() override
+  {
+    mStream->AsyncWaitCompleted();
+    return NS_OK;
+  }
+};
+
+// This helper class processes an array of nsIAsyncInputStreams, calling
+// AsyncWait() for each one of them. When all of them have answered, this helper
+// dispatches a AsyncWaitRunnable object.
+class AsyncStreamHelper final : public nsIInputStreamCallback
+{
+public:
+  NS_DECL_THREADSAFE_ISUPPORTS
+
+  static nsresult
+  Process(nsMultiplexInputStream* aStream,
+          nsTArray<nsCOMPtr<nsIAsyncInputStream>>& aAsyncStreams,
+          uint32_t aFlags, uint32_t aRequestedCount,
+          nsIEventTarget* aEventTarget)
+  {
+    MOZ_ASSERT(aStream);
+    MOZ_ASSERT(!aAsyncStreams.IsEmpty());
+    MOZ_ASSERT(aEventTarget);
+
+    RefPtr<AsyncStreamHelper> helper =
+      new AsyncStreamHelper(aStream, aAsyncStreams, aEventTarget);
+    return helper->Run(aFlags, aRequestedCount);
+  }
+
+private:
+  AsyncStreamHelper(nsMultiplexInputStream* aStream,
+                    nsTArray<nsCOMPtr<nsIAsyncInputStream>>& aAsyncStreams,
+                    nsIEventTarget* aEventTarget)
+    : mMutex("AsyncStreamHelper::mMutex")
+    , mStream(aStream)
+    , mEventTarget(aEventTarget)
+    , mValid(true)
+  {
+    mPendingStreams.SwapElements(aAsyncStreams);
+  }
+
+  ~AsyncStreamHelper() = default;
+
+  nsresult
+  Run(uint32_t aFlags, uint32_t aRequestedCount)
+  {
+    MutexAutoLock lock(mMutex);
+
+    for (uint32_t i = 0; i < mPendingStreams.Length(); ++i) {
+      nsresult rv =
+        mPendingStreams[i]->AsyncWait(this, aFlags, aRequestedCount,
+                                      mEventTarget);
+      if (NS_WARN_IF(NS_FAILED(rv))) {
+        mValid = true;
+        return rv;
+      }
+    }
+
+    return NS_OK;
+  }
+
+  NS_IMETHOD
+  OnInputStreamReady(nsIAsyncInputStream* aStream) override
+  {
+    MOZ_ASSERT(aStream, "This cannot be one of ours.");
+
+    MutexAutoLock lock(mMutex);
+
+    // We failed during the Run().
+    if (!mValid) {
+      return NS_OK;
+    }
+
+    MOZ_ASSERT(mPendingStreams.Contains(aStream));
+    mPendingStreams.RemoveElement(aStream);
+
+    // The last asyncStream answered. We can inform nsMultiplexInputStream.
+    if (mPendingStreams.IsEmpty()) {
+      RefPtr<AsyncWaitRunnable> runnable = new AsyncWaitRunnable(mStream);
+      return mEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
+    }
+
+    return NS_OK;
+  }
+
+  Mutex mMutex;
+  RefPtr<nsMultiplexInputStream> mStream;
+  nsTArray<nsCOMPtr<nsIAsyncInputStream>> mPendingStreams;
+  nsCOMPtr<nsIEventTarget> mEventTarget;
+  bool mValid;
+};
+
+NS_IMPL_ISUPPORTS(AsyncStreamHelper, nsIInputStreamCallback)
+
+NS_IMETHODIMP
+nsMultiplexInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
+                                  uint32_t aFlags,
+                                  uint32_t aRequestedCount,
+                                  nsIEventTarget* aEventTarget)
+{
+  // When AsyncWait() is called, it's better to call AsyncWait() to any sub
+  // stream if they are valid nsIAsyncInputStream instances. In this way, when
+  // they all call OnInputStreamReady(), we can proceed with the Read().
+
+  MutexAutoLock lock(mLock);
+
+  if (NS_FAILED(mStatus)) {
+    return mStatus;
+  }
+
+  if (mAsyncWaitCallback && aCallback) {
+    return NS_ERROR_FAILURE;
+  }
+
+  mAsyncWaitCallback = aCallback;
+
+  if (!mAsyncWaitCallback) {
+      return NS_OK;
+  }
+
+  nsTArray<nsCOMPtr<nsIAsyncInputStream>> asyncStreams;
+  for (uint32_t i = mCurrentStream; i < mStreams.Length(); ++i) {
+    nsCOMPtr<nsIAsyncInputStream> asyncStream =
+      do_QueryInterface(mStreams.SafeElementAt(i, nullptr));
+    if (asyncStream) {
+      asyncStreams.AppendElement(asyncStream);
+    }
+  }
+
+  if (!aEventTarget) {
+    aEventTarget = SystemGroup::EventTargetFor(TaskCategory::Other);
+  }
+
+  if (asyncStreams.IsEmpty()) {
+    RefPtr<AsyncWaitRunnable> runnable = new AsyncWaitRunnable(this);
+    return aEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
+  }
+
+  return AsyncStreamHelper::Process(this, asyncStreams, aFlags, aRequestedCount,
+                                    aEventTarget);
+}
+
+void
+nsMultiplexInputStream::AsyncWaitCompleted()
+{
+  nsCOMPtr<nsIInputStreamCallback> callback;
+
+  {
+    MutexAutoLock lock(mLock);
+
+    // The callback has been nullified in the meantime.
+    if (!mAsyncWaitCallback) {
+      return;
+    }
+
+    mAsyncWaitCallback.swap(callback);
+  }
+
+  callback->OnInputStreamReady(this);
+}
+
 nsresult
 nsMultiplexInputStreamConstructor(nsISupports* aOuter,
                                   REFNSIID aIID,
                                   void** aResult)
 {
   *aResult = nullptr;
 
   if (aOuter) {
@@ -815,17 +1011,17 @@ nsMultiplexInputStream::Clone(nsIInputSt
   MutexAutoLock lock(mLock);
 
   //XXXnsm Cloning a multiplex stream which has started reading is not permitted
   //right now.
   if (mCurrentStream > 0 || mStartedReadingCurrent) {
     return NS_ERROR_FAILURE;
   }
 
-  RefPtr<nsMultiplexInputStream> clone = new nsMultiplexInputStream();
+  nsCOMPtr<nsIMultiplexInputStream> clone = new nsMultiplexInputStream();
 
   nsresult rv;
   uint32_t len = mStreams.Length();
   for (uint32_t i = 0; i < len; ++i) {
     nsCOMPtr<nsICloneableInputStream> substream = do_QueryInterface(mStreams[i]);
     if (NS_WARN_IF(!substream)) {
       return NS_ERROR_FAILURE;
     }
@@ -876,8 +1072,22 @@ nsMultiplexInputStream::IsCloneable() co
   for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
     nsCOMPtr<nsICloneableInputStream> substream = do_QueryInterface(mStreams[i]);
     if (!substream) {
       return false;
     }
   }
   return true;
 }
+
+bool
+nsMultiplexInputStream::IsAsyncInputStream() const
+{
+  // nsMultiplexInputStream is nsIAsyncInputStream if at least 1 of the
+  // substream implements that interface.
+  for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
+    nsCOMPtr<nsIAsyncInputStream> substream = do_QueryInterface(mStreams[i]);
+    if (substream) {
+      return true;
+    }
+  }
+  return false;
+}