Bug 1255894: Part 11 - Move StreamFilterParent to STS thread. r=dragana
authorKris Maglione <maglione.k@gmail.com>
Sun, 27 Aug 2017 21:06:42 -0700
changeset 429127 74c32b25b40364c4126872c52ee24c02e8cb4a2e
parent 429126 4668460d74e8b61bbb3e7a29ecd74adb25b206cd
child 429128 604fd3302562dc2059a8de3231818a1b618b8ffe
push id7761
push userjlund@mozilla.com
push dateFri, 15 Sep 2017 00:19:52 +0000
treeherdermozilla-beta@c38455951db4 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersdragana
bugs1255894
milestone57.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1255894: Part 11 - Move StreamFilterParent to STS thread. r=dragana MozReview-Commit-ID: L5aPENDjVB3
toolkit/components/extensions/webrequest/StreamFilterParent.cpp
toolkit/components/extensions/webrequest/StreamFilterParent.h
--- a/toolkit/components/extensions/webrequest/StreamFilterParent.cpp
+++ b/toolkit/components/extensions/webrequest/StreamFilterParent.cpp
@@ -11,28 +11,28 @@
 #include "mozilla/dom/ContentParent.h"
 #include "nsHttpChannel.h"
 #include "nsIChannel.h"
 #include "nsIHttpChannelInternal.h"
 #include "nsIInputStream.h"
 #include "nsITraceableChannel.h"
 #include "nsProxyRelease.h"
 #include "nsQueryObject.h"
+#include "nsSocketTransportService2.h"
 #include "nsStringStream.h"
 
 namespace mozilla {
 namespace extensions {
 
 /*****************************************************************************
  * Initialization
  *****************************************************************************/
 
 StreamFilterParent::StreamFilterParent()
-  : mActorThread(GetCurrentThreadEventTarget())
-  , mMainThread(GetCurrentThreadEventTarget())
+  : mMainThread(GetCurrentThreadEventTarget())
   , mIOThread(mMainThread)
   , mBufferMutex("StreamFilter buffer mutex")
   , mReceivedStop(false)
   , mSentStop(false)
   , mContext(nullptr)
   , mOffset(0)
   , mState(State::Uninitialized)
 {
@@ -72,29 +72,40 @@ StreamFilterParent::Create(dom::ContentP
     return false;
   }
 
   *aEndpoint = Move(child);
   return true;
 }
 
 /* static */ void
-StreamFilterParent::Attach(nsIChannel* aChannel, mozilla::ipc::Endpoint<PStreamFilterParent>&& aEndpoint)
+StreamFilterParent::Attach(nsIChannel* aChannel, ParentEndpoint&& aEndpoint)
 {
   auto self = MakeRefPtr<StreamFilterParent>();
-  if (!aEndpoint.Bind(self)) {
-    MOZ_CRASH("Failed to attach StreamFilter endpoint");
-  }
+
+  self->ActorThread()->Dispatch(
+    NewRunnableMethod<ParentEndpoint&&>("StreamFilterParent::Bind",
+                                        self,
+                                        &StreamFilterParent::Bind,
+                                        Move(aEndpoint)),
+    NS_DISPATCH_NORMAL);
 
   self->Init(aChannel);
 
+  // IPC owns this reference now.
   Unused << self.forget();
 }
 
 void
+StreamFilterParent::Bind(ParentEndpoint&& aEndpoint)
+{
+  aEndpoint.Bind(this);
+}
+
+void
 StreamFilterParent::Init(nsIChannel* aChannel)
 {
   mChannel = aChannel;
 
   nsCOMPtr<nsITraceableChannel> traceable = do_QueryInterface(aChannel);
   MOZ_RELEASE_ASSERT(traceable);
 
   nsresult rv = traceable->SetNewListener(this, getter_AddRefs(mOrigListener));
@@ -156,20 +167,32 @@ StreamFilterParent::RecvClose()
     RefPtr<StreamFilterParent> self(this);
     RunOnMainThread(FUNC, [=] {
       nsresult rv = self->EmitStopRequest(NS_OK);
       Unused << NS_WARN_IF(NS_FAILED(rv));
     });
   }
 
   Unused << SendClosed();
-  Close();
+  Destroy();
   return IPC_OK();
 }
 
+void
+StreamFilterParent::Destroy()
+{
+  // Close the channel asynchronously so the actor is never destroyed before
+  // this message is fully processed.
+  ActorThread()->Dispatch(
+    NewRunnableMethod("StreamFilterParent::Close",
+                      this,
+                      &StreamFilterParent::Close),
+    NS_DISPATCH_NORMAL);
+}
+
 IPCResult
 StreamFilterParent::RecvSuspend()
 {
   AssertIsActorThread();
 
   if (mState == State::TransferringData) {
     RefPtr<StreamFilterParent> self(this);
     RunOnMainThread(FUNC, [=] {
@@ -231,17 +254,17 @@ StreamFilterParent::RecvDisconnect()
 
 IPCResult
 StreamFilterParent::RecvFlushedData()
 {
   AssertIsActorThread();
 
   MOZ_ASSERT(mState == State::Disconnecting);
 
-  Close();
+  Destroy();
 
   RefPtr<StreamFilterParent> self(this);
   RunOnIOThread(FUNC, [=] {
     self->FlushBufferedData();
 
     RunOnActorThread(FUNC, [=] {
       self->mState = State::Disconnected;
     });
@@ -253,22 +276,26 @@ StreamFilterParent::RecvFlushedData()
  * Data output
  *****************************************************************************/
 
 IPCResult
 StreamFilterParent::RecvWrite(Data&& aData)
 {
   AssertIsActorThread();
 
-  mIOThread->Dispatch(
-    NewRunnableMethod<Data&&>("StreamFilterParent::WriteMove",
-                              this,
-                              &StreamFilterParent::WriteMove,
-                              Move(aData)),
-    NS_DISPATCH_NORMAL);
+  if (IsIOThread()) {
+    Write(aData);
+  } else {
+    IOThread()->Dispatch(
+      NewRunnableMethod<Data&&>("StreamFilterParent::WriteMove",
+                                this,
+                                &StreamFilterParent::WriteMove,
+                                Move(aData)),
+      NS_DISPATCH_NORMAL);
+  }
   return IPC_OK();
 }
 
 void
 StreamFilterParent::WriteMove(Data&& aData)
 {
   nsresult rv = Write(aData);
   Unused << NS_WARN_IF(NS_FAILED(rv));
@@ -366,17 +393,21 @@ NS_IMETHODIMP
 StreamFilterParent::OnDataAvailable(nsIRequest* aRequest,
                                     nsISupports* aContext,
                                     nsIInputStream* aInputStream,
                                     uint64_t aOffset,
                                     uint32_t aCount)
 {
   // Note: No AssertIsIOThread here. Whatever thread we're on now is, by
   // definition, the IO thread.
-  mIOThread = NS_GetCurrentThread();
+  if (OnSocketThread()) {
+    mIOThread = nullptr;
+  } else {
+    mIOThread = NS_GetCurrentThread();
+  }
 
   if (mState == State::Disconnected) {
     // If we're offloading data in a thread pool, it's possible that we'll
     // have buffered some additional data while waiting for the buffer to
     // flush. So, if there's any buffered data left, flush that before we
     // flush this incoming data.
     //
     // Note: When in the eDisconnected state, the buffer list is guaranteed
@@ -400,17 +431,17 @@ StreamFilterParent::OnDataAvailable(nsIR
   NS_ENSURE_TRUE(count == aCount, NS_ERROR_UNEXPECTED);
 
   if (mState == State::Disconnecting) {
     MutexAutoLock al(mBufferMutex);
     BufferData(Move(data));
   } else if (mState == State::Closed) {
     return NS_ERROR_FAILURE;
   } else {
-    mActorThread->Dispatch(
+    ActorThread()->Dispatch(
       NewRunnableMethod<Data&&>("StreamFilterParent::DoSendData",
                                 this,
                                 &StreamFilterParent::DoSendData,
                                 Move(data)),
       NS_DISPATCH_NORMAL);
   }
   return NS_OK;
 }
@@ -441,16 +472,79 @@ StreamFilterParent::FlushBufferedData()
       }
     });
   }
 
   return NS_OK;
 }
 
 /*****************************************************************************
+ * Thread helpers
+ *****************************************************************************/
+
+void
+StreamFilterParent::AssertIsActorThread()
+{
+  MOZ_ASSERT(OnSocketThread());
+}
+
+nsIEventTarget*
+StreamFilterParent::ActorThread()
+{
+  return gSocketTransportService;
+}
+
+nsIEventTarget*
+StreamFilterParent::IOThread()
+{
+  if (mIOThread) {
+    return mIOThread;
+  }
+  return gSocketTransportService;
+}
+
+bool
+StreamFilterParent::IsIOThread()
+{
+  return (mIOThread ? NS_GetCurrentThread() == mIOThread
+                    : OnSocketThread());
+}
+
+void
+StreamFilterParent::AssertIsIOThread()
+{
+  MOZ_ASSERT(IsIOThread());
+}
+
+template<typename Function>
+void
+StreamFilterParent::RunOnActorThread(const char* aName, Function&& aFunc)
+{
+  if (OnSocketThread()) {
+    aFunc();
+  } else {
+    gSocketTransportService->Dispatch(
+      Move(NS_NewRunnableFunction(aName, aFunc)),
+      NS_DISPATCH_NORMAL);
+  }
+}
+
+template<typename Function>
+void
+StreamFilterParent::RunOnIOThread(const char* aName, Function&& aFunc)
+{
+  if (mIOThread) {
+    mIOThread->Dispatch(Move(NS_NewRunnableFunction(aName, aFunc)),
+                        NS_DISPATCH_NORMAL);
+  } else {
+    RunOnActorThread(aName, Move(aFunc));
+  }
+}
+
+/*****************************************************************************
  * Glue
  *****************************************************************************/
 
 void
 StreamFilterParent::ActorDestroy(ActorDestroyReason aWhy)
 {
   AssertIsActorThread();
 
--- a/toolkit/components/extensions/webrequest/StreamFilterParent.h
+++ b/toolkit/components/extensions/webrequest/StreamFilterParent.h
@@ -47,21 +47,23 @@ class StreamFilterParent final
 public:
   NS_DECL_THREADSAFE_ISUPPORTS
   NS_DECL_NSISTREAMLISTENER
   NS_DECL_NSIREQUESTOBSERVER
   NS_DECL_NSITHREADRETARGETABLESTREAMLISTENER
 
   StreamFilterParent();
 
+  using ParentEndpoint = mozilla::ipc::Endpoint<PStreamFilterParent>;
+
   static bool Create(ContentParent* aContentParent,
                      uint64_t aChannelId, const nsAString& aAddonId,
                      mozilla::ipc::Endpoint<PStreamFilterChild>* aEndpoint);
 
-  static void Attach(nsIChannel* aChannel, mozilla::ipc::Endpoint<PStreamFilterParent>&& aEndpoint);
+  static void Attach(nsIChannel* aChannel, ParentEndpoint&& aEndpoint);
 
   enum class State
   {
     // The parent has been created, but not yet constructed by the child.
     Uninitialized,
     // The parent has been successfully constructed.
     Initialized,
     // The OnRequestStarted event has been received, and data is being
@@ -98,16 +100,20 @@ private:
   {
     return (mState != State::Closed &&
             mState != State::Disconnecting &&
             mState != State::Disconnected);
   }
 
   void Init(nsIChannel* aChannel);
 
+  void Bind(ParentEndpoint&& aEndpoint);
+
+  void Destroy();
+
   nsresult FlushBufferedData();
 
   nsresult Write(Data& aData);
 
   void WriteMove(Data&& aData);
 
   void DoSendData(Data&& aData);
 
@@ -120,62 +126,49 @@ private:
   void
   CheckResult(bool aResult)
   {
     if (NS_WARN_IF(!aResult)) {
       Broken();
     }
   }
 
-  void
-  AssertIsActorThread()
-  {
-    MOZ_ASSERT(NS_GetCurrentThread() == mActorThread);
-  }
+  inline nsIEventTarget* ActorThread();
+
+  inline nsIEventTarget* IOThread();
 
-  void
-  AssertIsIOThread()
-  {
-    MOZ_ASSERT(NS_GetCurrentThread() == mIOThread);
-  }
+  inline bool IsIOThread();
+
+  inline void AssertIsActorThread();
+
+  inline void AssertIsIOThread();
 
   static void
   AssertIsMainThread()
   {
     MOZ_ASSERT(NS_IsMainThread());
   }
 
   template<typename Function>
   void
   RunOnMainThread(const char* aName, Function&& aFunc)
   {
     mMainThread->Dispatch(Move(NS_NewRunnableFunction(aName, aFunc)),
                           NS_DISPATCH_NORMAL);
   }
 
   template<typename Function>
-  void
-  RunOnActorThread(const char* aName, Function&& aFunc)
-  {
-    mActorThread->Dispatch(Move(NS_NewRunnableFunction(aName, aFunc)),
-                           NS_DISPATCH_NORMAL);
-  }
+  void RunOnActorThread(const char* aName, Function&& aFunc);
 
   template<typename Function>
-  void
-  RunOnIOThread(const char* aName, Function&& aFunc)
-  {
-    mIOThread->Dispatch(Move(NS_NewRunnableFunction(aName, aFunc)),
-                        NS_DISPATCH_NORMAL);
-  }
+  void RunOnIOThread(const char* aName, Function&& aFunc);
 
   nsCOMPtr<nsIChannel> mChannel;
   nsCOMPtr<nsIStreamListener> mOrigListener;
 
-  nsCOMPtr<nsIEventTarget> mActorThread;
   nsCOMPtr<nsIEventTarget> mMainThread;
   nsCOMPtr<nsIEventTarget> mIOThread;
 
   Mutex mBufferMutex;
 
   bool mReceivedStop;
   bool mSentStop;