Bug 1397627 - IPCBlobInputStream should be always async, r=smaug
authorAndrea Marchesini <amarchesini@mozilla.com>
Fri, 08 Sep 2017 16:06:25 +0200
changeset 661561 f77a81b06e4305db0abe0b12c29945cf067fcb4e
parent 661560 b4f8fad33b2f8013edb3aa265ef63629c8f18816
child 661562 85b18b5a5e439fbf31eeb40f94b22a278836ef82
push id78830
push userasasaki@mozilla.com
push dateFri, 08 Sep 2017 19:44:43 +0000
reviewerssmaug
bugs1397627
milestone57.0a1
Bug 1397627 - IPCBlobInputStream should be always async, r=smaug
dom/file/ipc/IPCBlobInputStream.cpp
dom/file/ipc/IPCBlobInputStream.h
--- a/dom/file/ipc/IPCBlobInputStream.cpp
+++ b/dom/file/ipc/IPCBlobInputStream.cpp
@@ -4,22 +4,27 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "IPCBlobInputStream.h"
 #include "IPCBlobInputStreamChild.h"
 #include "IPCBlobInputStreamStorage.h"
 #include "mozilla/ipc/InputStreamParams.h"
 #include "nsIAsyncInputStream.h"
+#include "nsIStreamTransportService.h"
+#include "nsITransport.h"
+#include "nsNetCID.h"
 
 namespace mozilla {
 namespace dom {
 
 namespace {
 
+static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
+
 class CallbackRunnable final : public CancelableRunnable
 {
 public:
   static void
   Execute(nsIInputStreamCallback* aCallback,
           nsIEventTarget* aEventTarget,
           IPCBlobInputStream* aStream)
   {
@@ -103,50 +108,73 @@ IPCBlobInputStream::Available(uint64_t* 
   // We don't have a remoteStream yet. Let's return the full known size.
   if (mState == eInit || mState == ePending) {
     *aLength = mActor->Size();
     return NS_OK;
   }
 
   if (mState == eRunning) {
     MOZ_ASSERT(mRemoteStream);
-    return mRemoteStream->Available(aLength);
+
+    nsresult rv = EnsureAsyncRemoteStream();
+    if (NS_WARN_IF(NS_FAILED(rv))) {
+      return rv;
+    }
+
+    MOZ_ASSERT(mAsyncRemoteStream);
+    return mAsyncRemoteStream->Available(aLength);
   }
 
   MOZ_ASSERT(mState == eClosed);
   return NS_BASE_STREAM_CLOSED;
 }
 
 NS_IMETHODIMP
 IPCBlobInputStream::Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount)
 {
   // Read is not available is we don't have a remoteStream.
   if (mState == eInit || mState == ePending) {
     return NS_BASE_STREAM_WOULD_BLOCK;
   }
 
   if (mState == eRunning) {
-    return mRemoteStream->Read(aBuffer, aCount, aReadCount);
+    MOZ_ASSERT(mRemoteStream);
+
+    nsresult rv = EnsureAsyncRemoteStream();
+    if (NS_WARN_IF(NS_FAILED(rv))) {
+      return rv;
+    }
+
+    MOZ_ASSERT(mAsyncRemoteStream);
+    return mAsyncRemoteStream->Read(aBuffer, aCount, aReadCount);
   }
 
   MOZ_ASSERT(mState == eClosed);
   return NS_BASE_STREAM_CLOSED;
 }
 
 NS_IMETHODIMP
 IPCBlobInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
                                  uint32_t aCount, uint32_t *aResult)
 {
   // ReadSegments is not available is we don't have a remoteStream.
   if (mState == eInit || mState == ePending) {
     return NS_BASE_STREAM_WOULD_BLOCK;
   }
 
   if (mState == eRunning) {
-    return mRemoteStream->ReadSegments(aWriter, aClosure, aCount, aResult);
+    MOZ_ASSERT(mRemoteStream);
+
+    nsresult rv = EnsureAsyncRemoteStream();
+    if (NS_WARN_IF(NS_FAILED(rv))) {
+      return rv;
+    }
+
+    MOZ_ASSERT(mAsyncRemoteStream);
+    return mAsyncRemoteStream->ReadSegments(aWriter, aClosure, aCount, aResult);
   }
 
   MOZ_ASSERT(mState == eClosed);
   return NS_BASE_STREAM_CLOSED;
 }
 
 NS_IMETHODIMP
 IPCBlobInputStream::IsNonBlocking(bool* aNonBlocking)
@@ -158,16 +186,21 @@ IPCBlobInputStream::IsNonBlocking(bool* 
 NS_IMETHODIMP
 IPCBlobInputStream::Close()
 {
   if (mActor) {
     mActor->ForgetStream(this);
     mActor = nullptr;
   }
 
+  if (mAsyncRemoteStream) {
+    mAsyncRemoteStream->Close();
+    mAsyncRemoteStream = nullptr;
+  }
+
   if (mRemoteStream) {
     mRemoteStream->Close();
     mRemoteStream = nullptr;
   }
 
   mCallback = nullptr;
 
   mState = eClosed;
@@ -282,70 +315,63 @@ IPCBlobInputStream::StreamReady(nsIInput
 
 nsresult
 IPCBlobInputStream::MaybeExecuteCallback(nsIInputStreamCallback* aCallback,
                                          nsIEventTarget* aCallbackEventTarget)
 {
   MOZ_ASSERT(mState == eRunning);
   MOZ_ASSERT(mRemoteStream);
 
-  // If the stream supports nsIAsyncInputStream, we need to call its AsyncWait
-  // and wait for OnInputStreamReady.
-  nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(mRemoteStream);
-  if (asyncStream) {
-    // If the callback has been already set, we return an error.
-    if (mCallback && aCallback) {
-      return NS_ERROR_FAILURE;
-    }
-
-    mCallback = aCallback;
-    mCallbackEventTarget = aCallbackEventTarget;
-
-    if (!mCallback) {
-      return NS_OK;
-    }
-
-    RefPtr<nsIEventTarget> target = GetCurrentThreadEventTarget();
-    return asyncStream->AsyncWait(this, 0, 0, target);
+  // If the callback has been already set, we return an error.
+  if (mCallback && aCallback) {
+    return NS_ERROR_FAILURE;
   }
 
-  MOZ_ASSERT(!mCallback);
-  MOZ_ASSERT(!mCallbackEventTarget);
+  mCallback = aCallback;
+  mCallbackEventTarget = aCallbackEventTarget;
 
-  if (!aCallback) {
+  if (!mCallback) {
     return NS_OK;
   }
 
-  CallbackRunnable::Execute(aCallback, aCallbackEventTarget, this);
-  return NS_OK;
+  nsresult rv = EnsureAsyncRemoteStream();
+  if (NS_WARN_IF(NS_FAILED(rv))) {
+    return rv;
+  }
+
+  MOZ_ASSERT(mAsyncRemoteStream);
+
+  return mAsyncRemoteStream->AsyncWait(this, 0, 0, aCallbackEventTarget);
 }
 
 // nsIInputStreamCallback
 
 NS_IMETHODIMP
 IPCBlobInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream)
 {
   // We have been closed in the meantime.
   if (mState == eClosed) {
     return NS_OK;
   }
 
   MOZ_ASSERT(mState == eRunning);
-  MOZ_ASSERT(mRemoteStream == aStream);
+  MOZ_ASSERT(mAsyncRemoteStream == aStream);
 
   // The callback has been canceled in the meantime.
   if (!mCallback) {
     return NS_OK;
   }
 
-  CallbackRunnable::Execute(mCallback, mCallbackEventTarget, this);
+  nsCOMPtr<nsIInputStreamCallback> callback;
+  callback.swap(mCallback);
 
-  mCallback = nullptr;
-  mCallbackEventTarget = nullptr;
-
+  nsCOMPtr<nsIEventTarget> callbackEventTarget;
+  callbackEventTarget.swap(mCallbackEventTarget);
+ 
+  CallbackRunnable::Execute(callback, callbackEventTarget, this);
   return NS_OK;
 }
 
 // nsIIPCSerializableInputStream
 
 void
 IPCBlobInputStream::Serialize(mozilla::ipc::InputStreamParams& aParams,
                               FileDescriptorArray& aFileDescriptors)
@@ -409,10 +435,66 @@ IPCBlobInputStream::GetFileDescriptor(PR
   nsCOMPtr<nsIFileMetadata> fileMetadata = do_QueryInterface(mRemoteStream);
   if (!fileMetadata) {
     return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_ERROR_FAILURE;
   }
 
   return fileMetadata->GetFileDescriptor(aRetval);
 }
 
+nsresult
+IPCBlobInputStream::EnsureAsyncRemoteStream()
+{
+  if (!mRemoteStream) {
+    return NS_ERROR_FAILURE;
+  }
+
+  // We already have an async remote stream.
+  if (mAsyncRemoteStream) {
+    return NS_OK;
+  }
+
+  // If the stream is blocking, we want to make it unblocking using a pipe.
+  bool nonBlocking = false;
+  nsresult rv = mRemoteStream->IsNonBlocking(&nonBlocking);
+  if (NS_WARN_IF(NS_FAILED(rv))) {
+    return rv;
+  }
+
+  nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(mRemoteStream);
+  if (!asyncStream || !nonBlocking) {
+    nsCOMPtr<nsIStreamTransportService> sts =
+      do_GetService(kStreamTransportServiceCID, &rv);
+    if (NS_WARN_IF(NS_FAILED(rv))) {
+      return rv;
+    }
+
+    nsCOMPtr<nsITransport> transport;
+    rv = sts->CreateInputTransport(mRemoteStream,
+                                   /* aStartOffset */ 0,
+                                   /* aReadLimit */ -1,
+                                   /* aCloseWhenDone */ true,
+                                   getter_AddRefs(transport));
+    if (NS_WARN_IF(NS_FAILED(rv))) {
+      return rv;
+    }
+
+    nsCOMPtr<nsIInputStream> wrapper;
+    rv = transport->OpenInputStream(/* aFlags */ 0,
+                                    /* aSegmentSize */ 0,
+                                    /* aSegmentCount */ 0,
+                                    getter_AddRefs(wrapper));
+    if (NS_WARN_IF(NS_FAILED(rv))) {
+      return rv;
+    }
+
+    asyncStream = do_QueryInterface(wrapper);
+  }
+
+  MOZ_ASSERT(asyncStream);
+  mAsyncRemoteStream = asyncStream;
+
+  return NS_OK;
+
+}
+
 } // namespace dom
 } // namespace mozilla
--- a/dom/file/ipc/IPCBlobInputStream.h
+++ b/dom/file/ipc/IPCBlobInputStream.h
@@ -40,16 +40,19 @@ public:
 
 private:
   ~IPCBlobInputStream();
 
   nsresult
   MaybeExecuteCallback(nsIInputStreamCallback* aCallback,
                        nsIEventTarget* aEventTarget);
 
+  nsresult
+  EnsureAsyncRemoteStream();
+
   bool
   IsFileMetadata() const;
 
   RefPtr<IPCBlobInputStreamChild> mActor;
 
   // This is the list of possible states.
   enum {
     // The initial state. Only ::Available() can be used without receiving an
@@ -67,16 +70,17 @@ private:
 
     // If Close() or CloseWithStatus() is called, we move to this state.
     // mRemoveStream is released and any method will return
     // NS_BASE_STREAM_CLOSED.
     eClosed,
   } mState;
 
   nsCOMPtr<nsIInputStream> mRemoteStream;
+  nsCOMPtr<nsIAsyncInputStream> mAsyncRemoteStream;
 
   // These 2 values are set only if mState is ePending.
   nsCOMPtr<nsIInputStreamCallback> mCallback;
   nsCOMPtr<nsIEventTarget> mCallbackEventTarget;
 };
 
 } // namespace dom
 } // namespace mozilla