Bug 1255894: Part 5 - Hook up StreamFilterParent as a stream listener. r=dragana,mixedpuppy
authorKris Maglione <maglione.k@gmail.com>
Fri, 21 Apr 2017 20:40:19 -0700
changeset 428216 acc5f5f5946cec6afcd87ed4cebc73a0a8c4226e
parent 428215 eedc32719af7a4a34c6fb2651b05d25d86688e3f
child 428217 26ce00544f9862adf2e07ce2b001280e59015969
push id7761
push userjlund@mozilla.com
push dateFri, 15 Sep 2017 00:19:52 +0000
treeherdermozilla-beta@c38455951db4 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersdragana, mixedpuppy
bugs1255894
milestone57.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1255894: Part 5 - Hook up StreamFilterParent as a stream listener. r=dragana,mixedpuppy This part hooks up the parent side of the StreamListener protocol to the channel, and implements the event handling and actual IO work. Dragana, can you please review the network portions, particularly the thread sanity, and Shane, the integration with the rest of the patch set? MozReview-Commit-ID: DFuALpSSgA7
toolkit/components/extensions/webrequest/StreamFilterParent.cpp
toolkit/components/extensions/webrequest/StreamFilterParent.h
--- a/toolkit/components/extensions/webrequest/StreamFilterParent.cpp
+++ b/toolkit/components/extensions/webrequest/StreamFilterParent.cpp
@@ -5,16 +5,18 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "StreamFilterParent.h"
 
 #include "mozilla/ScopeExit.h"
 #include "mozilla/Unused.h"
 #include "mozilla/dom/nsIContentParent.h"
 #include "nsIChannel.h"
+#include "nsIHttpChannelInternal.h"
+#include "nsIInputStream.h"
 #include "nsITraceableChannel.h"
 #include "nsProxyRelease.h"
 #include "nsStringStream.h"
 
 namespace mozilla {
 namespace extensions {
 
 /*****************************************************************************
@@ -24,22 +26,28 @@ namespace extensions {
 StreamFilterParent::StreamFilterParent(uint64_t aChannelId, const nsAString& aAddonId)
   : mChannelId(aChannelId)
   , mAddonId(NS_Atomize(aAddonId))
   , mPBackgroundThread(NS_GetCurrentThread())
   , mIOThread(do_GetMainThread())
   , mBufferMutex("StreamFilter buffer mutex")
   , mReceivedStop(false)
   , mSentStop(false)
+  , mContext(nullptr)
+  , mOffset(0)
   , mState(State::Uninitialized)
 {
 }
 
 StreamFilterParent::~StreamFilterParent()
 {
+  NS_ReleaseOnMainThreadSystemGroup("StreamFilterParent::mOrigListener",
+                                    mOrigListener.forget());
+  NS_ReleaseOnMainThreadSystemGroup("StreamFilterParent::mContext",
+                                    mContext.forget());
 }
 
 void
 StreamFilterParent::Init(already_AddRefed<nsIContentParent> aContentParent)
 {
   AssertIsPBackgroundThread();
 
   SystemGroup::Dispatch(
@@ -72,16 +80,36 @@ StreamFilterParent::DoInit(already_AddRe
 
   mChannel = webreq.GetTraceableChannel(mChannelId, mAddonId, contentParent);
   if (NS_WARN_IF(!mChannel)) {
     return;
   }
 
   nsCOMPtr<nsITraceableChannel> traceable = do_QueryInterface(mChannel);
   MOZ_RELEASE_ASSERT(traceable);
+
+  nsresult rv = traceable->SetNewListener(this, getter_AddRefs(mOrigListener));
+  success = NS_SUCCEEDED(rv);
+}
+
+/*****************************************************************************
+ * nsIThreadRetargetableStreamListener
+ *****************************************************************************/
+
+NS_IMETHODIMP
+StreamFilterParent::CheckListenerChain()
+{
+  AssertIsMainThread();
+
+  nsCOMPtr<nsIThreadRetargetableStreamListener> trsl =
+    do_QueryInterface(mOrigListener);
+  if (trsl) {
+    return trsl->CheckListenerChain();
+  }
+  return NS_ERROR_FAILURE;
 }
 
 /*****************************************************************************
  * Error handling
  *****************************************************************************/
 
 void
 StreamFilterParent::Broken()
@@ -235,32 +263,152 @@ StreamFilterParent::WriteMove(Data&& aDa
   Unused << NS_WARN_IF(NS_FAILED(rv));
 }
 
 nsresult
 StreamFilterParent::Write(Data& aData)
 {
   AssertIsIOThread();
 
+  nsCOMPtr<nsIInputStream> stream;
+  nsresult rv = NS_NewByteInputStream(getter_AddRefs(stream),
+                                      reinterpret_cast<char*>(aData.Elements()),
+                                      aData.Length());
+  NS_ENSURE_SUCCESS(rv, rv);
+
+  rv = mOrigListener->OnDataAvailable(mChannel, mContext, stream,
+                                      mOffset, aData.Length());
+  NS_ENSURE_SUCCESS(rv, rv);
+
+  mOffset += aData.Length();
   return NS_OK;
 }
 
 /*****************************************************************************
+ * nsIStreamListener
+ *****************************************************************************/
+
+NS_IMETHODIMP
+StreamFilterParent::OnStartRequest(nsIRequest* aRequest, nsISupports* aContext)
+{
+  AssertIsMainThread();
+
+  mContext = aContext;
+
+  if (mState != State::Disconnected) {
+    RefPtr<StreamFilterParent> self(this);
+    RunOnPBackgroundThread(FUNC, [=] {
+      if (self->IPCActive()) {
+        self->mState = State::TransferringData;
+        self->CheckResult(self->SendStartRequest());
+      }
+    });
+  }
+
+  return mOrigListener->OnStartRequest(aRequest, aContext);
+}
+
+NS_IMETHODIMP
+StreamFilterParent::OnStopRequest(nsIRequest* aRequest,
+                                  nsISupports* aContext,
+                                  nsresult aStatusCode)
+{
+  AssertIsMainThread();
+
+  mReceivedStop = true;
+  if (mState == State::Disconnected) {
+    return EmitStopRequest(aStatusCode);
+  }
+
+  RefPtr<StreamFilterParent> self(this);
+  RunOnPBackgroundThread(FUNC, [=] {
+    if (self->IPCActive()) {
+      self->CheckResult(self->SendStopRequest(aStatusCode));
+    }
+  });
+  return NS_OK;
+}
+
+nsresult
+StreamFilterParent::EmitStopRequest(nsresult aStatusCode)
+{
+  AssertIsMainThread();
+  MOZ_ASSERT(!mSentStop);
+
+  mSentStop = true;
+  return mOrigListener->OnStopRequest(mChannel, mContext, aStatusCode);
+}
+
+/*****************************************************************************
  * Incoming data handling
  *****************************************************************************/
 
 void
 StreamFilterParent::DoSendData(Data&& aData)
 {
   AssertIsPBackgroundThread();
 
   if (mState == State::TransferringData) {
+    CheckResult(SendData(aData));
   }
 }
 
+NS_IMETHODIMP
+StreamFilterParent::OnDataAvailable(nsIRequest* aRequest,
+                                    nsISupports* aContext,
+                                    nsIInputStream* aInputStream,
+                                    uint64_t aOffset,
+                                    uint32_t aCount)
+{
+  // Note: No AssertIsIOThread here. Whatever thread we're on now is, by
+  // definition, the IO thread.
+  mIOThread = NS_GetCurrentThread();
+
+  if (mState == State::Disconnected) {
+    // If we're offloading data in a thread pool, it's possible that we'll
+    // have buffered some additional data while waiting for the buffer to
+    // flush. So, if there's any buffered data left, flush that before we
+    // flush this incoming data.
+    //
+    // Note: When in the eDisconnected state, the buffer list is guaranteed
+    // never to be accessed by another thread during an OnDataAvailable call.
+    if (!mBufferedData.isEmpty()) {
+      FlushBufferedData();
+    }
+
+    mOffset += aCount;
+    return mOrigListener->OnDataAvailable(aRequest, aContext, aInputStream,
+                                          mOffset - aCount, aCount);
+  }
+
+  Data data;
+  data.SetLength(aCount);
+
+  uint32_t count;
+  nsresult rv = aInputStream->Read(reinterpret_cast<char*>(data.Elements()),
+                                   aCount, &count);
+  NS_ENSURE_SUCCESS(rv, rv);
+  NS_ENSURE_TRUE(count == aCount, NS_ERROR_UNEXPECTED);
+
+  if (mState == State::Disconnecting) {
+    MutexAutoLock al(mBufferMutex);
+    BufferData(Move(data));
+  } else if (mState == State::Closed) {
+    return NS_ERROR_FAILURE;
+  } else {
+    mPBackgroundThread->Dispatch(
+      NewRunnableMethod<Data&&>("StreamFilterParent::DoSendData",
+                                this,
+                                &StreamFilterParent::DoSendData,
+                                Move(data)),
+      NS_DISPATCH_NORMAL);
+  }
+  return NS_OK;
+}
+
 nsresult
 StreamFilterParent::FlushBufferedData()
 {
   AssertIsIOThread();
 
   // When offloading data to a thread pool, OnDataAvailable isn't guaranteed
   // to always run in the same thread, so it's possible for this function to
   // run in parallel with OnDataAvailable.
@@ -269,16 +417,23 @@ StreamFilterParent::FlushBufferedData()
   while (!mBufferedData.isEmpty()) {
     UniquePtr<BufferedData> data(mBufferedData.popFirst());
 
     nsresult rv = Write(data->mData);
     NS_ENSURE_SUCCESS(rv, rv);
   }
 
   if (mReceivedStop && !mSentStop) {
+    RefPtr<StreamFilterParent> self(this);
+    RunOnMainThread(FUNC, [=] {
+      if (!mSentStop) {
+        nsresult rv = self->EmitStopRequest(NS_OK);
+        Unused << NS_WARN_IF(NS_FAILED(rv));
+      }
+    });
   }
 
   return NS_OK;
 }
 
 /*****************************************************************************
  * Glue
  *****************************************************************************/
@@ -288,13 +443,13 @@ StreamFilterParent::ActorDestroy(ActorDe
 {
   AssertIsPBackgroundThread();
 
   if (mState != State::Disconnected && mState != State::Closed) {
     Broken();
   }
 }
 
-NS_IMPL_ISUPPORTS0(StreamFilterParent)
+NS_IMPL_ISUPPORTS(StreamFilterParent, nsIStreamListener, nsIRequestObserver, nsIThreadRetargetableStreamListener)
 
 } // namespace extensions
 } // namespace mozilla
 
--- a/toolkit/components/extensions/webrequest/StreamFilterParent.h
+++ b/toolkit/components/extensions/webrequest/StreamFilterParent.h
@@ -9,17 +9,19 @@
 
 #include "StreamFilterBase.h"
 #include "mozilla/extensions/PStreamFilterParent.h"
 
 #include "mozilla/LinkedList.h"
 #include "mozilla/Mutex.h"
 #include "mozilla/SystemGroup.h"
 #include "mozilla/WebRequestService.h"
+#include "nsIStreamListener.h"
 #include "nsIThread.h"
+#include "nsIThreadRetargetableStreamListener.h"
 #include "nsThreadUtils.h"
 
 #if defined(_MSC_VER)
 #  define FUNC __FUNCSIG__
 #else
 #  define FUNC __PRETTY_FUNCTION__
 #endif
 
@@ -30,21 +32,25 @@ namespace dom {
 
 namespace extensions {
 
 using namespace mozilla::dom;
 using mozilla::ipc::IPCResult;
 
 class StreamFilterParent final
   : public PStreamFilterParent
-  , public nsISupports
+  , public nsIStreamListener
+  , public nsIThreadRetargetableStreamListener
   , public StreamFilterBase
 {
 public:
   NS_DECL_THREADSAFE_ISUPPORTS
+  NS_DECL_NSISTREAMLISTENER
+  NS_DECL_NSIREQUESTOBSERVER
+  NS_DECL_NSITHREADRETARGETABLESTREAMLISTENER
 
   explicit StreamFilterParent(uint64_t aChannelId, const nsAString& aAddonId);
 
   enum class State
   {
     // The parent has been created, but not yet constructed by the child.
     Uninitialized,
     // The parent has been successfully constructed.
@@ -98,16 +104,18 @@ private:
   nsresult FlushBufferedData();
 
   nsresult Write(Data& aData);
 
   void WriteMove(Data&& aData);
 
   void DoSendData(Data&& aData);
 
+  nsresult EmitStopRequest(nsresult aStatusCode);
+
   virtual void ActorDestroy(ActorDestroyReason aWhy) override;
 
   void Broken();
 
   void
   CheckResult(bool aResult)
   {
     if (NS_WARN_IF(!aResult)) {
@@ -156,24 +164,28 @@ private:
     mIOThread->Dispatch(Move(NS_NewRunnableFunction(aName, aFunc)),
                         NS_DISPATCH_NORMAL);
   }
 
   const uint64_t mChannelId;
   const nsCOMPtr<nsIAtom> mAddonId;
 
   nsCOMPtr<nsIChannel> mChannel;
+  nsCOMPtr<nsIStreamListener> mOrigListener;
 
   nsCOMPtr<nsIThread> mPBackgroundThread;
   nsCOMPtr<nsIThread> mIOThread;
 
   Mutex mBufferMutex;
 
   bool mReceivedStop;
   bool mSentStop;
 
+  nsCOMPtr<nsISupports> mContext;
+  uint64_t mOffset;
+
   volatile State mState;
 };
 
 } // namespace extensions
 } // namespace mozilla
 
 #endif // mozilla_extensions_StreamFilterParent_h