Bug 1373555 - Move the Fetch consume body login in a separate class - part 3 - Move the consuming body logic from FetchBody to FetchBodyConsumer, r=bkelly
authorAndrea Marchesini <amarchesini@mozilla.com>
Tue, 20 Jun 2017 17:53:21 +0200
changeset 364944 f5b120a0b4803033a50a51280feef824021ba574
parent 364943 0092bff5f1c51d3a21300105a973d111db40f5c5
child 364945 5fc8b8976a9892b14c4187a5a133214fcc8f19d5
push id32057
push userkwierso@gmail.com
push dateWed, 21 Jun 2017 00:59:08 +0000
treeherdermozilla-central@f31652d75fb5 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersbkelly
bugs1373555
milestone56.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1373555 - Move the Fetch consume body login in a separate class - part 3 - Move the consuming body logic from FetchBody to FetchBodyConsumer, r=bkelly
dom/fetch/Fetch.cpp
dom/fetch/Fetch.h
dom/fetch/FetchConsumer.cpp
dom/fetch/FetchConsumer.h
dom/workers/ServiceWorkerScriptCache.cpp
dom/worklet/Worklet.cpp
--- a/dom/fetch/Fetch.cpp
+++ b/dom/fetch/Fetch.cpp
@@ -830,293 +830,29 @@ ExtractByteStreamFromBody(const fetch::B
     return body.GetAsStream(aStream, &aContentLength, aContentTypeWithCharset,
                             charset);
   }
 
   NS_NOTREACHED("Should never reach here");
   return NS_ERROR_FAILURE;
 }
 
-namespace {
-/*
- * Called on successfully reading the complete stream.
- */
-template <class Derived>
-class ContinueConsumeBodyRunnable final : public MainThreadWorkerRunnable
-{
-  RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
-  nsresult mStatus;
-  uint32_t mLength;
-  uint8_t* mResult;
-
-public:
-  ContinueConsumeBodyRunnable(FetchBodyConsumer<Derived>* aFetchBodyConsumer,
-                              nsresult aStatus, uint32_t aLength,
-                              uint8_t* aResult)
-    : MainThreadWorkerRunnable(aFetchBodyConsumer->Body()->mWorkerPrivate)
-    , mFetchBodyConsumer(aFetchBodyConsumer)
-    , mStatus(aStatus)
-    , mLength(aLength)
-    , mResult(aResult)
-  {
-    MOZ_ASSERT(NS_IsMainThread());
-  }
-
-  bool
-  WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
-  {
-    mFetchBodyConsumer->Body()->ContinueConsumeBody(mFetchBodyConsumer, mStatus,
-                                                    mLength, mResult);
-    return true;
-  }
-};
-
-/*
- * Called on successfully reading the complete stream for Blob.
- */
-template <class Derived>
-class ContinueConsumeBlobBodyRunnable final : public MainThreadWorkerRunnable
-{
-  RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
-  RefPtr<BlobImpl> mBlobImpl;
-
-public:
-  ContinueConsumeBlobBodyRunnable(FetchBodyConsumer<Derived>* aFetchBodyConsumer,
-                                  BlobImpl* aBlobImpl)
-    : MainThreadWorkerRunnable(aFetchBodyConsumer->Body()->mWorkerPrivate)
-    , mFetchBodyConsumer(aFetchBodyConsumer)
-    , mBlobImpl(aBlobImpl)
-  {
-    MOZ_ASSERT(NS_IsMainThread());
-    MOZ_ASSERT(mBlobImpl);
-  }
-
-  bool
-  WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
-  {
-    mFetchBodyConsumer->Body()->ContinueConsumeBlobBody(mFetchBodyConsumer,
-                                                        mBlobImpl);
-    return true;
-  }
-};
-
-template <class Derived>
-class FailConsumeBodyWorkerRunnable : public MainThreadWorkerControlRunnable
-{
-  RefPtr<FetchBodyConsumer<Derived>> mBodyConsumer;
-
-public:
-  explicit FailConsumeBodyWorkerRunnable(FetchBodyConsumer<Derived>* aBodyConsumer)
-    : MainThreadWorkerControlRunnable(aBodyConsumer->Body()->mWorkerPrivate)
-    , mBodyConsumer(aBodyConsumer)
-  {
-    AssertIsOnMainThread();
-  }
-
-  bool
-  WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
-  {
-    mBodyConsumer->Body()->ContinueConsumeBody(mBodyConsumer, NS_ERROR_FAILURE,
-                                               0, nullptr);
-    return true;
-  }
-};
-
-/*
- * In case of failure to create a stream pump or dispatch stream completion to
- * worker, ensure we cleanup properly. Thread agnostic.
- */
-template <class Derived>
-class MOZ_STACK_CLASS AutoFailConsumeBody final
-{
-  RefPtr<FetchBodyConsumer<Derived>> mBodyConsumer;
-
-public:
-  explicit AutoFailConsumeBody(FetchBodyConsumer<Derived>* aBodyConsumer)
-    : mBodyConsumer(aBodyConsumer)
-  {}
-
-  ~AutoFailConsumeBody()
-  {
-    AssertIsOnMainThread();
-
-    if (mBodyConsumer) {
-      if (mBodyConsumer->Body()->mWorkerPrivate) {
-        RefPtr<FailConsumeBodyWorkerRunnable<Derived>> r =
-          new FailConsumeBodyWorkerRunnable<Derived>(mBodyConsumer);
-        if (!r->Dispatch()) {
-          MOZ_CRASH("We are going to leak");
-        }
-      } else {
-        mBodyConsumer->Body()->ContinueConsumeBody(mBodyConsumer,
-                                                   NS_ERROR_FAILURE, 0,
-                                                   nullptr);
-      }
-    }
-  }
-
-  void
-  DontFail()
-  {
-    mBodyConsumer = nullptr;
-  }
-};
-
-template <class Derived>
-class ConsumeBodyDoneObserver : public nsIStreamLoaderObserver
-                              , public MutableBlobStorageCallback
-{
-  RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
-
-public:
-  NS_DECL_THREADSAFE_ISUPPORTS
-
-  explicit ConsumeBodyDoneObserver(FetchBodyConsumer<Derived>* aFetchBodyConsumer)
-    : mFetchBodyConsumer(aFetchBodyConsumer)
-  { }
-
-  NS_IMETHOD
-  OnStreamComplete(nsIStreamLoader* aLoader,
-                   nsISupports* aCtxt,
-                   nsresult aStatus,
-                   uint32_t aResultLength,
-                   const uint8_t* aResult) override
-  {
-    MOZ_ASSERT(NS_IsMainThread());
-
-    // If the binding requested cancel, we don't need to call
-    // ContinueConsumeBody, since that is the originator.
-    if (aStatus == NS_BINDING_ABORTED) {
-      return NS_OK;
-    }
-
-    uint8_t* nonconstResult = const_cast<uint8_t*>(aResult);
-    if (mFetchBodyConsumer->Body()->mWorkerPrivate) {
-      RefPtr<ContinueConsumeBodyRunnable<Derived>> r =
-        new ContinueConsumeBodyRunnable<Derived>(mFetchBodyConsumer,
-                                                 aStatus,
-                                                 aResultLength,
-                                                 nonconstResult);
-      if (!r->Dispatch()) {
-        // XXXcatalinb: The worker is shutting down, the pump will be canceled
-        // by FetchBodyWorkerHolder::Notify.
-        NS_WARNING("Could not dispatch ConsumeBodyRunnable");
-        // Return failure so that aResult is freed.
-        return NS_ERROR_FAILURE;
-      }
-    } else {
-      mFetchBodyConsumer->Body()->ContinueConsumeBody(mFetchBodyConsumer,
-                                                      aStatus, aResultLength,
-                                                      nonconstResult);
-    }
-
-    // FetchBody is responsible for data.
-    return NS_SUCCESS_ADOPTED_DATA;
-  }
-
-  virtual void BlobStoreCompleted(MutableBlobStorage* aBlobStorage,
-                                  Blob* aBlob,
-                                  nsresult aRv) override
-  {
-    // On error.
-    if (NS_FAILED(aRv)) {
-      OnStreamComplete(nullptr, nullptr, aRv, 0, nullptr);
-      return;
-    }
-
-    MOZ_ASSERT(aBlob);
-
-    if (mFetchBodyConsumer->Body()->mWorkerPrivate) {
-      RefPtr<ContinueConsumeBlobBodyRunnable<Derived>> r =
-        new ContinueConsumeBlobBodyRunnable<Derived>(mFetchBodyConsumer,
-                                                     aBlob->Impl());
-
-      if (!r->Dispatch()) {
-        NS_WARNING("Could not dispatch ConsumeBlobBodyRunnable");
-        return;
-      }
-    } else {
-      mFetchBodyConsumer->Body()->ContinueConsumeBlobBody(mFetchBodyConsumer,
-                                                          aBlob->Impl());
-    }
-  }
-
-private:
-  virtual ~ConsumeBodyDoneObserver()
-  { }
-};
-
-template <class Derived>
-NS_IMPL_ADDREF(ConsumeBodyDoneObserver<Derived>)
-template <class Derived>
-NS_IMPL_RELEASE(ConsumeBodyDoneObserver<Derived>)
-template <class Derived>
-NS_INTERFACE_MAP_BEGIN(ConsumeBodyDoneObserver<Derived>)
-  NS_INTERFACE_MAP_ENTRY(nsIStreamLoaderObserver)
-  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIStreamLoaderObserver)
-NS_INTERFACE_MAP_END
-
-template <class Derived>
-class BeginConsumeBodyRunnable final : public Runnable
-{
-  RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
-
-public:
-  explicit BeginConsumeBodyRunnable(FetchBodyConsumer<Derived>* aConsumer)
-    : mFetchBodyConsumer(aConsumer)
-  { }
-
-  NS_IMETHOD
-  Run() override
-  {
-    mFetchBodyConsumer->Body()->BeginConsumeBodyMainThread(mFetchBodyConsumer);
-    return NS_OK;
-  }
-};
-
-template <class Derived>
-class CancelPumpRunnable final : public WorkerMainThreadRunnable
-{
-  // This is a sync runnable. What dispatches this runnable must keep the body
-  // alive.
-  FetchBody<Derived>* mBody;
-
-public:
-  explicit CancelPumpRunnable(FetchBody<Derived>* aBody)
-    : WorkerMainThreadRunnable(aBody->mWorkerPrivate,
-                               NS_LITERAL_CSTRING("Fetch :: Cancel Pump"))
-    , mBody(aBody)
-  {}
-
-  bool
-  MainThreadRun() override
-  {
-    mBody->CancelPump();
-    return true;
-  }
-};
-
-} // namespace
-
 template <class Derived>
 FetchBody<Derived>::FetchBody(nsIGlobalObject* aOwner)
   : mOwner(aOwner)
+  , mWorkerPrivate(nullptr)
   , mBodyUsed(false)
-#ifdef DEBUG
-  , mReadDone(false)
-#endif
 {
   MOZ_ASSERT(aOwner);
 
   if (!NS_IsMainThread()) {
     mWorkerPrivate = GetCurrentThreadWorkerPrivate();
     MOZ_ASSERT(mWorkerPrivate);
     mMainThreadEventTarget = mWorkerPrivate->MainThreadEventTarget();
   } else {
-    mWorkerPrivate = nullptr;
     mMainThreadEventTarget = aOwner->EventTargetFor(TaskCategory::Other);
   }
 
   MOZ_ASSERT(mMainThreadEventTarget);
 }
 
 template
 FetchBody<Request>::FetchBody(nsIGlobalObject* aOwner);
@@ -1125,344 +861,44 @@ template
 FetchBody<Response>::FetchBody(nsIGlobalObject* aOwner);
 
 template <class Derived>
 FetchBody<Derived>::~FetchBody()
 {
 }
 
 template <class Derived>
-void
-FetchBody<Derived>::CancelPump()
-{
-  AssertIsOnMainThread();
-  MOZ_ASSERT(mConsumeBodyPump);
-  mConsumeBodyPump->Cancel(NS_BINDING_ABORTED);
-}
-
-// Return value is used by ConsumeBody to bubble the error code up to WebIDL so
-// mConsumePromise doesn't have to be rejected on early exit.
-template <class Derived>
-nsresult
-FetchBody<Derived>::BeginConsumeBody()
-{
-  AssertIsOnTargetThread();
-  MOZ_ASSERT(mConsumePromise);
-
-  // The FetchBody is not thread-safe refcounted. We wrap it with a thread-safe
-  // object able to keep the current worker alive (if we are running in a
-  // worker).
-  RefPtr<FetchBodyConsumer<Derived>> consumer =
-    FetchBodyConsumer<Derived>::Create(this);
-  if (!consumer) {
-    return NS_ERROR_FAILURE;
-  }
-
-  nsCOMPtr<nsIRunnable> r = new BeginConsumeBodyRunnable<Derived>(consumer);
-  nsresult rv = mMainThreadEventTarget->Dispatch(r.forget(), NS_DISPATCH_NORMAL);
-  if (NS_WARN_IF(NS_FAILED(rv))) {
-    return rv;
-  }
-  return NS_OK;
-}
-
-/*
- * BeginConsumeBodyMainThread() will automatically reject the consume promise
- * and clean up on any failures, so there is no need for callers to do so,
- * reflected in a lack of error return code.
- */
-template <class Derived>
-void
-FetchBody<Derived>::BeginConsumeBodyMainThread(FetchBodyConsumer<Derived>* aConsumer)
-{
-  AssertIsOnMainThread();
-
-  AutoFailConsumeBody<Derived> autoReject(aConsumer);
-
-  nsresult rv;
-  nsCOMPtr<nsIInputStream> stream;
-  DerivedClass()->GetBody(getter_AddRefs(stream));
-  if (!stream) {
-    rv = NS_NewCStringInputStream(getter_AddRefs(stream), EmptyCString());
-    if (NS_WARN_IF(NS_FAILED(rv))) {
-      return;
-    }
-  }
-
-  nsCOMPtr<nsIInputStreamPump> pump;
-  rv = NS_NewInputStreamPump(getter_AddRefs(pump),
-                             stream, -1, -1, 0, 0, false,
-                             mMainThreadEventTarget);
-  if (NS_WARN_IF(NS_FAILED(rv))) {
-    return;
-  }
-
-  RefPtr<ConsumeBodyDoneObserver<Derived>> p =
-   new ConsumeBodyDoneObserver<Derived>(aConsumer);
-
-  nsCOMPtr<nsIStreamListener> listener;
-  if (mConsumeType == CONSUME_BLOB) {
-    MutableBlobStorage::MutableBlobStorageType type =
-      MutableBlobStorage::eOnlyInMemory;
-
-    const mozilla::UniquePtr<mozilla::ipc::PrincipalInfo>& principalInfo =
-      DerivedClass()->GetPrincipalInfo();
-    // We support temporary file for blobs only if the principal is known and
-    // it's system or content not in private Browsing.
-    if (principalInfo &&
-        (principalInfo->type() == mozilla::ipc::PrincipalInfo::TSystemPrincipalInfo ||
-         (principalInfo->type() == mozilla::ipc::PrincipalInfo::TContentPrincipalInfo &&
-          principalInfo->get_ContentPrincipalInfo().attrs().mPrivateBrowsingId == 0))) {
-      type = MutableBlobStorage::eCouldBeInTemporaryFile;
-    }
-
-    listener = new MutableBlobStreamListener(type, nullptr, mMimeType, p,
-                                             mMainThreadEventTarget);
-  } else {
-    nsCOMPtr<nsIStreamLoader> loader;
-    rv = NS_NewStreamLoader(getter_AddRefs(loader), p);
-    if (NS_WARN_IF(NS_FAILED(rv))) {
-      return;
-    }
-
-    listener = loader;
-  }
-
-  rv = pump->AsyncRead(listener, nullptr);
-  if (NS_WARN_IF(NS_FAILED(rv))) {
-    return;
-  }
-
-  // Now that everything succeeded, we can assign the pump to a pointer that
-  // stays alive for the lifetime of the FetchBody.
-  mConsumeBodyPump =
-    new nsMainThreadPtrHolder<nsIInputStreamPump>(
-      "FetchBody::mConsumeBodyPump", pump, mMainThreadEventTarget);
-  // It is ok for retargeting to fail and reads to happen on the main thread.
-  autoReject.DontFail();
-
-  // Try to retarget, otherwise fall back to main thread.
-  nsCOMPtr<nsIThreadRetargetableRequest> rr = do_QueryInterface(pump);
-  if (rr) {
-    nsCOMPtr<nsIEventTarget> sts = do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID);
-    rv = rr->RetargetDeliveryTo(sts);
-    if (NS_WARN_IF(NS_FAILED(rv))) {
-      NS_WARNING("Retargeting failed");
-    }
-  }
-}
-
-template <class Derived>
-void
-FetchBody<Derived>::ContinueConsumeBody(FetchBodyConsumer<Derived>* aBodyConsumer,
-                                        nsresult aStatus, uint32_t aResultLength,
-                                        uint8_t* aResult)
-{
-  AssertIsOnTargetThread();
-  // Just a precaution to ensure ContinueConsumeBody is not called out of
-  // sync with a body read.
-  MOZ_ASSERT(mBodyUsed);
-  MOZ_ASSERT(!mReadDone);
-#ifdef DEBUG
-  mReadDone = true;
-#endif
-
-  auto autoFree = mozilla::MakeScopeExit([&] {
-    free(aResult);
-  });
-
-  MOZ_ASSERT(mConsumePromise);
-  RefPtr<Promise> localPromise = mConsumePromise.forget();
-
-  auto autoReleaseObject = mozilla::MakeScopeExit([&] {
-    aBodyConsumer->ReleaseObject();
-  });
-
-  if (NS_WARN_IF(NS_FAILED(aStatus))) {
-    localPromise->MaybeReject(NS_ERROR_DOM_ABORT_ERR);
-
-    // If binding aborted, cancel the pump. We can't assert mConsumeBodyPump.
-    // In the (admittedly rare) situation that BeginConsumeBodyMainThread()
-    // context switches out, and the worker thread gets canceled before the
-    // pump is setup, mConsumeBodyPump will be null.
-    // We've to use the !! form since non-main thread pointer access on
-    // a nsMainThreadPtrHandle is not permitted.
-    if (aStatus == NS_BINDING_ABORTED && !!mConsumeBodyPump) {
-      if (NS_IsMainThread()) {
-        CancelPump();
-      } else {
-        MOZ_ASSERT(mWorkerPrivate);
-        // In case of worker thread, we block the worker while the request is
-        // canceled on the main thread. This ensures that OnStreamComplete has
-        // a valid FetchBody around to call CancelPump and we don't release the
-        // FetchBody on the main thread.
-        RefPtr<CancelPumpRunnable<Derived>> r =
-          new CancelPumpRunnable<Derived>(aBodyConsumer->Body());
-        ErrorResult rv;
-        r->Dispatch(Terminating, rv);
-        if (rv.Failed()) {
-          NS_WARNING("Could not dispatch CancelPumpRunnable. Nothing we can do here");
-          // None of our callers are callled directly from JS, so there is no
-          // point in trying to propagate this failure out of here.  And
-          // localPromise is already rejected.  Just suppress the failure.
-          rv.SuppressException();
-        }
-      }
-    }
-  }
-
-  // Release the pump and then early exit if there was an error.
-  // Uses NS_ProxyRelease internally, so this is safe.
-  mConsumeBodyPump = nullptr;
-
-  // Don't warn here since we warned above.
-  if (NS_FAILED(aStatus)) {
-    return;
-  }
-
-  // Finish successfully consuming body according to type.
-  MOZ_ASSERT(aResult);
-
-  AutoJSAPI jsapi;
-  if (!jsapi.Init(aBodyConsumer->Body()->DerivedClass()->GetParentObject())) {
-    localPromise->MaybeReject(NS_ERROR_UNEXPECTED);
-    return;
-  }
-
-  JSContext* cx = jsapi.cx();
-  ErrorResult error;
-
-  switch (mConsumeType) {
-    case CONSUME_ARRAYBUFFER: {
-      JS::Rooted<JSObject*> arrayBuffer(cx);
-      BodyUtil::ConsumeArrayBuffer(cx, &arrayBuffer, aResultLength, aResult,
-                                   error);
-
-      if (!error.Failed()) {
-        JS::Rooted<JS::Value> val(cx);
-        val.setObjectOrNull(arrayBuffer);
-
-        localPromise->MaybeResolve(cx, val);
-        // ArrayBuffer takes over ownership.
-        autoFree.release();
-      }
-      break;
-    }
-    case CONSUME_BLOB: {
-      MOZ_CRASH("This should not happen.");
-      break;
-    }
-    case CONSUME_FORMDATA: {
-      nsCString data;
-      data.Adopt(reinterpret_cast<char*>(aResult), aResultLength);
-      autoFree.release();
-
-      RefPtr<dom::FormData> fd = BodyUtil::ConsumeFormData(
-        aBodyConsumer->Body()->DerivedClass()->GetParentObject(),
-        mMimeType, data, error);
-      if (!error.Failed()) {
-        localPromise->MaybeResolve(fd);
-      }
-      break;
-    }
-    case CONSUME_TEXT:
-      // fall through handles early exit.
-    case CONSUME_JSON: {
-      nsString decoded;
-      if (NS_SUCCEEDED(BodyUtil::ConsumeText(aResultLength, aResult, decoded))) {
-        if (mConsumeType == CONSUME_TEXT) {
-          localPromise->MaybeResolve(decoded);
-        } else {
-          JS::Rooted<JS::Value> json(cx);
-          BodyUtil::ConsumeJson(cx, &json, decoded, error);
-          if (!error.Failed()) {
-            localPromise->MaybeResolve(cx, json);
-          }
-        }
-      };
-      break;
-    }
-    default:
-      NS_NOTREACHED("Unexpected consume body type");
-  }
-
-  error.WouldReportJSException();
-  if (error.Failed()) {
-    localPromise->MaybeReject(error);
-  }
-}
-
-template <class Derived>
-void
-FetchBody<Derived>::ContinueConsumeBlobBody(FetchBodyConsumer<Derived>* aBodyConsumer,
-                                            BlobImpl* aBlobImpl)
-{
-  AssertIsOnTargetThread();
-  // Just a precaution to ensure ContinueConsumeBody is not called out of
-  // sync with a body read.
-  MOZ_ASSERT(mBodyUsed);
-  MOZ_ASSERT(!mReadDone);
-  MOZ_ASSERT(mConsumeType == CONSUME_BLOB);
-#ifdef DEBUG
-  mReadDone = true;
-#endif
-
-  MOZ_ASSERT(mConsumePromise);
-  RefPtr<Promise> localPromise = mConsumePromise.forget();
-
-  auto autoReleaseObject = mozilla::MakeScopeExit([&] {
-    aBodyConsumer->ReleaseObject();
-  });
-
-  // Release the pump and then early exit if there was an error.
-  // Uses NS_ProxyRelease internally, so this is safe.
-  mConsumeBodyPump = nullptr;
-
-  RefPtr<dom::Blob> blob =
-    dom::Blob::Create(aBodyConsumer->Body()->DerivedClass()->GetParentObject(),
-                      aBlobImpl);
-  MOZ_ASSERT(blob);
-
-  localPromise->MaybeResolve(blob);
-}
-
-template <class Derived>
 already_AddRefed<Promise>
-FetchBody<Derived>::ConsumeBody(ConsumeType aType, ErrorResult& aRv)
+FetchBody<Derived>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv)
 {
   if (BodyUsed()) {
     aRv.ThrowTypeError<MSG_FETCH_BODY_CONSUMED_ERROR>();
     return nullptr;
   }
 
-  mConsumeType = aType;
   SetBodyUsed();
 
-  mConsumePromise = Promise::Create(DerivedClass()->GetParentObject(), aRv);
-  if (aRv.Failed()) {
+  RefPtr<Promise> promise =
+    FetchBodyConsumer<Derived>::Create(DerivedClass()->GetParentObject(),
+                                       mMainThreadEventTarget, this, aType,
+                                       aRv);
+  if (NS_WARN_IF(aRv.Failed())) {
     return nullptr;
   }
 
-  aRv = BeginConsumeBody();
-  if (NS_WARN_IF(aRv.Failed())) {
-    mConsumePromise = nullptr;
-    return nullptr;
-  }
-
-  RefPtr<Promise> promise = mConsumePromise;
   return promise.forget();
 }
 
 template
 already_AddRefed<Promise>
-FetchBody<Request>::ConsumeBody(ConsumeType aType, ErrorResult& aRv);
+FetchBody<Request>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv);
 
 template
 already_AddRefed<Promise>
-FetchBody<Response>::ConsumeBody(ConsumeType aType, ErrorResult& aRv);
+FetchBody<Response>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv);
 
 template <class Derived>
 void
 FetchBody<Derived>::SetMimeType()
 {
   // Extract mime type.
   ErrorResult result;
   nsCString contentTypeValues;
--- a/dom/fetch/Fetch.h
+++ b/dom/fetch/Fetch.h
@@ -3,17 +3,16 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef mozilla_dom_Fetch_h
 #define mozilla_dom_Fetch_h
 
 #include "nsAutoPtr.h"
-#include "nsIInputStreamPump.h"
 #include "nsIStreamLoader.h"
 
 #include "nsCOMPtr.h"
 #include "nsError.h"
 #include "nsProxyRelease.h"
 #include "nsString.h"
 
 #include "mozilla/DebugOnly.h"
@@ -68,16 +67,25 @@ ExtractByteStreamFromBody(const fetch::O
 nsresult
 ExtractByteStreamFromBody(const fetch::BodyInit& aBodyInit,
                           nsIInputStream** aStream,
                           nsCString& aContentType,
                           uint64_t& aContentLength);
 
 template <class Derived> class FetchBodyConsumer;
 
+enum FetchConsumeType
+{
+  CONSUME_ARRAYBUFFER,
+  CONSUME_BLOB,
+  CONSUME_FORMDATA,
+  CONSUME_JSON,
+  CONSUME_TEXT,
+};
+
 /*
  * FetchBody's body consumption uses nsIInputStreamPump to read from the
  * underlying stream to a block of memory, which is then adopted by
  * ContinueConsumeBody() and converted to the right type based on the JS
  * function called.
  *
  * Use of the nsIInputStreamPump complicates things on the worker thread.
  * The solution used here is similar to WebSockets.
@@ -105,16 +113,18 @@ template <class Derived> class FetchBody
  *    worry about keeping anything alive.
  *
  * The pump is always released on the main thread.
  */
 template <class Derived>
 class FetchBody
 {
 public:
+  friend class FetchBodyConsumer<Derived>;
+
   NS_INLINE_DECL_PURE_VIRTUAL_REFCOUNTING
 
   bool
   BodyUsed() const { return mBodyUsed; }
 
   already_AddRefed<Promise>
   ArrayBuffer(ErrorResult& aRv)
   {
@@ -141,69 +151,51 @@ public:
 
   already_AddRefed<Promise>
   Text(ErrorResult& aRv)
   {
     return ConsumeBody(CONSUME_TEXT, aRv);
   }
 
   // Utility public methods accessed by various runnables.
-  void
-  BeginConsumeBodyMainThread(FetchBodyConsumer<Derived>* aConsumer);
-
-  void
-  ContinueConsumeBody(FetchBodyConsumer<Derived>* aConsumer, nsresult aStatus,
-                      uint32_t aLength, uint8_t* aResult);
-
-  void
-  ContinueConsumeBlobBody(FetchBodyConsumer<Derived>* aConsumer,
-                          BlobImpl* aBlobImpl);
-
-  void
-  CancelPump();
 
   void
   SetBodyUsed()
   {
     mBodyUsed = true;
   }
 
-  // Always set whenever the FetchBody is created on the worker thread.
-  workers::WorkerPrivate* mWorkerPrivate;
+  const nsCString&
+  MimeType() const
+  {
+    return mMimeType;
+  }
 
 protected:
   nsCOMPtr<nsIGlobalObject> mOwner;
 
+  // Always set whenever the FetchBody is created on the worker thread.
+  workers::WorkerPrivate* mWorkerPrivate;
+
   explicit FetchBody(nsIGlobalObject* aOwner);
 
   virtual ~FetchBody();
 
   void
   SetMimeType();
+
 private:
-  enum ConsumeType
-  {
-    CONSUME_ARRAYBUFFER,
-    CONSUME_BLOB,
-    CONSUME_FORMDATA,
-    CONSUME_JSON,
-    CONSUME_TEXT,
-  };
-
   Derived*
   DerivedClass() const
   {
     return static_cast<Derived*>(const_cast<FetchBody*>(this));
   }
 
-  nsresult
-  BeginConsumeBody();
-
   already_AddRefed<Promise>
-  ConsumeBody(ConsumeType aType, ErrorResult& aRv);
+  ConsumeBody(FetchConsumeType aType, ErrorResult& aRv);
 
   bool
   IsOnTargetThread()
   {
     return NS_IsMainThread() == !mWorkerPrivate;
   }
 
   void
@@ -211,25 +203,16 @@ private:
   {
     MOZ_ASSERT(IsOnTargetThread());
   }
 
   // Only ever set once, always on target thread.
   bool mBodyUsed;
   nsCString mMimeType;
 
-  // Only touched on target thread.
-  ConsumeType mConsumeType;
-  RefPtr<Promise> mConsumePromise;
-#ifdef DEBUG
-  bool mReadDone;
-#endif
-
-  nsMainThreadPtrHandle<nsIInputStreamPump> mConsumeBodyPump;
-
   // The main-thread event target for runnable dispatching.
   nsCOMPtr<nsIEventTarget> mMainThreadEventTarget;
 };
 
 } // namespace dom
 } // namespace mozilla
 
 #endif // mozilla_dom_Fetch_h
--- a/dom/fetch/FetchConsumer.cpp
+++ b/dom/fetch/FetchConsumer.cpp
@@ -2,16 +2,17 @@
 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "Fetch.h"
 #include "FetchConsumer.h"
 
+#include "nsIInputStreamPump.h"
 #include "nsProxyRelease.h"
 #include "WorkerPrivate.h"
 #include "WorkerRunnable.h"
 #include "WorkerScope.h"
 #include "Workers.h"
 
 namespace mozilla {
 namespace dom {
@@ -38,67 +39,353 @@ public:
 
   bool Notify(workers::Status aStatus) override
   {
     MOZ_ASSERT(aStatus > workers::Running);
     if (!mWasNotified) {
       mWasNotified = true;
       // This will probably cause the releasing of the consumer.
       // The WorkerHolder will be released as well.
-      mConsumer->Body()->ContinueConsumeBody(mConsumer, NS_BINDING_ABORTED, 0,
-                                             nullptr);
+      mConsumer->ContinueConsumeBody(NS_BINDING_ABORTED, 0, nullptr);
     }
 
     return true;
   }
 };
 
+template <class Derived>
+class BeginConsumeBodyRunnable final : public Runnable
+{
+  RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
+
+public:
+  explicit BeginConsumeBodyRunnable(FetchBodyConsumer<Derived>* aConsumer)
+    : mFetchBodyConsumer(aConsumer)
+  { }
+
+  NS_IMETHOD
+  Run() override
+  {
+    mFetchBodyConsumer->BeginConsumeBodyMainThread();
+    return NS_OK;
+  }
+};
+
+/*
+ * Called on successfully reading the complete stream.
+ */
+template <class Derived>
+class ContinueConsumeBodyRunnable final : public MainThreadWorkerRunnable
+{
+  RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
+  nsresult mStatus;
+  uint32_t mLength;
+  uint8_t* mResult;
+
+public:
+  ContinueConsumeBodyRunnable(FetchBodyConsumer<Derived>* aFetchBodyConsumer,
+                              nsresult aStatus, uint32_t aLength,
+                              uint8_t* aResult)
+    : MainThreadWorkerRunnable(aFetchBodyConsumer->GetWorkerPrivate())
+    , mFetchBodyConsumer(aFetchBodyConsumer)
+    , mStatus(aStatus)
+    , mLength(aLength)
+    , mResult(aResult)
+  {
+    MOZ_ASSERT(NS_IsMainThread());
+  }
+
+  bool
+  WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
+  {
+    mFetchBodyConsumer->ContinueConsumeBody(mStatus, mLength, mResult);
+    return true;
+  }
+};
+
+template <class Derived>
+class FailConsumeBodyWorkerRunnable : public MainThreadWorkerControlRunnable
+{
+  RefPtr<FetchBodyConsumer<Derived>> mBodyConsumer;
+
+public:
+  explicit FailConsumeBodyWorkerRunnable(FetchBodyConsumer<Derived>* aBodyConsumer)
+    : MainThreadWorkerControlRunnable(aBodyConsumer->GetWorkerPrivate())
+    , mBodyConsumer(aBodyConsumer)
+  {
+    AssertIsOnMainThread();
+  }
+
+  bool
+  WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
+  {
+    mBodyConsumer->ContinueConsumeBody(NS_ERROR_FAILURE, 0, nullptr);
+    return true;
+  }
+};
+
+/*
+ * In case of failure to create a stream pump or dispatch stream completion to
+ * worker, ensure we cleanup properly. Thread agnostic.
+ */
+template <class Derived>
+class MOZ_STACK_CLASS AutoFailConsumeBody final
+{
+  RefPtr<FetchBodyConsumer<Derived>> mBodyConsumer;
+
+public:
+  explicit AutoFailConsumeBody(FetchBodyConsumer<Derived>* aBodyConsumer)
+    : mBodyConsumer(aBodyConsumer)
+  {}
+
+  ~AutoFailConsumeBody()
+  {
+    AssertIsOnMainThread();
+
+    if (mBodyConsumer) {
+      if (mBodyConsumer->GetWorkerPrivate()) {
+        RefPtr<FailConsumeBodyWorkerRunnable<Derived>> r =
+          new FailConsumeBodyWorkerRunnable<Derived>(mBodyConsumer);
+        if (!r->Dispatch()) {
+          MOZ_CRASH("We are going to leak");
+        }
+      } else {
+        mBodyConsumer->ContinueConsumeBody(NS_ERROR_FAILURE, 0, nullptr);
+      }
+    }
+  }
+
+  void
+  DontFail()
+  {
+    mBodyConsumer = nullptr;
+  }
+};
+
+/*
+ * Called on successfully reading the complete stream for Blob.
+ */
+template <class Derived>
+class ContinueConsumeBlobBodyRunnable final : public MainThreadWorkerRunnable
+{
+  RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
+  RefPtr<BlobImpl> mBlobImpl;
+
+public:
+  ContinueConsumeBlobBodyRunnable(FetchBodyConsumer<Derived>* aFetchBodyConsumer,
+                                  BlobImpl* aBlobImpl)
+    : MainThreadWorkerRunnable(aFetchBodyConsumer->GetWorkerPrivate())
+    , mFetchBodyConsumer(aFetchBodyConsumer)
+    , mBlobImpl(aBlobImpl)
+  {
+    MOZ_ASSERT(NS_IsMainThread());
+    MOZ_ASSERT(mBlobImpl);
+  }
+
+  bool
+  WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
+  {
+    mFetchBodyConsumer->ContinueConsumeBlobBody(mBlobImpl);
+    return true;
+  }
+};
+
+template <class Derived>
+class ConsumeBodyDoneObserver : public nsIStreamLoaderObserver
+                              , public MutableBlobStorageCallback
+{
+  RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
+
+public:
+  NS_DECL_THREADSAFE_ISUPPORTS
+
+  explicit ConsumeBodyDoneObserver(FetchBodyConsumer<Derived>* aFetchBodyConsumer)
+    : mFetchBodyConsumer(aFetchBodyConsumer)
+  { }
+
+  NS_IMETHOD
+  OnStreamComplete(nsIStreamLoader* aLoader,
+                   nsISupports* aCtxt,
+                   nsresult aStatus,
+                   uint32_t aResultLength,
+                   const uint8_t* aResult) override
+  {
+    MOZ_ASSERT(NS_IsMainThread());
+
+    // If the binding requested cancel, we don't need to call
+    // ContinueConsumeBody, since that is the originator.
+    if (aStatus == NS_BINDING_ABORTED) {
+      return NS_OK;
+    }
+
+    uint8_t* nonconstResult = const_cast<uint8_t*>(aResult);
+    if (mFetchBodyConsumer->GetWorkerPrivate()) {
+      RefPtr<ContinueConsumeBodyRunnable<Derived>> r =
+        new ContinueConsumeBodyRunnable<Derived>(mFetchBodyConsumer,
+                                                 aStatus,
+                                                 aResultLength,
+                                                 nonconstResult);
+      if (!r->Dispatch()) {
+        // XXXcatalinb: The worker is shutting down, the pump will be canceled
+        // by FetchBodyWorkerHolder::Notify.
+        NS_WARNING("Could not dispatch ConsumeBodyRunnable");
+        // Return failure so that aResult is freed.
+        return NS_ERROR_FAILURE;
+      }
+    } else {
+      mFetchBodyConsumer->ContinueConsumeBody(aStatus, aResultLength,
+                                              nonconstResult);
+    }
+
+    // FetchBody is responsible for data.
+    return NS_SUCCESS_ADOPTED_DATA;
+  }
+
+  virtual void BlobStoreCompleted(MutableBlobStorage* aBlobStorage,
+                                  Blob* aBlob,
+                                  nsresult aRv) override
+  {
+    // On error.
+    if (NS_FAILED(aRv)) {
+      OnStreamComplete(nullptr, nullptr, aRv, 0, nullptr);
+      return;
+    }
+
+    MOZ_ASSERT(aBlob);
+
+    if (mFetchBodyConsumer->GetWorkerPrivate()) {
+      RefPtr<ContinueConsumeBlobBodyRunnable<Derived>> r =
+        new ContinueConsumeBlobBodyRunnable<Derived>(mFetchBodyConsumer,
+                                                     aBlob->Impl());
+
+      if (!r->Dispatch()) {
+        NS_WARNING("Could not dispatch ConsumeBlobBodyRunnable");
+        return;
+      }
+    } else {
+      mFetchBodyConsumer->ContinueConsumeBlobBody(aBlob->Impl());
+    }
+  }
+
+private:
+  virtual ~ConsumeBodyDoneObserver()
+  { }
+};
+
+template <class Derived>
+NS_IMPL_ADDREF(ConsumeBodyDoneObserver<Derived>)
+template <class Derived>
+NS_IMPL_RELEASE(ConsumeBodyDoneObserver<Derived>)
+template <class Derived>
+NS_INTERFACE_MAP_BEGIN(ConsumeBodyDoneObserver<Derived>)
+  NS_INTERFACE_MAP_ENTRY(nsIStreamLoaderObserver)
+  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIStreamLoaderObserver)
+NS_INTERFACE_MAP_END
+
+template <class Derived>
+class CancelPumpRunnable final : public WorkerMainThreadRunnable
+{
+  RefPtr<FetchBodyConsumer<Derived>> mBodyConsumer;
+
+public:
+  explicit CancelPumpRunnable(FetchBodyConsumer<Derived>* aBodyConsumer)
+    : WorkerMainThreadRunnable(aBodyConsumer->GetWorkerPrivate(),
+                               NS_LITERAL_CSTRING("Fetch :: Cancel Pump"))
+    , mBodyConsumer(aBodyConsumer)
+  {}
+
+  bool
+  MainThreadRun() override
+  {
+    mBodyConsumer->CancelPump();
+    return true;
+  }
+};
+
 } // anonymous
 
 template <class Derived>
-/* static */ already_AddRefed<FetchBodyConsumer<Derived>>
-FetchBodyConsumer<Derived>::Create(FetchBody<Derived>* aBody)
+/* static */ already_AddRefed<Promise>
+FetchBodyConsumer<Derived>::Create(nsIGlobalObject* aGlobal,
+                                   nsIEventTarget* aMainThreadEventTarget,
+                                   FetchBody<Derived>* aBody,
+                                   FetchConsumeType aType,
+                                   ErrorResult& aRv)
 {
   MOZ_ASSERT(aBody);
+  MOZ_ASSERT(aMainThreadEventTarget);
+
+  RefPtr<Promise> promise = Promise::Create(aGlobal, aRv);
+  if (aRv.Failed()) {
+    return nullptr;
+  }
+
+  WorkerPrivate* workerPrivate = nullptr;
+  if (!NS_IsMainThread()) {
+    workerPrivate = GetCurrentThreadWorkerPrivate();
+    MOZ_ASSERT(workerPrivate);
+  }
 
   RefPtr<FetchBodyConsumer<Derived>> consumer =
-    new FetchBodyConsumer<Derived>(aBody);
+    new FetchBodyConsumer<Derived>(aMainThreadEventTarget, workerPrivate,
+                                   aBody, promise, aType);
 
   if (!NS_IsMainThread()) {
-    WorkerPrivate* workerPrivate = GetCurrentThreadWorkerPrivate();
     MOZ_ASSERT(workerPrivate);
-
-    if (!consumer->RegisterWorkerHolder(workerPrivate)) {
+    if (NS_WARN_IF(!consumer->RegisterWorkerHolder(workerPrivate))) {
+      aRv.Throw(NS_ERROR_FAILURE);
       return nullptr;
     }
   }
 
-  return consumer.forget();
+  nsCOMPtr<nsIRunnable> r = new BeginConsumeBodyRunnable<Derived>(consumer);
+  aRv = aMainThreadEventTarget->Dispatch(r.forget(), NS_DISPATCH_NORMAL);
+  if (NS_WARN_IF(aRv.Failed())) {
+    return nullptr;
+  }
+
+  return promise.forget();
 }
 
 template <class Derived>
 void
 FetchBodyConsumer<Derived>::ReleaseObject()
 {
   AssertIsOnTargetThread();
 
   mWorkerHolder = nullptr;
   mBody = nullptr;
 }
 
 template <class Derived>
-FetchBodyConsumer<Derived>::FetchBodyConsumer(FetchBody<Derived>* aBody)
+FetchBodyConsumer<Derived>::FetchBodyConsumer(nsIEventTarget* aMainThreadEventTarget,
+                                              WorkerPrivate* aWorkerPrivate,
+                                              FetchBody<Derived>* aBody,
+                                              Promise* aPromise,
+                                              FetchConsumeType aType)
   : mTargetThread(NS_GetCurrentThread())
+  , mMainThreadEventTarget(aMainThreadEventTarget)
   , mBody(aBody)
-{}
+  , mWorkerPrivate(aWorkerPrivate)
+  , mConsumeType(aType)
+  , mConsumePromise(aPromise)
+#ifdef DEBUG
+  , mReadDone(false)
+#endif
+{
+  MOZ_ASSERT(aMainThreadEventTarget);
+  MOZ_ASSERT(aBody);
+  MOZ_ASSERT(aPromise);
+}
 
 template <class Derived>
 FetchBodyConsumer<Derived>::~FetchBodyConsumer()
 {
-  NS_ProxyRelease(mTargetThread, mBody.forget());
+  NS_ProxyRelease("FetchBodyConsumer::mBody",
+                  mTargetThread, mBody.forget());
 }
 
 template <class Derived>
 void
 FetchBodyConsumer<Derived>::AssertIsOnTargetThread() const
 {
   MOZ_ASSERT(NS_GetCurrentThread() == mTargetThread);
 }
@@ -117,10 +404,276 @@ FetchBodyConsumer<Derived>::RegisterWork
     NS_WARNING("Failed to add workerHolder");
     mWorkerHolder = nullptr;
     return false;
   }
 
   return true;
 }
 
+/*
+ * BeginConsumeBodyMainThread() will automatically reject the consume promise
+ * and clean up on any failures, so there is no need for callers to do so,
+ * reflected in a lack of error return code.
+ */
+template <class Derived>
+void
+FetchBodyConsumer<Derived>::BeginConsumeBodyMainThread()
+{
+  AssertIsOnMainThread();
+
+  AutoFailConsumeBody<Derived> autoReject(this);
+
+  nsresult rv;
+  nsCOMPtr<nsIInputStream> stream;
+  mBody->DerivedClass()->GetBody(getter_AddRefs(stream));
+  if (!stream) {
+    rv = NS_NewCStringInputStream(getter_AddRefs(stream), EmptyCString());
+    if (NS_WARN_IF(NS_FAILED(rv))) {
+      return;
+    }
+  }
+
+  nsCOMPtr<nsIInputStreamPump> pump;
+  rv = NS_NewInputStreamPump(getter_AddRefs(pump),
+                             stream, -1, -1, 0, 0, false,
+                             mMainThreadEventTarget);
+  if (NS_WARN_IF(NS_FAILED(rv))) {
+    return;
+  }
+
+  RefPtr<ConsumeBodyDoneObserver<Derived>> p =
+   new ConsumeBodyDoneObserver<Derived>(this);
+
+  nsCOMPtr<nsIStreamListener> listener;
+  if (mConsumeType == CONSUME_BLOB) {
+    MutableBlobStorage::MutableBlobStorageType type =
+      MutableBlobStorage::eOnlyInMemory;
+
+    const mozilla::UniquePtr<mozilla::ipc::PrincipalInfo>& principalInfo =
+      mBody->DerivedClass()->GetPrincipalInfo();
+    // We support temporary file for blobs only if the principal is known and
+    // it's system or content not in private Browsing.
+    if (principalInfo &&
+        (principalInfo->type() == mozilla::ipc::PrincipalInfo::TSystemPrincipalInfo ||
+         (principalInfo->type() == mozilla::ipc::PrincipalInfo::TContentPrincipalInfo &&
+          principalInfo->get_ContentPrincipalInfo().attrs().mPrivateBrowsingId == 0))) {
+      type = MutableBlobStorage::eCouldBeInTemporaryFile;
+    }
+
+    listener = new MutableBlobStreamListener(type, nullptr, mBody->MimeType(),
+                                             p, mMainThreadEventTarget);
+  } else {
+    nsCOMPtr<nsIStreamLoader> loader;
+    rv = NS_NewStreamLoader(getter_AddRefs(loader), p);
+    if (NS_WARN_IF(NS_FAILED(rv))) {
+      return;
+    }
+
+    listener = loader;
+  }
+
+  rv = pump->AsyncRead(listener, nullptr);
+  if (NS_WARN_IF(NS_FAILED(rv))) {
+    return;
+  }
+
+  // Now that everything succeeded, we can assign the pump to a pointer that
+  // stays alive for the lifetime of the FetchBody.
+  mConsumeBodyPump =
+    new nsMainThreadPtrHolder<nsIInputStreamPump>("FetchBodyConsumer::mConsumeBodyPump",
+                                                  pump, mMainThreadEventTarget);
+  // It is ok for retargeting to fail and reads to happen on the main thread.
+  autoReject.DontFail();
+
+  // Try to retarget, otherwise fall back to main thread.
+  nsCOMPtr<nsIThreadRetargetableRequest> rr = do_QueryInterface(pump);
+  if (rr) {
+    nsCOMPtr<nsIEventTarget> sts = do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID);
+    rv = rr->RetargetDeliveryTo(sts);
+    if (NS_WARN_IF(NS_FAILED(rv))) {
+      NS_WARNING("Retargeting failed");
+    }
+  }
+}
+
+template <class Derived>
+void
+FetchBodyConsumer<Derived>::ContinueConsumeBody(nsresult aStatus,
+                                                uint32_t aResultLength,
+                                                uint8_t* aResult)
+{
+  AssertIsOnTargetThread();
+  // Just a precaution to ensure ContinueConsumeBody is not called out of
+  // sync with a body read.
+  MOZ_ASSERT(mBody->BodyUsed());
+  MOZ_ASSERT(!mReadDone);
+#ifdef DEBUG
+  mReadDone = true;
+#endif
+
+  auto autoFree = mozilla::MakeScopeExit([&] {
+    free(aResult);
+  });
+
+  MOZ_ASSERT(mConsumePromise);
+  RefPtr<Promise> localPromise = mConsumePromise.forget();
+
+  RefPtr<FetchBodyConsumer<Derived>> self = this;
+  auto autoReleaseObject = mozilla::MakeScopeExit([&] {
+    self->ReleaseObject();
+  });
+
+  if (NS_WARN_IF(NS_FAILED(aStatus))) {
+    localPromise->MaybeReject(NS_ERROR_DOM_ABORT_ERR);
+
+    // If binding aborted, cancel the pump. We can't assert mConsumeBodyPump.
+    // In the (admittedly rare) situation that BeginConsumeBodyMainThread()
+    // context switches out, and the worker thread gets canceled before the
+    // pump is setup, mConsumeBodyPump will be null.
+    // We've to use the !! form since non-main thread pointer access on
+    // a nsMainThreadPtrHandle is not permitted.
+    if (aStatus == NS_BINDING_ABORTED && !!mConsumeBodyPump) {
+      if (NS_IsMainThread()) {
+        CancelPump();
+      } else {
+        MOZ_ASSERT(mWorkerPrivate);
+        // In case of worker thread, we block the worker while the request is
+        // canceled on the main thread. This ensures that OnStreamComplete has
+        // a valid FetchBody around to call CancelPump and we don't release the
+        // FetchBody on the main thread.
+        RefPtr<CancelPumpRunnable<Derived>> r =
+          new CancelPumpRunnable<Derived>(this);
+        ErrorResult rv;
+        r->Dispatch(Terminating, rv);
+        if (rv.Failed()) {
+          NS_WARNING("Could not dispatch CancelPumpRunnable. Nothing we can do here");
+          // None of our callers are callled directly from JS, so there is no
+          // point in trying to propagate this failure out of here.  And
+          // localPromise is already rejected.  Just suppress the failure.
+          rv.SuppressException();
+        }
+      }
+    }
+  }
+
+  // Release the pump and then early exit if there was an error.
+  // Uses NS_ProxyRelease internally, so this is safe.
+  mConsumeBodyPump = nullptr;
+
+  // Don't warn here since we warned above.
+  if (NS_FAILED(aStatus)) {
+    return;
+  }
+
+  // Finish successfully consuming body according to type.
+  MOZ_ASSERT(aResult);
+
+  AutoJSAPI jsapi;
+  if (!jsapi.Init(mBody->DerivedClass()->GetParentObject())) {
+    localPromise->MaybeReject(NS_ERROR_UNEXPECTED);
+    return;
+  }
+
+  JSContext* cx = jsapi.cx();
+  ErrorResult error;
+
+  switch (mConsumeType) {
+    case CONSUME_ARRAYBUFFER: {
+      JS::Rooted<JSObject*> arrayBuffer(cx);
+      BodyUtil::ConsumeArrayBuffer(cx, &arrayBuffer, aResultLength, aResult,
+                                   error);
+
+      if (!error.Failed()) {
+        JS::Rooted<JS::Value> val(cx);
+        val.setObjectOrNull(arrayBuffer);
+
+        localPromise->MaybeResolve(cx, val);
+        // ArrayBuffer takes over ownership.
+        aResult = nullptr;
+      }
+      break;
+    }
+    case CONSUME_BLOB: {
+      MOZ_CRASH("This should not happen.");
+      break;
+    }
+    case CONSUME_FORMDATA: {
+      nsCString data;
+      data.Adopt(reinterpret_cast<char*>(aResult), aResultLength);
+      aResult = nullptr;
+
+      RefPtr<dom::FormData> fd =
+        BodyUtil::ConsumeFormData(mBody->DerivedClass()->GetParentObject(),
+                                  mBody->MimeType(), data, error);
+      if (!error.Failed()) {
+        localPromise->MaybeResolve(fd);
+      }
+      break;
+    }
+    case CONSUME_TEXT:
+      // fall through handles early exit.
+    case CONSUME_JSON: {
+      nsString decoded;
+      if (NS_SUCCEEDED(BodyUtil::ConsumeText(aResultLength, aResult, decoded))) {
+        if (mConsumeType == CONSUME_TEXT) {
+          localPromise->MaybeResolve(decoded);
+        } else {
+          JS::Rooted<JS::Value> json(cx);
+          BodyUtil::ConsumeJson(cx, &json, decoded, error);
+          if (!error.Failed()) {
+            localPromise->MaybeResolve(cx, json);
+          }
+        }
+      };
+      break;
+    }
+    default:
+      NS_NOTREACHED("Unexpected consume body type");
+  }
+
+  error.WouldReportJSException();
+  if (error.Failed()) {
+    localPromise->MaybeReject(error);
+  }
+}
+
+template <class Derived>
+void
+FetchBodyConsumer<Derived>::ContinueConsumeBlobBody(BlobImpl* aBlobImpl)
+{
+  AssertIsOnTargetThread();
+  // Just a precaution to ensure ContinueConsumeBody is not called out of
+  // sync with a body read.
+  MOZ_ASSERT(mBody->BodyUsed());
+  MOZ_ASSERT(!mReadDone);
+  MOZ_ASSERT(mConsumeType == CONSUME_BLOB);
+#ifdef DEBUG
+  mReadDone = true;
+#endif
+
+  MOZ_ASSERT(mConsumePromise);
+  RefPtr<Promise> localPromise = mConsumePromise.forget();
+
+  // Release the pump and then early exit if there was an error.
+  // Uses NS_ProxyRelease internally, so this is safe.
+  mConsumeBodyPump = nullptr;
+
+  RefPtr<dom::Blob> blob =
+    dom::Blob::Create(mBody->DerivedClass()->GetParentObject(), aBlobImpl);
+  MOZ_ASSERT(blob);
+
+  localPromise->MaybeResolve(blob);
+
+  ReleaseObject();
+}
+
+template <class Derived>
+void
+FetchBodyConsumer<Derived>::CancelPump()
+{
+  AssertIsOnMainThread();
+  MOZ_ASSERT(mConsumeBodyPump);
+  mConsumeBodyPump->Cancel(NS_BINDING_ABORTED);
+}
+
 } // namespace dom
 } // namespace mozilla
--- a/dom/fetch/FetchConsumer.h
+++ b/dom/fetch/FetchConsumer.h
@@ -2,65 +2,109 @@
 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef mozilla_dom_FetchConsumer_h
 #define mozilla_dom_FetchConsumer_h
 
+#include "Fetch.h"
+
 class nsIThread;
 
 namespace mozilla {
 namespace dom {
 
+class Promise;
+
 namespace workers {
 class WorkerPrivate;
 class WorkerHolder;
 }
 
 template <class Derived> class FetchBody;
 
 // FetchBody is not thread-safe but we need to move it around threads.
 // In order to keep it alive all the time, we use a WorkerHolder, if created on
 // workers, plus a this consumer.
 template <class Derived>
 class FetchBodyConsumer final
 {
 public:
   NS_INLINE_DECL_THREADSAFE_REFCOUNTING(FetchBodyConsumer<Derived>)
 
-  static already_AddRefed<FetchBodyConsumer<Derived>>
-  Create(FetchBody<Derived>* aBody);
+  static already_AddRefed<Promise>
+  Create(nsIGlobalObject* aGlobal,
+         nsIEventTarget* aMainThreadEventTarget,
+         FetchBody<Derived>* aBody,
+         FetchConsumeType aType,
+         ErrorResult& aRv);
 
   void
   ReleaseObject();
 
   FetchBody<Derived>*
   Body() const
   {
     return mBody;
   }
 
+  void
+  BeginConsumeBodyMainThread();
+
+  void
+  ContinueConsumeBody(nsresult aStatus, uint32_t aLength, uint8_t* aResult);
+
+  void
+  ContinueConsumeBlobBody(BlobImpl* aBlobImpl);
+
+  void
+  CancelPump();
+
+  workers::WorkerPrivate*
+  GetWorkerPrivate() const
+  {
+    return mWorkerPrivate;
+  }
+
 private:
-  explicit FetchBodyConsumer(FetchBody<Derived>* aBody);
+  FetchBodyConsumer(nsIEventTarget* aMainThreadEventTarget,
+                    workers::WorkerPrivate* aWorkerPrivate,
+                    FetchBody<Derived>* aBody,
+                    Promise* aPromise,
+                    FetchConsumeType aType);
 
   ~FetchBodyConsumer();
 
   void
   AssertIsOnTargetThread() const;
 
   bool
   RegisterWorkerHolder(workers::WorkerPrivate* aWorkerPrivate);
 
   nsCOMPtr<nsIThread> mTargetThread;
+  nsCOMPtr<nsIEventTarget> mMainThreadEventTarget;
   RefPtr<FetchBody<Derived>> mBody;
 
   // Set when consuming the body is attempted on a worker.
   // Unset when consumption is done/aborted.
   // This WorkerHolder keeps alive the consumer via a cycle.
   UniquePtr<workers::WorkerHolder> mWorkerHolder;
+
+  // Always set whenever the FetchBodyConsumer is created on the worker thread.
+  workers::WorkerPrivate* mWorkerPrivate;
+
+  nsMainThreadPtrHandle<nsIInputStreamPump> mConsumeBodyPump;
+
+  // Only ever set once, always on target thread.
+  FetchConsumeType mConsumeType;
+  RefPtr<Promise> mConsumePromise;
+
+#ifdef DEBUG
+  bool mReadDone;
+#endif
 };
 
 } // namespace dom
 } // namespace mozilla
 
 #endif // mozilla_dom_FetchConsumer_h
--- a/dom/workers/ServiceWorkerScriptCache.cpp
+++ b/dom/workers/ServiceWorkerScriptCache.cpp
@@ -14,16 +14,17 @@
 #include "mozilla/dom/ScriptLoader.h"
 #include "mozilla/ipc/BackgroundUtils.h"
 #include "mozilla/ipc/PBackgroundSharedTypes.h"
 #include "nsICacheInfoChannel.h"
 #include "nsIHttpChannelInternal.h"
 #include "nsIStreamLoader.h"
 #include "nsIThreadRetargetableRequest.h"
 
+#include "nsIInputStreamPump.h"
 #include "nsIPrincipal.h"
 #include "nsIScriptError.h"
 #include "nsIScriptSecurityManager.h"
 #include "nsContentUtils.h"
 #include "nsNetUtil.h"
 #include "ServiceWorkerManager.h"
 #include "Workers.h"
 #include "nsStringStream.h"
--- a/dom/worklet/Worklet.cpp
+++ b/dom/worklet/Worklet.cpp
@@ -11,16 +11,17 @@
 #include "mozilla/dom/WorkletBinding.h"
 #include "mozilla/dom/BlobBinding.h"
 #include "mozilla/dom/Fetch.h"
 #include "mozilla/dom/PromiseNativeHandler.h"
 #include "mozilla/dom/RegisterWorkletBindings.h"
 #include "mozilla/dom/Response.h"
 #include "mozilla/dom/ScriptSettings.h"
 #include "mozilla/dom/ScriptLoader.h"
+#include "nsIInputStreamPump.h"
 #include "nsIThreadRetargetableRequest.h"
 #include "nsNetUtil.h"
 #include "xpcprivate.h"
 
 namespace mozilla {
 namespace dom {
 
 // ---------------------------------------------------------------------------