Bug 1371889 - Fetch body must be kept alive using refcounting. r=bkelly, a=jcristau
☠☠ backed out by d94ac92ea7c3 ☠ ☠
authorAndrea Marchesini <amarchesini@mozilla.com>
Thu, 29 Jun 2017 08:37:08 -0400
changeset 411924 bc1c85daa17a0b7bc85c9e0503bdeab005306148
parent 411923 59ea27873e7bd98491669e3c815879777542dd34
child 411925 a98632e47afd571c1a850322d86e777783f8b747
push id7503
push userryanvm@gmail.com
push dateWed, 12 Jul 2017 21:21:02 +0000
treeherdermozilla-beta@a98632e47afd [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersbkelly, jcristau
bugs1371889
milestone55.0
Bug 1371889 - Fetch body must be kept alive using refcounting. r=bkelly, a=jcristau
dom/fetch/Fetch.cpp
dom/fetch/Fetch.h
dom/workers/ServiceWorkerEvents.h
--- a/dom/fetch/Fetch.cpp
+++ b/dom/fetch/Fetch.cpp
@@ -827,83 +827,212 @@ ExtractByteStreamFromBody(const fetch::B
     return body.GetAsStream(aStream, &aContentLength, aContentTypeWithCharset,
                             charset);
   }
 
   NS_NOTREACHED("Should never reach here");
   return NS_ERROR_FAILURE;
 }
 
+template <class Derived>
+class FetchBodyWrapper;
+
+template <class Derived>
+class FetchBodyWorkerHolder final : public workers::WorkerHolder
+{
+  RefPtr<FetchBodyWrapper<Derived>> mWrapper;
+  bool mWasNotified;
+
+public:
+  explicit FetchBodyWorkerHolder(FetchBodyWrapper<Derived>* aWrapper)
+    : mWrapper(aWrapper)
+    , mWasNotified(false)
+  {
+    MOZ_ASSERT(aWrapper);
+  }
+
+  ~FetchBodyWorkerHolder() = default;
+
+  bool Notify(workers::Status aStatus) override;
+};
+
+// FetchBody is not thread-safe but we need to move it around threads.
+// In order to keep it alive all the time, we use a WorkerHolder, if created on
+// workers, plus a wrapper.
+template <class Derived>
+class FetchBodyWrapper final
+{
+public:
+  friend class ReleaseObjectHelper;
+
+  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(FetchBodyWrapper<Derived>)
+
+  static already_AddRefed<FetchBodyWrapper<Derived>>
+  Create(FetchBody<Derived>* aBody)
+  {
+    MOZ_ASSERT(aBody);
+
+    RefPtr<FetchBodyWrapper<Derived>> wrapper =
+      new FetchBodyWrapper<Derived>(aBody);
+
+    if (!NS_IsMainThread()) {
+      WorkerPrivate* workerPrivate = GetCurrentThreadWorkerPrivate();
+      MOZ_ASSERT(workerPrivate);
+
+      if (!wrapper->RegisterWorkerHolder(workerPrivate)) {
+        return nullptr;
+      }
+    }
+
+    return wrapper.forget();
+  }
+
+  void
+  ReleaseObject()
+  {
+    AssertIsOnTargetThread();
+
+    mWorkerHolder = nullptr;
+    mBody = nullptr;
+  }
+
+  FetchBody<Derived>*
+  Body() const
+  {
+    return mBody;
+  }
+
+private:
+  explicit FetchBodyWrapper(FetchBody<Derived>* aBody)
+    : mTargetThread(NS_GetCurrentThread())
+    , mBody(aBody)
+  {}
+
+  ~FetchBodyWrapper()
+  {
+    NS_ProxyRelease(mTargetThread, mBody.forget());
+  }
+
+  void
+  AssertIsOnTargetThread()
+  {
+    MOZ_ASSERT(NS_GetCurrentThread() == mTargetThread);
+  }
+
+  bool
+  RegisterWorkerHolder(WorkerPrivate* aWorkerPrivate)
+  {
+    MOZ_ASSERT(aWorkerPrivate);
+    aWorkerPrivate->AssertIsOnWorkerThread();
+
+    MOZ_ASSERT(!mWorkerHolder);
+    mWorkerHolder.reset(new FetchBodyWorkerHolder<Derived>(this));
+
+    if (!mWorkerHolder->HoldWorker(aWorkerPrivate, Closing)) {
+      NS_WARNING("Failed to add workerHolder");
+      mWorkerHolder = nullptr;
+      return false;
+    }
+
+    return true;
+  }
+
+  nsCOMPtr<nsIThread> mTargetThread;
+  RefPtr<FetchBody<Derived>> mBody;
+
+  // Set when consuming the body is attempted on a worker.
+  // Unset when consumption is done/aborted.
+  // This WorkerHolder keeps alive the wrapper via a cycle.
+  UniquePtr<workers::WorkerHolder> mWorkerHolder;
+};
+
+template <class Derived>
+bool
+FetchBodyWorkerHolder<Derived>::Notify(workers::Status aStatus)
+{
+  MOZ_ASSERT(aStatus > workers::Running);
+  if (!mWasNotified) {
+    mWasNotified = true;
+    // This will probably cause the releasing of the wrapper.
+    // The WorkerHolder will be released as well.
+    mWrapper->Body()->ContinueConsumeBody(mWrapper, NS_BINDING_ABORTED, 0,
+                                          nullptr);
+  }
+
+  return true;
+}
+
 namespace {
+
 /*
  * Called on successfully reading the complete stream.
  */
 template <class Derived>
 class ContinueConsumeBodyRunnable final : public MainThreadWorkerRunnable
 {
-  // This has been addrefed before this runnable is dispatched,
-  // released in WorkerRun().
-  FetchBody<Derived>* mFetchBody;
+  RefPtr<FetchBodyWrapper<Derived>> mFetchBodyWrapper;
   nsresult mStatus;
   uint32_t mLength;
   uint8_t* mResult;
 
 public:
-  ContinueConsumeBodyRunnable(FetchBody<Derived>* aFetchBody, nsresult aStatus,
-                              uint32_t aLength, uint8_t* aResult)
-    : MainThreadWorkerRunnable(aFetchBody->mWorkerPrivate)
-    , mFetchBody(aFetchBody)
+  ContinueConsumeBodyRunnable(FetchBodyWrapper<Derived>* aFetchBodyWrapper,
+                              nsresult aStatus, uint32_t aLength,
+                              uint8_t* aResult)
+    : MainThreadWorkerRunnable(aFetchBodyWrapper->Body()->mWorkerPrivate)
+    , mFetchBodyWrapper(aFetchBodyWrapper)
     , mStatus(aStatus)
     , mLength(aLength)
     , mResult(aResult)
   {
     MOZ_ASSERT(NS_IsMainThread());
   }
 
   bool
   WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
   {
-    mFetchBody->ContinueConsumeBody(mStatus, mLength, mResult);
+    mFetchBodyWrapper->Body()->ContinueConsumeBody(mFetchBodyWrapper, mStatus,
+                                                   mLength, mResult);
     return true;
   }
 };
 
 /*
  * Called on successfully reading the complete stream for Blob.
  */
 template <class Derived>
 class ContinueConsumeBlobBodyRunnable final : public MainThreadWorkerRunnable
 {
-  // This has been addrefed before this runnable is dispatched,
-  // released in WorkerRun().
-  FetchBody<Derived>* mFetchBody;
+  RefPtr<FetchBodyWrapper<Derived>> mFetchBodyWrapper;
   RefPtr<BlobImpl> mBlobImpl;
 
 public:
-  ContinueConsumeBlobBodyRunnable(FetchBody<Derived>* aFetchBody,
+  ContinueConsumeBlobBodyRunnable(FetchBodyWrapper<Derived>* aFetchBodyWrapper,
                                   BlobImpl* aBlobImpl)
-    : MainThreadWorkerRunnable(aFetchBody->mWorkerPrivate)
-    , mFetchBody(aFetchBody)
+    : MainThreadWorkerRunnable(aFetchBodyWrapper->Body()->mWorkerPrivate)
+    , mFetchBodyWrapper(aFetchBodyWrapper)
     , mBlobImpl(aBlobImpl)
   {
     MOZ_ASSERT(NS_IsMainThread());
     MOZ_ASSERT(mBlobImpl);
   }
 
   bool
   WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
   {
-    mFetchBody->ContinueConsumeBlobBody(mBlobImpl);
+    mFetchBodyWrapper->Body()->ContinueConsumeBlobBody(mFetchBodyWrapper,
+                                                       mBlobImpl);
     return true;
   }
 };
 
 // OnStreamComplete always adopts the buffer, utility class to release it in
 // a couple of places.
-class MOZ_STACK_CLASS AutoFreeBuffer final {
+class MOZ_STACK_CLASS AutoFreeBuffer final
+{
   uint8_t* mBuffer;
 
 public:
   explicit AutoFreeBuffer(uint8_t* aBuffer)
     : mBuffer(aBuffer)
   {}
 
   ~AutoFreeBuffer()
@@ -916,80 +1045,86 @@ public:
   {
     mBuffer= nullptr;
   }
 };
 
 template <class Derived>
 class FailConsumeBodyWorkerRunnable : public MainThreadWorkerControlRunnable
 {
-  FetchBody<Derived>* mBody;
+  RefPtr<FetchBodyWrapper<Derived>> mBodyWrapper;
+
 public:
-  explicit FailConsumeBodyWorkerRunnable(FetchBody<Derived>* aBody)
-    : MainThreadWorkerControlRunnable(aBody->mWorkerPrivate)
-    , mBody(aBody)
+  explicit FailConsumeBodyWorkerRunnable(FetchBodyWrapper<Derived>* aBodyWrapper)
+    : MainThreadWorkerControlRunnable(aBodyWrapper->Body()->mWorkerPrivate)
+    , mBodyWrapper(aBodyWrapper)
   {
     AssertIsOnMainThread();
   }
 
   bool
   WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
   {
-    mBody->ContinueConsumeBody(NS_ERROR_FAILURE, 0, nullptr);
+    mBodyWrapper->Body()->ContinueConsumeBody(mBodyWrapper, NS_ERROR_FAILURE,
+                                              0, nullptr);
     return true;
   }
 };
 
 /*
  * In case of failure to create a stream pump or dispatch stream completion to
  * worker, ensure we cleanup properly. Thread agnostic.
  */
 template <class Derived>
 class MOZ_STACK_CLASS AutoFailConsumeBody final
 {
-  FetchBody<Derived>* mBody;
+  RefPtr<FetchBodyWrapper<Derived>> mBodyWrapper;
+
 public:
-  explicit AutoFailConsumeBody(FetchBody<Derived>* aBody)
-    : mBody(aBody)
-  { }
+  explicit AutoFailConsumeBody(FetchBodyWrapper<Derived>* aBodyWrapper)
+    : mBodyWrapper(aBodyWrapper)
+  {}
 
   ~AutoFailConsumeBody()
   {
     AssertIsOnMainThread();
-    if (mBody) {
-      if (mBody->mWorkerPrivate) {
+
+    if (mBodyWrapper) {
+      if (mBodyWrapper->Body()->mWorkerPrivate) {
         RefPtr<FailConsumeBodyWorkerRunnable<Derived>> r =
-          new FailConsumeBodyWorkerRunnable<Derived>(mBody);
+          new FailConsumeBodyWorkerRunnable<Derived>(mBodyWrapper);
         if (!r->Dispatch()) {
           MOZ_CRASH("We are going to leak");
         }
       } else {
-        mBody->ContinueConsumeBody(NS_ERROR_FAILURE, 0, nullptr);
+        mBodyWrapper->Body()->ContinueConsumeBody(mBodyWrapper,
+                                                  NS_ERROR_FAILURE, 0,
+                                                  nullptr);
       }
     }
   }
 
   void
   DontFail()
   {
-    mBody = nullptr;
+    mBodyWrapper = nullptr;
   }
 };
 
 template <class Derived>
 class ConsumeBodyDoneObserver : public nsIStreamLoaderObserver
                               , public MutableBlobStorageCallback
 {
-  FetchBody<Derived>* mFetchBody;
+  RefPtr<FetchBodyWrapper<Derived>> mFetchBodyWrapper;
 
 public:
   NS_DECL_THREADSAFE_ISUPPORTS
 
-  explicit ConsumeBodyDoneObserver(FetchBody<Derived>* aFetchBody)
-    : mFetchBody(aFetchBody)
+  explicit ConsumeBodyDoneObserver(FetchBodyWrapper<Derived>* aFetchBodyWrapper)
+    : mFetchBodyWrapper(aFetchBodyWrapper)
   { }
 
   NS_IMETHOD
   OnStreamComplete(nsIStreamLoader* aLoader,
                    nsISupports* aCtxt,
                    nsresult aStatus,
                    uint32_t aResultLength,
                    const uint8_t* aResult) override
@@ -998,31 +1133,33 @@ public:
 
     // If the binding requested cancel, we don't need to call
     // ContinueConsumeBody, since that is the originator.
     if (aStatus == NS_BINDING_ABORTED) {
       return NS_OK;
     }
 
     uint8_t* nonconstResult = const_cast<uint8_t*>(aResult);
-    if (mFetchBody->mWorkerPrivate) {
+    if (mFetchBodyWrapper->Body()->mWorkerPrivate) {
       RefPtr<ContinueConsumeBodyRunnable<Derived>> r =
-        new ContinueConsumeBodyRunnable<Derived>(mFetchBody,
-                                        aStatus,
-                                        aResultLength,
-                                        nonconstResult);
+        new ContinueConsumeBodyRunnable<Derived>(mFetchBodyWrapper,
+                                                 aStatus,
+                                                 aResultLength,
+                                                 nonconstResult);
       if (!r->Dispatch()) {
         // XXXcatalinb: The worker is shutting down, the pump will be canceled
         // by FetchBodyWorkerHolder::Notify.
         NS_WARNING("Could not dispatch ConsumeBodyRunnable");
         // Return failure so that aResult is freed.
         return NS_ERROR_FAILURE;
       }
     } else {
-      mFetchBody->ContinueConsumeBody(aStatus, aResultLength, nonconstResult);
+      mFetchBodyWrapper->Body()->ContinueConsumeBody(mFetchBodyWrapper,
+                                                     aStatus, aResultLength,
+                                                     nonconstResult);
     }
 
     // FetchBody is responsible for data.
     return NS_SUCCESS_ADOPTED_DATA;
   }
 
   virtual void BlobStoreCompleted(MutableBlobStorage* aBlobStorage,
                                   Blob* aBlob,
@@ -1031,26 +1168,28 @@ public:
     // On error.
     if (NS_FAILED(aRv)) {
       OnStreamComplete(nullptr, nullptr, aRv, 0, nullptr);
       return;
     }
 
     MOZ_ASSERT(aBlob);
 
-    if (mFetchBody->mWorkerPrivate) {
+    if (mFetchBodyWrapper->Body()->mWorkerPrivate) {
       RefPtr<ContinueConsumeBlobBodyRunnable<Derived>> r =
-        new ContinueConsumeBlobBodyRunnable<Derived>(mFetchBody, aBlob->Impl());
+        new ContinueConsumeBlobBodyRunnable<Derived>(mFetchBodyWrapper,
+                                                     aBlob->Impl());
 
       if (!r->Dispatch()) {
         NS_WARNING("Could not dispatch ConsumeBlobBodyRunnable");
         return;
       }
     } else {
-      mFetchBody->ContinueConsumeBlobBody(aBlob->Impl());
+      mFetchBodyWrapper->Body()->ContinueConsumeBlobBody(mFetchBodyWrapper,
+                                                         aBlob->Impl());
     }
   }
 
 private:
   virtual ~ConsumeBodyDoneObserver()
   { }
 };
 
@@ -1062,86 +1201,60 @@ template <class Derived>
 NS_INTERFACE_MAP_BEGIN(ConsumeBodyDoneObserver<Derived>)
   NS_INTERFACE_MAP_ENTRY(nsIStreamLoaderObserver)
   NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIStreamLoaderObserver)
 NS_INTERFACE_MAP_END
 
 template <class Derived>
 class BeginConsumeBodyRunnable final : public Runnable
 {
-  FetchBody<Derived>* mFetchBody;
+  RefPtr<FetchBodyWrapper<Derived>> mFetchBodyWrapper;
+
 public:
-  explicit BeginConsumeBodyRunnable(FetchBody<Derived>* aBody)
-    : mFetchBody(aBody)
+  explicit BeginConsumeBodyRunnable(FetchBodyWrapper<Derived>* aWrapper)
+    : mFetchBodyWrapper(aWrapper)
   { }
 
   NS_IMETHOD
   Run() override
   {
-    mFetchBody->BeginConsumeBodyMainThread();
+    mFetchBodyWrapper->Body()->BeginConsumeBodyMainThread(mFetchBodyWrapper);
     return NS_OK;
   }
 };
 
 template <class Derived>
 class CancelPumpRunnable final : public WorkerMainThreadRunnable
 {
+  // This is a sync runnable. What dispatches this runnable must keep the body
+  // alive.
   FetchBody<Derived>* mBody;
+
 public:
   explicit CancelPumpRunnable(FetchBody<Derived>* aBody)
     : WorkerMainThreadRunnable(aBody->mWorkerPrivate,
                                NS_LITERAL_CSTRING("Fetch :: Cancel Pump"))
     , mBody(aBody)
-  { }
+  {}
 
   bool
   MainThreadRun() override
   {
     mBody->CancelPump();
     return true;
   }
 };
+
 } // namespace
 
 template <class Derived>
-class FetchBodyWorkerHolder final : public workers::WorkerHolder
-{
-  // This is addrefed before the workerHolder is created, and is released in
-  // ContinueConsumeBody() so we can hold a rawptr.
-  FetchBody<Derived>* mBody;
-  bool mWasNotified;
-
-public:
-  explicit FetchBodyWorkerHolder(FetchBody<Derived>* aBody)
-    : mBody(aBody)
-    , mWasNotified(false)
-  { }
-
-  ~FetchBodyWorkerHolder()
-  { }
-
-  bool Notify(workers::Status aStatus) override
-  {
-    MOZ_ASSERT(aStatus > workers::Running);
-    if (!mWasNotified) {
-      mWasNotified = true;
-      mBody->ContinueConsumeBody(NS_BINDING_ABORTED, 0, nullptr);
-    }
-    return true;
-  }
-};
-
-template <class Derived>
 FetchBody<Derived>::FetchBody(nsIGlobalObject* aOwner)
-  : mWorkerHolder(nullptr)
-  , mOwner(aOwner)
+  : mOwner(aOwner)
   , mBodyUsed(false)
-#ifdef DEBUG
-  , mReadDone(false)
-#endif
+  , mBodyConsumed(false)
 {
   MOZ_ASSERT(aOwner);
 
   if (!NS_IsMainThread()) {
     mWorkerPrivate = GetCurrentThreadWorkerPrivate();
     MOZ_ASSERT(mWorkerPrivate);
     mMainThreadEventTarget = mWorkerPrivate->MainThreadEventTarget();
   } else {
@@ -1158,125 +1271,65 @@ FetchBody<Request>::FetchBody(nsIGlobalO
 template
 FetchBody<Response>::FetchBody(nsIGlobalObject* aOwner);
 
 template <class Derived>
 FetchBody<Derived>::~FetchBody()
 {
 }
 
-// Returns true if addref succeeded.
-// Always succeeds on main thread.
-// May fail on worker if RegisterWorkerHolder() fails. In that case, it will
-// release the object before returning false.
-template <class Derived>
-bool
-FetchBody<Derived>::AddRefObject()
-{
-  AssertIsOnTargetThread();
-  DerivedClass()->AddRef();
-
-  if (mWorkerPrivate && !mWorkerHolder) {
-    if (!RegisterWorkerHolder()) {
-      ReleaseObject();
-      return false;
-    }
-  }
-  return true;
-}
-
-template <class Derived>
-void
-FetchBody<Derived>::ReleaseObject()
-{
-  AssertIsOnTargetThread();
-
-  if (mWorkerPrivate && mWorkerHolder) {
-    UnregisterWorkerHolder();
-  }
-
-  DerivedClass()->Release();
-}
-
-template <class Derived>
-bool
-FetchBody<Derived>::RegisterWorkerHolder()
-{
-  MOZ_ASSERT(mWorkerPrivate);
-  mWorkerPrivate->AssertIsOnWorkerThread();
-  MOZ_ASSERT(!mWorkerHolder);
-  mWorkerHolder = new FetchBodyWorkerHolder<Derived>(this);
-
-  if (!mWorkerHolder->HoldWorker(mWorkerPrivate, Closing)) {
-    NS_WARNING("Failed to add workerHolder");
-    mWorkerHolder = nullptr;
-    return false;
-  }
-
-  return true;
-}
-
-template <class Derived>
-void
-FetchBody<Derived>::UnregisterWorkerHolder()
-{
-  MOZ_ASSERT(mWorkerPrivate);
-  mWorkerPrivate->AssertIsOnWorkerThread();
-  MOZ_ASSERT(mWorkerHolder);
-
-  mWorkerHolder->ReleaseWorker();
-  mWorkerHolder = nullptr;
-}
-
 template <class Derived>
 void
 FetchBody<Derived>::CancelPump()
 {
   AssertIsOnMainThread();
   MOZ_ASSERT(mConsumeBodyPump);
   mConsumeBodyPump->Cancel(NS_BINDING_ABORTED);
 }
 
 // Return value is used by ConsumeBody to bubble the error code up to WebIDL so
 // mConsumePromise doesn't have to be rejected on early exit.
 template <class Derived>
 nsresult
 FetchBody<Derived>::BeginConsumeBody()
 {
   AssertIsOnTargetThread();
-  MOZ_ASSERT(!mWorkerHolder);
   MOZ_ASSERT(mConsumePromise);
 
-  // The FetchBody is not thread-safe refcounted. We addref it here and release
-  // it once the stream read is finished.
-  if (!AddRefObject()) {
+  // The FetchBody is not thread-safe refcounted. We wrap it with a thread-safe
+  // object able to keep the current worker alive (if we are running in a
+  // worker).
+  RefPtr<FetchBodyWrapper<Derived>> wrapper =
+    FetchBodyWrapper<Derived>::Create(this);
+  if (!wrapper) {
     return NS_ERROR_FAILURE;
   }
 
-  nsCOMPtr<nsIRunnable> r = new BeginConsumeBodyRunnable<Derived>(this);
+  nsCOMPtr<nsIRunnable> r = new BeginConsumeBodyRunnable<Derived>(wrapper);
   nsresult rv = NS_OK;
   mMainThreadEventTarget->Dispatch(r.forget(), NS_DISPATCH_NORMAL);
   if (NS_WARN_IF(NS_FAILED(rv))) {
-    ReleaseObject();
     return rv;
   }
   return NS_OK;
 }
 
 /*
  * BeginConsumeBodyMainThread() will automatically reject the consume promise
  * and clean up on any failures, so there is no need for callers to do so,
  * reflected in a lack of error return code.
  */
 template <class Derived>
 void
-FetchBody<Derived>::BeginConsumeBodyMainThread()
+FetchBody<Derived>::BeginConsumeBodyMainThread(FetchBodyWrapper<Derived>* aWrapper)
 {
   AssertIsOnMainThread();
-  AutoFailConsumeBody<Derived> autoReject(DerivedClass());
+
+  AutoFailConsumeBody<Derived> autoReject(aWrapper);
+
   nsresult rv;
   nsCOMPtr<nsIInputStream> stream;
   DerivedClass()->GetBody(getter_AddRefs(stream));
   if (!stream) {
     rv = NS_NewCStringInputStream(getter_AddRefs(stream), EmptyCString());
     if (NS_WARN_IF(NS_FAILED(rv))) {
       return;
     }
@@ -1285,17 +1338,18 @@ FetchBody<Derived>::BeginConsumeBodyMain
   nsCOMPtr<nsIInputStreamPump> pump;
   rv = NS_NewInputStreamPump(getter_AddRefs(pump),
                              stream, -1, -1, 0, 0, false,
                              mMainThreadEventTarget);
   if (NS_WARN_IF(NS_FAILED(rv))) {
     return;
   }
 
-  RefPtr<ConsumeBodyDoneObserver<Derived>> p = new ConsumeBodyDoneObserver<Derived>(this);
+  RefPtr<ConsumeBodyDoneObserver<Derived>> p =
+   new ConsumeBodyDoneObserver<Derived>(aWrapper);
 
   nsCOMPtr<nsIStreamListener> listener;
   if (mConsumeType == CONSUME_BLOB) {
     MutableBlobStorage::MutableBlobStorageType type =
       MutableBlobStorage::eOnlyInMemory;
 
     const mozilla::UniquePtr<mozilla::ipc::PrincipalInfo>& principalInfo =
       DerivedClass()->GetPrincipalInfo();
@@ -1340,35 +1394,38 @@ FetchBody<Derived>::BeginConsumeBodyMain
     if (NS_WARN_IF(NS_FAILED(rv))) {
       NS_WARNING("Retargeting failed");
     }
   }
 }
 
 template <class Derived>
 void
-FetchBody<Derived>::ContinueConsumeBody(nsresult aStatus, uint32_t aResultLength, uint8_t* aResult)
+FetchBody<Derived>::ContinueConsumeBody(FetchBodyWrapper<Derived>* aBodyWrapper,
+                                        nsresult aStatus, uint32_t aResultLength,
+                                        uint8_t* aResult)
 {
   AssertIsOnTargetThread();
   // Just a precaution to ensure ContinueConsumeBody is not called out of
   // sync with a body read.
   MOZ_ASSERT(mBodyUsed);
-  MOZ_ASSERT(!mReadDone);
-  MOZ_ASSERT_IF(mWorkerPrivate, mWorkerHolder);
-#ifdef DEBUG
-  mReadDone = true;
-#endif
+
+  if (mBodyConsumed) {
+    return;
+  }
+  mBodyConsumed = true;
 
   AutoFreeBuffer autoFree(aResult);
 
   MOZ_ASSERT(mConsumePromise);
   RefPtr<Promise> localPromise = mConsumePromise.forget();
 
-  RefPtr<Derived> derivedClass = DerivedClass();
-  ReleaseObject();
+  auto autoReleaseObject = mozilla::MakeScopeExit([&] {
+    aBodyWrapper->ReleaseObject();
+  });
 
   if (NS_WARN_IF(NS_FAILED(aStatus))) {
     localPromise->MaybeReject(NS_ERROR_DOM_ABORT_ERR);
 
     // If binding aborted, cancel the pump. We can't assert mConsumeBodyPump.
     // In the (admittedly rare) situation that BeginConsumeBodyMainThread()
     // context switches out, and the worker thread gets canceled before the
     // pump is setup, mConsumeBodyPump will be null.
@@ -1379,17 +1436,17 @@ FetchBody<Derived>::ContinueConsumeBody(
         CancelPump();
       } else {
         MOZ_ASSERT(mWorkerPrivate);
         // In case of worker thread, we block the worker while the request is
         // canceled on the main thread. This ensures that OnStreamComplete has
         // a valid FetchBody around to call CancelPump and we don't release the
         // FetchBody on the main thread.
         RefPtr<CancelPumpRunnable<Derived>> r =
-          new CancelPumpRunnable<Derived>(this);
+          new CancelPumpRunnable<Derived>(aBodyWrapper->Body());
         ErrorResult rv;
         r->Dispatch(Terminating, rv);
         if (rv.Failed()) {
           NS_WARNING("Could not dispatch CancelPumpRunnable. Nothing we can do here");
           // None of our callers are callled directly from JS, so there is no
           // point in trying to propagate this failure out of here.  And
           // localPromise is already rejected.  Just suppress the failure.
           rv.SuppressException();
@@ -1406,17 +1463,17 @@ FetchBody<Derived>::ContinueConsumeBody(
   if (NS_FAILED(aStatus)) {
     return;
   }
 
   // Finish successfully consuming body according to type.
   MOZ_ASSERT(aResult);
 
   AutoJSAPI jsapi;
-  if (!jsapi.Init(derivedClass->GetParentObject())) {
+  if (!jsapi.Init(aBodyWrapper->Body()->DerivedClass()->GetParentObject())) {
     localPromise->MaybeReject(NS_ERROR_UNEXPECTED);
     return;
   }
 
   JSContext* cx = jsapi.cx();
   ErrorResult error;
 
   switch (mConsumeType) {
@@ -1440,17 +1497,17 @@ FetchBody<Derived>::ContinueConsumeBody(
       break;
     }
     case CONSUME_FORMDATA: {
       nsCString data;
       data.Adopt(reinterpret_cast<char*>(aResult), aResultLength);
       autoFree.Reset();
 
       RefPtr<dom::FormData> fd = BodyUtil::ConsumeFormData(
-        derivedClass->GetParentObject(),
+        aBodyWrapper->Body()->DerivedClass()->GetParentObject(),
         mMimeType, data, error);
       if (!error.Failed()) {
         localPromise->MaybeResolve(fd);
       }
       break;
     }
     case CONSUME_TEXT:
       // fall through handles early exit.
@@ -1476,41 +1533,44 @@ FetchBody<Derived>::ContinueConsumeBody(
   error.WouldReportJSException();
   if (error.Failed()) {
     localPromise->MaybeReject(error);
   }
 }
 
 template <class Derived>
 void
-FetchBody<Derived>::ContinueConsumeBlobBody(BlobImpl* aBlobImpl)
+FetchBody<Derived>::ContinueConsumeBlobBody(FetchBodyWrapper<Derived>* aBodyWrapper,
+                                            BlobImpl* aBlobImpl)
 {
   AssertIsOnTargetThread();
   // Just a precaution to ensure ContinueConsumeBody is not called out of
   // sync with a body read.
   MOZ_ASSERT(mBodyUsed);
-  MOZ_ASSERT(!mReadDone);
   MOZ_ASSERT(mConsumeType == CONSUME_BLOB);
-  MOZ_ASSERT_IF(mWorkerPrivate, mWorkerHolder);
-#ifdef DEBUG
-  mReadDone = true;
-#endif
+
+  if (mBodyConsumed) {
+    return;
+  }
+  mBodyConsumed = true;
 
   MOZ_ASSERT(mConsumePromise);
   RefPtr<Promise> localPromise = mConsumePromise.forget();
 
-  RefPtr<Derived> derivedClass = DerivedClass();
-  ReleaseObject();
+  auto autoReleaseObject = mozilla::MakeScopeExit([&] {
+    aBodyWrapper->ReleaseObject();
+  });
 
   // Release the pump and then early exit if there was an error.
   // Uses NS_ProxyRelease internally, so this is safe.
   mConsumeBodyPump = nullptr;
 
   RefPtr<dom::Blob> blob =
-    dom::Blob::Create(derivedClass->GetParentObject(), aBlobImpl);
+    dom::Blob::Create(aBodyWrapper->Body()->DerivedClass()->GetParentObject(),
+                      aBlobImpl);
   MOZ_ASSERT(blob);
 
   localPromise->MaybeResolve(blob);
 }
 
 template <class Derived>
 already_AddRefed<Promise>
 FetchBody<Derived>::ConsumeBody(ConsumeType aType, ErrorResult& aRv)
--- a/dom/fetch/Fetch.h
+++ b/dom/fetch/Fetch.h
@@ -15,17 +15,16 @@
 #include "nsError.h"
 #include "nsProxyRelease.h"
 #include "nsString.h"
 
 #include "mozilla/DebugOnly.h"
 #include "mozilla/ErrorResult.h"
 #include "mozilla/dom/Promise.h"
 #include "mozilla/dom/RequestBinding.h"
-#include "mozilla/dom/workers/bindings/WorkerHolder.h"
 
 class nsIGlobalObject;
 class nsIEventTarget;
 
 namespace mozilla {
 namespace dom {
 
 class BlobOrArrayBufferViewOrArrayBufferOrFormDataOrURLSearchParamsOrUSVString;
@@ -67,17 +66,17 @@ ExtractByteStreamFromBody(const fetch::O
  * Non-owning version.
  */
 nsresult
 ExtractByteStreamFromBody(const fetch::BodyInit& aBodyInit,
                           nsIInputStream** aStream,
                           nsCString& aContentType,
                           uint64_t& aContentLength);
 
-template <class Derived> class FetchBodyWorkerHolder;
+template <class Derived> class FetchBodyWrapper;
 
 /*
  * FetchBody's body consumption uses nsIInputStreamPump to read from the
  * underlying stream to a block of memory, which is then adopted by
  * ContinueConsumeBody() and converted to the right type based on the JS
  * function called.
  *
  * Use of the nsIInputStreamPump complicates things on the worker thread.
@@ -103,18 +102,21 @@ template <class Derived> class FetchBody
  *    ensure that mFetchBody remains alive (since mConsumeBodyPump is strongly
  *    held by it) until pump->Cancel() is called. OnStreamComplete() will not
  *    do anything if the error code is NS_BINDING_ABORTED, so we don't have to
  *    worry about keeping anything alive.
  *
  * The pump is always released on the main thread.
  */
 template <class Derived>
-class FetchBody {
+class FetchBody
+{
 public:
+  NS_INLINE_DECL_PURE_VIRTUAL_REFCOUNTING
+
   bool
   BodyUsed() const { return mBodyUsed; }
 
   already_AddRefed<Promise>
   ArrayBuffer(ErrorResult& aRv)
   {
     return ConsumeBody(CONSUME_ARRAYBUFFER, aRv);
   }
@@ -140,40 +142,38 @@ public:
   already_AddRefed<Promise>
   Text(ErrorResult& aRv)
   {
     return ConsumeBody(CONSUME_TEXT, aRv);
   }
 
   // Utility public methods accessed by various runnables.
   void
-  BeginConsumeBodyMainThread();
+  BeginConsumeBodyMainThread(FetchBodyWrapper<Derived>* aWrapper);
 
   void
-  ContinueConsumeBody(nsresult aStatus, uint32_t aLength, uint8_t* aResult);
+  ContinueConsumeBody(FetchBodyWrapper<Derived>* aWrapper, nsresult aStatus,
+                      uint32_t aLength, uint8_t* aResult);
 
   void
-  ContinueConsumeBlobBody(BlobImpl* aBlobImpl);
+  ContinueConsumeBlobBody(FetchBodyWrapper<Derived>* aWrapper,
+                          BlobImpl* aBlobImpl);
 
   void
   CancelPump();
 
   void
   SetBodyUsed()
   {
     mBodyUsed = true;
   }
 
   // Always set whenever the FetchBody is created on the worker thread.
   workers::WorkerPrivate* mWorkerPrivate;
 
-  // Set when consuming the body is attempted on a worker.
-  // Unset when consumption is done/aborted.
-  nsAutoPtr<workers::WorkerHolder> mWorkerHolder;
-
 protected:
   nsCOMPtr<nsIGlobalObject> mOwner;
 
   explicit FetchBody(nsIGlobalObject* aOwner);
 
   virtual ~FetchBody();
 
   void
@@ -196,28 +196,16 @@ private:
 
   nsresult
   BeginConsumeBody();
 
   already_AddRefed<Promise>
   ConsumeBody(ConsumeType aType, ErrorResult& aRv);
 
   bool
-  AddRefObject();
-
-  void
-  ReleaseObject();
-
-  bool
-  RegisterWorkerHolder();
-
-  void
-  UnregisterWorkerHolder();
-
-  bool
   IsOnTargetThread()
   {
     return NS_IsMainThread() == !mWorkerPrivate;
   }
 
   void
   AssertIsOnTargetThread()
   {
@@ -226,19 +214,18 @@ private:
 
   // Only ever set once, always on target thread.
   bool mBodyUsed;
   nsCString mMimeType;
 
   // Only touched on target thread.
   ConsumeType mConsumeType;
   RefPtr<Promise> mConsumePromise;
-#ifdef DEBUG
-  bool mReadDone;
-#endif
+
+  bool mBodyConsumed;
 
   nsMainThreadPtrHandle<nsIInputStreamPump> mConsumeBodyPump;
 
   // The main-thread event target for runnable dispatching.
   nsCOMPtr<nsIEventTarget> mMainThreadEventTarget;
 };
 
 } // namespace dom
--- a/dom/workers/ServiceWorkerEvents.h
+++ b/dom/workers/ServiceWorkerEvents.h
@@ -10,16 +10,17 @@
 #include "mozilla/dom/Event.h"
 #include "mozilla/dom/ExtendableEventBinding.h"
 #include "mozilla/dom/ExtendableMessageEventBinding.h"
 #include "mozilla/dom/FetchEventBinding.h"
 #include "mozilla/dom/File.h"
 #include "mozilla/dom/Promise.h"
 #include "mozilla/dom/Response.h"
 #include "mozilla/dom/workers/bindings/ServiceWorker.h"
+#include "mozilla/dom/workers/Workers.h"
 
 #include "nsProxyRelease.h"
 #include "nsContentUtils.h"
 
 class nsIInterceptedChannel;
 
 namespace mozilla {
 namespace dom {