Bug 1734873 - Fetch/Response/Blob ReadableStream integration r=smaug
authorMatthew Gaudet <mgaudet@mozilla.com>
Fri, 14 Jan 2022 21:09:20 +0000
changeset 604604 6e94875e1ef32abb08f9a66bd43dc06b0f8f979d
parent 604603 33580fdf015f42a1dcae0ba2b223569ed684b5a7
child 604605 f83f8d048441ce3c353c06f988bf63843f14ee27
push id39152
push userabutkovits@mozilla.com
push dateSat, 15 Jan 2022 09:45:36 +0000
treeherdermozilla-central@60998033086a [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerssmaug
bugs1734873
milestone98.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1734873 - Fetch/Response/Blob ReadableStream integration r=smaug Differential Revision: https://phabricator.services.mozilla.com/D131550
dom/base/BodyStream.cpp
dom/base/BodyStream.h
dom/bindings/Bindings.conf
dom/fetch/Fetch.cpp
dom/fetch/Fetch.h
dom/fetch/FetchStreamReader.cpp
dom/fetch/FetchStreamReader.h
dom/fetch/Request.cpp
dom/fetch/Response.cpp
dom/file/Blob.cpp
dom/file/Blob.h
dom/streams/ByteStreamHelpers.cpp
dom/streams/NativeUnderlyingSource.h
dom/streams/ReadableByteStreamController.cpp
dom/streams/ReadableByteStreamController.h
dom/streams/ReadableStream.cpp
dom/streams/ReadableStream.h
dom/streams/ReadableStreamDefaultController.cpp
dom/streams/ReadableStreamDefaultController.h
dom/streams/ReadableStreamDefaultReader.cpp
dom/streams/ReadableStreamDefaultReader.h
dom/streams/UnderlyingSourceCallbackHelpers.cpp
dom/streams/UnderlyingSourceCallbackHelpers.h
dom/streams/moz.build
dom/streams/test/xpcshell/fetch.js
dom/streams/test/xpcshell/response.js
dom/streams/test/xpcshell/xpcshell.ini
--- a/dom/base/BodyStream.cpp
+++ b/dom/base/BodyStream.cpp
@@ -3,16 +3,21 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "BodyStream.h"
 #include "mozilla/CycleCollectedJSContext.h"
 #include "mozilla/dom/AutoEntryScript.h"
 #include "mozilla/dom/DOMException.h"
+#ifdef MOZ_DOM_STREAMS
+#  include "mozilla/dom/NativeUnderlyingSource.h"
+#  include "mozilla/dom/ReadableStream.h"
+#  include "mozilla/dom/ReadableByteStreamController.h"
+#endif
 #include "mozilla/dom/ScriptSettings.h"
 #include "mozilla/dom/WorkerCommon.h"
 #include "mozilla/dom/WorkerPrivate.h"
 #include "mozilla/dom/WorkerRunnable.h"
 #include "mozilla/Maybe.h"
 #include "mozilla/ScopeExit.h"
 #include "mozilla/Unused.h"
 #include "nsProxyRelease.h"
@@ -45,20 +50,22 @@ NS_INTERFACE_MAP_END
 BodyStreamHolder::BodyStreamHolder() : mBodyStream(nullptr) {}
 
 void BodyStreamHolder::StoreBodyStream(BodyStream* aBodyStream) {
   MOZ_ASSERT(aBodyStream);
   MOZ_ASSERT(!mBodyStream);
   mBodyStream = aBodyStream;
 }
 
+#ifndef MOZ_DOM_STREAMS
 void BodyStreamHolder::ForgetBodyStream() {
   MOZ_ASSERT_IF(mStreamCreated, mBodyStream);
   mBodyStream = nullptr;
 }
+#endif
 
 // BodyStream
 // ---------------------------------------------------------------------------
 
 class BodyStream::WorkerShutdown final : public WorkerControlRunnable {
  public:
   WorkerShutdown(WorkerPrivate* aWorkerPrivate, RefPtr<BodyStream> aStream)
       : WorkerControlRunnable(aWorkerPrivate, WorkerThreadUnchangedBusyCount),
@@ -122,49 +129,133 @@ void BodyStream::Create(JSContext* aCx, 
     }
 
     // Note, this will create a ref-cycle between the holder and the stream.
     // The cycle is broken when the stream is closed or the worker begins
     // shutting down.
     stream->mWorkerRef = std::move(workerRef);
   }
 
+#ifdef MOZ_DOM_STREAMS
+  RefPtr<ReadableStream> body =
+      ReadableStream::Create(aCx, aGlobal, aStreamHolder, aRv);
+  if (aRv.Failed()) {
+    return;
+  }
+#else
   aRv.MightThrowJSException();
   JS::Rooted<JSObject*> body(aCx, JS::NewReadableExternalSourceStreamObject(
                                       aCx, stream, aStreamHolder));
   if (!body) {
     aRv.StealExceptionFromJSContext(aCx);
     return;
   }
 
   // This will be released in BodyStream::FinalizeCallback().  We are
   // guaranteed the jsapi will call FinalizeCallback when ReadableStream
   // js object is finalized.
   NS_ADDREF(stream.get());
+#endif
 
   cleanup.release();
 
   aStreamHolder->StoreBodyStream(stream);
   aStreamHolder->SetReadableStreamBody(body);
 
 #ifdef DEBUG
   aStreamHolder->mStreamCreated = true;
 #endif
 }
 
+#ifdef MOZ_DOM_STREAMS
+// UnderlyingSource.pull, implemented for BodyStream.
+already_AddRefed<Promise> BodyStream::PullCallback(
+    JSContext* aCx, ReadableStreamController& aController, ErrorResult& aRv) {
+  MOZ_ASSERT(aController.IsByte());
+  ReadableStream* stream = aController.AsByte()->Stream();
+  MOZ_ASSERT(stream);
+
+#  if MOZ_DIAGNOSTIC_ASSERT_ENABLED
+  MOZ_DIAGNOSTIC_ASSERT(stream->Disturbed());
+#  endif
+
+  AssertIsOnOwningThread();
+
+  MutexAutoLock lock(mMutex);
+
+  MOZ_DIAGNOSTIC_ASSERT(mState == eInitializing || mState == eWaiting ||
+                        mState == eChecking || mState == eReading);
+
+  RefPtr<Promise> resolvedWithUndefinedPromise =
+      Promise::CreateResolvedWithUndefined(aController.GetParentObject(), aRv);
+  if (aRv.Failed()) {
+    return nullptr;
+  }
+
+  if (mState == eReading) {
+    // We are already reading data.
+    return resolvedWithUndefinedPromise.forget();
+  }
+
+  if (mState == eChecking) {
+    // If we are looking for more data, there is nothing else we should do:
+    // let's move this checking operation in a reading.
+    MOZ_ASSERT(mInputStream);
+    mState = eReading;
+
+    return resolvedWithUndefinedPromise.forget();
+  }
+
+  if (mState == eInitializing) {
+    // The stream has been used for the first time.
+    mStreamHolder->MarkAsRead();
+  }
+
+  mState = eReading;
+
+  if (!mInputStream) {
+    // This is the first use of the stream. Let's convert the
+    // mOriginalInputStream into an nsIAsyncInputStream.
+    MOZ_ASSERT(mOriginalInputStream);
+
+    nsCOMPtr<nsIAsyncInputStream> asyncStream;
+    nsresult rv = NS_MakeAsyncNonBlockingInputStream(
+        mOriginalInputStream.forget(), getter_AddRefs(asyncStream));
+    if (NS_WARN_IF(NS_FAILED(rv))) {
+      ErrorPropagation(aCx, lock, stream, rv);
+      return nullptr;
+    }
+
+    mInputStream = asyncStream;
+    mOriginalInputStream = nullptr;
+  }
+
+  MOZ_DIAGNOSTIC_ASSERT(mInputStream);
+  MOZ_DIAGNOSTIC_ASSERT(!mOriginalInputStream);
+
+  nsresult rv = mInputStream->AsyncWait(this, 0, 0, mOwningEventTarget);
+  if (NS_WARN_IF(NS_FAILED(rv))) {
+    ErrorPropagation(aCx, lock, stream, rv);
+    return nullptr;
+  }
+
+  // All good.
+  return resolvedWithUndefinedPromise.forget();
+}
+#else
 void BodyStream::requestData(JSContext* aCx, JS::HandleObject aStream,
                              size_t aDesiredSize) {
-#if MOZ_DIAGNOSTIC_ASSERT_ENABLED
+#  if MOZ_DIAGNOSTIC_ASSERT_ENABLED
   bool disturbed;
   if (!JS::ReadableStreamIsDisturbed(aCx, aStream, &disturbed)) {
     JS_ClearPendingException(aCx);
   } else {
     MOZ_DIAGNOSTIC_ASSERT(disturbed);
   }
-#endif
+#  endif
 
   AssertIsOnOwningThread();
 
   MutexAutoLock lock(mMutex);
 
   MOZ_DIAGNOSTIC_ASSERT(mState == eInitializing || mState == eWaiting ||
                         mState == eChecking || mState == eReading);
 
@@ -211,21 +302,29 @@ void BodyStream::requestData(JSContext* 
   nsresult rv = mInputStream->AsyncWait(this, 0, 0, mOwningEventTarget);
   if (NS_WARN_IF(NS_FAILED(rv))) {
     ErrorPropagation(aCx, lock, aStream, rv);
     return;
   }
 
   // All good.
 }
+#endif
 
+#ifdef MOZ_DOM_STREAMS
+void BodyStream::WriteIntoReadRequestBuffer(JSContext* aCx,
+                                            ReadableStream* aStream,
+                                            void* aBuffer, size_t aLength,
+                                            size_t* aByteWritten) {
+#else
 void BodyStream::writeIntoReadRequestBuffer(JSContext* aCx,
                                             JS::HandleObject aStream,
                                             void* aBuffer, size_t aLength,
                                             size_t* aByteWritten) {
+#endif
   MOZ_DIAGNOSTIC_ASSERT(aBuffer);
   MOZ_DIAGNOSTIC_ASSERT(aByteWritten);
 
   AssertIsOnOwningThread();
 
   MutexAutoLock lock(mMutex);
 
   MOZ_DIAGNOSTIC_ASSERT(mInputStream);
@@ -251,16 +350,50 @@ void BodyStream::writeIntoReadRequestBuf
   if (NS_WARN_IF(NS_FAILED(rv))) {
     ErrorPropagation(aCx, lock, aStream, rv);
     return;
   }
 
   // All good.
 }
 
+#ifdef MOZ_DOM_STREAMS
+// UnderlyingSource.cancel callback, implmented for BodyStream.
+already_AddRefed<Promise> BodyStream::CancelCallback(
+    JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
+    ErrorResult& aRv) {
+  AssertIsOnOwningThread();
+
+  if (mState == eInitializing) {
+    // The stream has been used for the first time.
+    mStreamHolder->MarkAsRead();
+  }
+
+  if (mInputStream) {
+    mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
+  }
+
+  // It could be that we don't have mInputStream yet, but we still have the
+  // original stream. We need to close that too.
+  if (mOriginalInputStream) {
+    MOZ_ASSERT(!mInputStream);
+    mOriginalInputStream->Close();
+  }
+
+  RefPtr<Promise> promise = Promise::CreateResolvedWithUndefined(mGlobal, aRv);
+  if (aRv.Failed()) {
+    return nullptr;
+  }
+
+  // Must come after all uses of members!
+  ReleaseObjects();
+
+  return promise.forget();
+}
+#else
 JS::Value BodyStream::cancel(JSContext* aCx, JS::HandleObject aStream,
                              JS::HandleValue aReason) {
   AssertIsOnOwningThread();
 
   if (mState == eInitializing) {
     // The stream has been used for the first time.
     mStreamHolder->MarkAsRead();
   }
@@ -276,60 +409,86 @@ JS::Value BodyStream::cancel(JSContext* 
     mOriginalInputStream->Close();
   }
 
   ReleaseObjects();
   return JS::UndefinedValue();
 }
 
 void BodyStream::onClosed(JSContext* aCx, JS::HandleObject aStream) {}
+#endif
 
+#ifdef MOZ_DOM_STREAMS
+// Non-standard UnderlyingSource.error callback.
+void BodyStream::ErrorCallback() {
+  AssertIsOnOwningThread();
+
+  if (mState == eInitializing) {
+    // The stream has been used for the first time.
+    mStreamHolder->MarkAsRead();
+  }
+
+  if (mInputStream) {
+    mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
+  }
+
+  ReleaseObjects();
+}
+#else
 void BodyStream::onErrored(JSContext* aCx, JS::HandleObject aStream,
                            JS::HandleValue aReason) {
   AssertIsOnOwningThread();
 
   if (mState == eInitializing) {
     // The stream has been used for the first time.
     mStreamHolder->MarkAsRead();
   }
 
   if (mInputStream) {
     mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
   }
 
   ReleaseObjects();
 }
 
+#endif
+
+#ifndef MOZ_DOM_STREAMS
 void BodyStream::finalize() {
   // This can be called in any thread.
 
   // This takes ownership of the ref created in BodyStream::Create().
   RefPtr<BodyStream> stream = dont_AddRef(this);
 
   stream->ReleaseObjects();
 }
+#endif
 
 BodyStream::BodyStream(nsIGlobalObject* aGlobal,
                        BodyStreamHolder* aStreamHolder,
                        nsIInputStream* aInputStream)
     : mMutex("BodyStream::mMutex"),
       mState(eInitializing),
       mGlobal(aGlobal),
       mStreamHolder(aStreamHolder),
       mOwningEventTarget(aGlobal->EventTargetFor(TaskCategory::Other)),
       mOriginalInputStream(aInputStream) {
   MOZ_DIAGNOSTIC_ASSERT(aInputStream);
   MOZ_DIAGNOSTIC_ASSERT(aStreamHolder);
 }
 
-BodyStream::~BodyStream() = default;
-
+#ifdef MOZ_DOM_STREAMS
+void BodyStream::ErrorPropagation(JSContext* aCx,
+                                  const MutexAutoLock& aProofOfLock,
+                                  ReadableStream* aStream, nsresult aError) {
+#else
 void BodyStream::ErrorPropagation(JSContext* aCx,
                                   const MutexAutoLock& aProofOfLock,
                                   JS::HandleObject aStream, nsresult aError) {
+#endif
   AssertIsOnOwningThread();
 
   // Nothing to do.
   if (mState == eClosed) {
     return;
   }
 
   // Let's close the stream.
@@ -345,22 +504,72 @@ void BodyStream::ErrorPropagation(JSCont
   rv.ThrowTypeError("Error in body stream");
 
   JS::Rooted<JS::Value> errorValue(aCx);
   bool ok = ToJSValue(aCx, std::move(rv), &errorValue);
   MOZ_RELEASE_ASSERT(ok, "ToJSValue never fails for ErrorResult");
 
   {
     MutexAutoUnlock unlock(mMutex);
+#ifdef MOZ_DOM_STREAMS
+    // Don't re-error an already errored stream.
+    if (aStream->State() == ReadableStream::ReaderState::Readable) {
+      IgnoredErrorResult rv;
+      ReadableStreamError(aCx, aStream, errorValue, rv);
+      NS_WARNING_ASSERTION(!rv.Failed(), "Failed to error BodyStream");
+    }
+#else
     JS::ReadableStreamError(aCx, aStream, errorValue);
+#endif
   }
 
   ReleaseObjects(aProofOfLock);
 }
 
+#ifdef MOZ_DOM_STREAMS
+//
+void BodyStream::EnqueueChunkWithSizeIntoStream(JSContext* aCx,
+                                                ReadableStream* aStream,
+                                                uint64_t aAvailableData,
+                                                ErrorResult& aRv) {
+  // Create Chunk
+  aRv.MightThrowJSException();
+  JS::RootedObject chunk(aCx, JS_NewUint8Array(aCx, aAvailableData));
+  if (!chunk) {
+    aRv.StealExceptionFromJSContext(aCx);
+    return;
+  }
+
+  size_t bytesWritten = 0;
+  size_t unusedData = 0;
+  {
+    // JS::AutoSuppressGCAnalysis suppressGC(aCx);
+    JS::AutoCheckCannotGC noGC;
+    bool dummy;
+    void* buffer = JS_GetArrayBufferViewData(chunk, &dummy, noGC);
+    WriteIntoReadRequestBuffer(aCx, aStream, buffer, aAvailableData,
+                               &bytesWritten);
+
+    unusedData = aAvailableData - bytesWritten;
+  }
+
+  MOZ_ASSERT(aStream->Controller()->IsByte());
+  ReadableByteStreamController* byteStreamController =
+      aStream->Controller()->AsByte();
+
+  ReadableByteStreamControllerEnqueue(aCx, byteStreamController, chunk, aRv);
+  if (aRv.Failed()) {
+    return;
+  }
+
+  // Explicit cast to avoid narrowing warning.
+  byteStreamController->SetQueueTotalSize((double)unusedData);
+}
+#endif
+
 NS_IMETHODIMP
 BodyStream::OnInputStreamReady(nsIAsyncInputStream* aStream) {
   AssertIsOnOwningThread();
   MOZ_DIAGNOSTIC_ASSERT(aStream);
 
   // Acquire |mMutex| in order to safely inspect |mState| and use |mGlobal|.
   Maybe<MutexAutoLock> lock;
   lock.emplace(mMutex);
@@ -376,23 +585,30 @@ BodyStream::OnInputStreamReady(nsIAsyncI
   // because its destructor would reacquire |mMutex| before these objects'
   // destructors run.)
   nsAutoMicroTask mt;
   AutoEntryScript aes(mGlobal, "fetch body data available");
 
   MOZ_DIAGNOSTIC_ASSERT(mInputStream);
   MOZ_DIAGNOSTIC_ASSERT(mState == eReading || mState == eChecking);
 
+  JSContext* cx = aes.cx();
+#ifdef MOZ_DOM_STREAMS
+  ReadableStream* stream = mStreamHolder->GetReadableStreamBody();
+  if (!stream) {
+    return NS_ERROR_FAILURE;
+  }
+#else
   JSObject* streamObj = mStreamHolder->GetReadableStreamBody();
   if (!streamObj) {
     return NS_ERROR_FAILURE;
   }
 
-  JSContext* cx = aes.cx();
   JS::Rooted<JSObject*> stream(cx, streamObj);
+#endif
 
   uint64_t size = 0;
   nsresult rv = mInputStream->Available(&size);
   if (NS_SUCCEEDED(rv) && size == 0) {
     // In theory this should not happen. If size is 0, the stream should be
     // considered closed.
     rv = NS_BASE_STREAM_CLOSED;
   }
@@ -410,25 +626,60 @@ BodyStream::OnInputStreamReady(nsIAsyncI
   }
 
   mState = eWriting;
 
   // Release the mutex before the call below (which could execute JS), as well
   // as before the microtask checkpoint queued up above occurs.
   lock.reset();
 
+#ifdef MOZ_DOM_STREAMS
+  ErrorResult errorResult;
+  EnqueueChunkWithSizeIntoStream(cx, stream, size, errorResult);
+  errorResult.WouldReportJSException();
+  if (errorResult.Failed()) {
+    lock.emplace(mMutex);
+    ErrorPropagation(cx, *lock, stream, errorResult.StealNSResult());
+    return NS_OK;
+  }
+#else
   Unused << JS::ReadableStreamUpdateDataAvailableFromSource(cx, stream, size);
+#endif
 
-  // The previous call can execute JS (even up to running a nested event loop),
-  // so |mState| can't be asserted to have any particular value, even if the
-  // previous call succeeds.
+  // The previous call can execute JS (even up to running a nested event
+  // loop), so |mState| can't be asserted to have any particular value, even
+  // if the previous call succeeds.
 
   return NS_OK;
 }
 
+#ifdef MOZ_DOM_STREAMS
+/* static */
+nsresult BodyStream::RetrieveInputStream(BodyStreamHolder* aStream,
+                                         nsIInputStream** aInputStream) {
+  MOZ_ASSERT(aStream);
+  MOZ_ASSERT(aInputStream);
+  BodyStream* stream = aStream->GetBodyStream();
+  if (NS_WARN_IF(!stream)) {
+    return NS_ERROR_DOM_INVALID_STATE_ERR;
+  }
+
+  stream->AssertIsOnOwningThread();
+
+  // if mOriginalInputStream is null, the reading already started. We don't want
+  // to expose the internal inputStream.
+  if (NS_WARN_IF(!stream->mOriginalInputStream)) {
+    return NS_ERROR_DOM_INVALID_STATE_ERR;
+  }
+
+  nsCOMPtr<nsIInputStream> inputStream = stream->mOriginalInputStream;
+  inputStream.forget(aInputStream);
+  return NS_OK;
+}
+#else
 /* static */
 nsresult BodyStream::RetrieveInputStream(
     JS::ReadableStreamUnderlyingSource* aUnderlyingReadableStreamSource,
     nsIInputStream** aInputStream) {
   MOZ_ASSERT(aUnderlyingReadableStreamSource);
   MOZ_ASSERT(aInputStream);
 
   RefPtr<BodyStream> stream =
@@ -440,42 +691,70 @@ nsresult BodyStream::RetrieveInputStream
   if (NS_WARN_IF(!stream->mOriginalInputStream)) {
     return NS_ERROR_DOM_INVALID_STATE_ERR;
   }
 
   nsCOMPtr<nsIInputStream> inputStream = stream->mOriginalInputStream;
   inputStream.forget(aInputStream);
   return NS_OK;
 }
+#endif
 
 void BodyStream::Close() {
   AssertIsOnOwningThread();
 
   MutexAutoLock lock(mMutex);
 
   if (mState == eClosed) {
     return;
   }
 
   AutoJSAPI jsapi;
   if (NS_WARN_IF(!jsapi.Init(mGlobal))) {
     ReleaseObjects(lock);
     return;
   }
-
+#ifdef MOZ_DOM_STREAMS
+  ReadableStream* stream = mStreamHolder->GetReadableStreamBody();
+  if (stream) {
+    JSContext* cx = jsapi.cx();
+    CloseAndReleaseObjects(cx, lock, stream);
+  } else {
+    ReleaseObjects(lock);
+  }
+#else
   JSObject* streamObj = mStreamHolder->GetReadableStreamBody();
   if (streamObj) {
     JSContext* cx = jsapi.cx();
     JS::Rooted<JSObject*> stream(cx, streamObj);
     CloseAndReleaseObjects(cx, lock, stream);
   } else {
     ReleaseObjects(lock);
   }
+#endif
 }
 
+#ifdef MOZ_DOM_STREAMS
+void BodyStream::CloseAndReleaseObjects(JSContext* aCx,
+                                        const MutexAutoLock& aProofOfLock,
+                                        ReadableStream* aStream) {
+  AssertIsOnOwningThread();
+  MOZ_DIAGNOSTIC_ASSERT(mState != eClosed);
+
+  ReleaseObjects(aProofOfLock);
+
+  MutexAutoUnlock unlock(mMutex);
+
+  if (aStream->State() == ReadableStream::ReaderState::Readable) {
+    IgnoredErrorResult rv;
+    ReadableStreamClose(aCx, aStream, rv);
+    NS_WARNING_ASSERTION(!rv.Failed(), "Failed to Close Stream");
+  }
+}
+#else
 void BodyStream::CloseAndReleaseObjects(JSContext* aCx,
                                         const MutexAutoLock& aProofOfLock,
                                         JS::HandleObject aStream) {
   AssertIsOnOwningThread();
   MOZ_DIAGNOSTIC_ASSERT(mState != eClosed);
 
   ReleaseObjects(aProofOfLock);
 
@@ -483,16 +762,17 @@ void BodyStream::CloseAndReleaseObjects(
   bool readable;
   if (!JS::ReadableStreamIsReadable(aCx, aStream, &readable)) {
     return;
   }
   if (readable) {
     JS::ReadableStreamClose(aCx, aStream);
   }
 }
+#endif
 
 void BodyStream::ReleaseObjects() {
   MutexAutoLock lock(mMutex);
   ReleaseObjects(lock);
 }
 
 void BodyStream::ReleaseObjects(const MutexAutoLock& aProofOfLock) {
   // This method can be called on 2 possible threads: the owning one and a JS
@@ -528,26 +808,49 @@ void BodyStream::ReleaseObjects(const Mu
 
   if (NS_IsMainThread()) {
     nsCOMPtr<nsIObserverService> obs = mozilla::services::GetObserverService();
     if (obs) {
       obs->RemoveObserver(this, DOM_WINDOW_DESTROYED_TOPIC);
     }
   }
 
+#ifdef MOZ_DOM_STREAMS
+  ReadableStream* stream = mStreamHolder->GetReadableStreamBody();
+  if (stream) {
+    stream->ReleaseObjects();
+  }
+#else
   JSObject* streamObj = mStreamHolder->GetReadableStreamBody();
   if (streamObj) {
     // Let's inform the JSEngine that we are going to be released.
     JS::ReadableStreamReleaseCCObject(streamObj);
   }
+#endif
 
   mWorkerRef = nullptr;
   mGlobal = nullptr;
 
+#ifdef MOZ_DOM_STREAMS
+  // Since calling ForgetBodyStream can cause our current ref count to drop to
+  // zero, which would be bad, because this means we'd be destroying the mutex
+  // which aProofOfLock is holding; instead, we do this later by creating an
+  // event.
+  GetCurrentSerialEventTarget()->Dispatch(NS_NewCancelableRunnableFunction(
+      "BodyStream::ReleaseObjects",
+      [streamHolder = RefPtr{mStreamHolder->TakeBodyStream()}] {
+        // Intentionally left blank: The destruction of this lambda will free
+        // free the stream holder, thus releasing the bodystream.
+        //
+        // This is cancelable because if a worker cancels this, we're still fine
+        // as the lambda will be successfully destroyed.
+      }));
+#else
   mStreamHolder->ForgetBodyStream();
+#endif
   mStreamHolder->NullifyStream();
   mStreamHolder = nullptr;
 }
 
 #ifdef DEBUG
 void BodyStream::AssertIsOnOwningThread() {
   NS_ASSERT_OWNINGTHREAD(BodyStream);
 }
--- a/dom/base/BodyStream.h
+++ b/dom/base/BodyStream.h
@@ -4,16 +4,21 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef mozilla_dom_BodyStream_h
 #define mozilla_dom_BodyStream_h
 
 #include "jsapi.h"
 #include "js/Stream.h"
+#include "mozilla/AlreadyAddRefed.h"
+#ifdef MOZ_DOM_STREAMS
+#  include "mozilla/dom/BindingDeclarations.h"
+#  include "mozilla/dom/NativeUnderlyingSource.h"
+#endif
 #include "nsIAsyncInputStream.h"
 #include "nsCycleCollectionParticipant.h"
 #include "nsIObserver.h"
 #include "nsISupportsImpl.h"
 #include "nsNetCID.h"
 #include "nsWeakReference.h"
 #include "mozilla/Mutex.h"
 
@@ -23,84 +28,147 @@ class nsIInputStream;
 
 namespace mozilla {
 class ErrorResult;
 
 namespace dom {
 
 class BodyStream;
 class WeakWorkerRef;
+class ReadableStream;
+
+class BodyStreamUnderlyingSourcePullCallbackHelper;
+class BodyStreamUnderlyingSourceCancelCallbackHelper;
+class BodyStreamUnderlyingSourceErrorCallbackHelper;
 
 class BodyStreamHolder : public nsISupports {
   friend class BodyStream;
+  friend class BodyStreamUnderlyingSourcePullCallbackHelper;
+  friend class BodyStreamUnderlyingSourceCancelCallbackHelper;
+  friend class BodyStreamUnderlyingSourceErrorCallbackHelper;
 
  public:
   NS_DECL_CYCLE_COLLECTING_ISUPPORTS
   NS_DECL_CYCLE_COLLECTION_CLASS(BodyStreamHolder)
 
   BodyStreamHolder();
 
   virtual void NullifyStream() = 0;
 
   virtual void MarkAsRead() = 0;
 
+#ifndef MOZ_DOM_STREAMS
   virtual void SetReadableStreamBody(JSObject* aBody) = 0;
-
   virtual JSObject* GetReadableStreamBody() = 0;
+#else
+  virtual void SetReadableStreamBody(ReadableStream* aBody) = 0;
+  virtual ReadableStream* GetReadableStreamBody() = 0;
+#endif
 
  protected:
   virtual ~BodyStreamHolder() = default;
 
  private:
   void StoreBodyStream(BodyStream* aBodyStream);
+#ifdef MOZ_DOM_STREAMS
+  already_AddRefed<BodyStream> TakeBodyStream() {
+    MOZ_ASSERT_IF(mStreamCreated, mBodyStream);
+    return mBodyStream.forget();
+  }
+#else
   void ForgetBodyStream();
+#endif
+  BodyStream* GetBodyStream() { return mBodyStream; }
 
+#ifdef MOZ_DOM_STREAMS
+  RefPtr<BodyStream> mBodyStream;
+#else
   // Raw pointer because BodyStream keeps BodyStreamHolder alive and it
   // nullifies this stream before being released.
   BodyStream* mBodyStream;
+#endif
 
 #ifdef DEBUG
   bool mStreamCreated = false;
 #endif
 };
 
 class BodyStream final : public nsIInputStreamCallback,
                          public nsIObserver,
                          public nsSupportsWeakReference,
-                         private JS::ReadableStreamUnderlyingSource {
+#ifndef MOZ_DOM_STREAMS
+                         private JS::ReadableStreamUnderlyingSource
+#else
+                         public NativeUnderlyingSource
+#endif
+{
   friend class BodyStreamHolder;
 
  public:
   NS_DECL_THREADSAFE_ISUPPORTS
   NS_DECL_NSIINPUTSTREAMCALLBACK
   NS_DECL_NSIOBSERVER
 
   // This method creates a JS ReadableStream object and it assigns it to the
   // aStreamHolder calling SetReadableStreamBody().
   static void Create(JSContext* aCx, BodyStreamHolder* aStreamHolder,
                      nsIGlobalObject* aGlobal, nsIInputStream* aInputStream,
                      ErrorResult& aRv);
 
   void Close();
 
+#ifdef MOZ_DOM_STREAMS
+  static nsresult RetrieveInputStream(BodyStreamHolder* aStream,
+                                      nsIInputStream** aInputStream);
+#else
   static nsresult RetrieveInputStream(
       JS::ReadableStreamUnderlyingSource* aUnderlyingReadableStreamSource,
       nsIInputStream** aInputStream);
+#endif
 
  private:
   BodyStream(nsIGlobalObject* aGlobal, BodyStreamHolder* aStreamHolder,
              nsIInputStream* aInputStream);
-  ~BodyStream();
+  ~BodyStream() = default;
 
 #ifdef DEBUG
   void AssertIsOnOwningThread();
 #else
   void AssertIsOnOwningThread() {}
 #endif
 
+#ifdef MOZ_DOM_STREAMS
+ public:
+  // Cancel Callback
+  virtual already_AddRefed<Promise> CancelCallback(
+      JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
+      ErrorResult& aRv) override;
+
+  // Pull Callback
+  virtual already_AddRefed<Promise> PullCallback(
+      JSContext* aCx, ReadableStreamController& aController,
+      ErrorResult& aRv) override;
+
+  void ErrorCallback() override;
+
+ private:
+  // Fills a buffer with bytes from the stream.
+  void WriteIntoReadRequestBuffer(JSContext* aCx, ReadableStream* aStream,
+                                  void* aBuffer, size_t aLength,
+                                  size_t* aByteWritten);
+
+  void EnqueueChunkWithSizeIntoStream(JSContext* aCx, ReadableStream* aStream,
+                                      uint64_t bytes, ErrorResult& aRv);
+
+  void ErrorPropagation(JSContext* aCx, const MutexAutoLock& aProofOfLock,
+                        ReadableStream* aStream, nsresult aRv);
+
+  void CloseAndReleaseObjects(JSContext* aCx, const MutexAutoLock& aProofOfLock,
+                              ReadableStream* aStream);
+#else
   void requestData(JSContext* aCx, JS::HandleObject aStream,
                    size_t aDesiredSize) override;
 
   void writeIntoReadRequestBuffer(JSContext* aCx, JS::HandleObject aStream,
                                   void* aBuffer, size_t aLength,
                                   size_t* aBytesWritten) override;
 
   JS::Value cancel(JSContext* aCx, JS::HandleObject aStream,
@@ -112,17 +180,18 @@ class BodyStream final : public nsIInput
                  JS::HandleValue aReason) override;
 
   void finalize() override;
 
   void ErrorPropagation(JSContext* aCx, const MutexAutoLock& aProofOfLock,
                         JS::HandleObject aStream, nsresult aRv);
 
   void CloseAndReleaseObjects(JSContext* aCx, const MutexAutoLock& aProofOfLock,
-                              JS::HandleObject aSteam);
+                              JS::HandleObject aStream);
+#endif
 
   class WorkerShutdown;
 
   void ReleaseObjects(const MutexAutoLock& aProofOfLock);
 
   void ReleaseObjects();
 
   // Common methods
--- a/dom/bindings/Bindings.conf
+++ b/dom/bindings/Bindings.conf
@@ -81,16 +81,20 @@ DOMInterfaces = {
     'nativeType': 'mozilla::dom::AudioContext',
 },
 
 'BatteryManager': {
     'nativeType': 'mozilla::dom::battery::BatteryManager',
     'headerFile': 'BatteryManager.h'
 },
 
+'Blob': {
+    'implicitJSContext': [ 'stream' ],
+},
+
 'BrowsingContext': {
     'concrete': True,
 },
 
 'Cache': {
     'implicitJSContext': [ 'add', 'addAll', 'match', 'matchAll', 'put',
                            'delete', 'keys' ],
     'nativeType': 'mozilla::dom::cache::Cache',
@@ -697,17 +701,17 @@ DOMInterfaces = {
 },
 
 'ResizeObserverSize': {
     'nativeType': 'mozilla::dom::ResizeObserverSize',
     'headerFile': 'mozilla/dom/ResizeObserver.h',
 },
 
 'Response': {
-    'implicitJSContext': [ 'arrayBuffer', 'blob', 'formData', 'json', 'text',
+    'implicitJSContext': [ 'arrayBuffer', 'blob', 'body', 'formData', 'json', 'text',
                            'clone', 'cloneUnfiltered' ],
 },
 
 'RTCDataChannel': {
     'nativeType': 'nsDOMDataChannel',
 },
 
 'RTCDtlsTransport': {
--- a/dom/fetch/Fetch.cpp
+++ b/dom/fetch/Fetch.cpp
@@ -2,16 +2,17 @@
 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "Fetch.h"
 
 #include "js/Value.h"
+#include "mozilla/CycleCollectedJSContext.h"
 #include "mozilla/dom/Document.h"
 #include "mozilla/ipc/PBackgroundSharedTypes.h"
 #include "nsIGlobalObject.h"
 
 #include "nsDOMString.h"
 #include "nsJSUtils.h"
 #include "nsNetUtil.h"
 #include "nsReadableUtils.h"
@@ -47,16 +48,34 @@
 #include "mozilla/dom/WorkerRef.h"
 #include "mozilla/dom/WorkerRunnable.h"
 #include "mozilla/dom/WorkerScope.h"
 
 namespace mozilla::dom {
 
 namespace {
 
+#ifdef MOZ_DOM_STREAMS
+void AbortStream(JSContext* aCx, ReadableStream* aReadableStream,
+                 ErrorResult& aRv) {
+  if (aReadableStream->State() != ReadableStream::ReaderState::Readable) {
+    return;
+  }
+
+  RefPtr<DOMException> e = DOMException::Create(NS_ERROR_DOM_ABORT_ERR);
+  JS::Rooted<JS::Value> value(aCx);
+  if (!GetOrCreateDOMReflector(aCx, e, &value)) {
+    return;
+  }
+
+  ReadableStreamError(aCx, aReadableStream, value, aRv);
+}
+
+#else
+
 void AbortStream(JSContext* aCx, JS::Handle<JSObject*> aStream,
                  ErrorResult& aRv) {
   aRv.MightThrowJSException();
 
   bool isReadable;
   if (!JS::ReadableStreamIsReadable(aCx, aStream, &isReadable)) {
     aRv.StealExceptionFromJSContext(aCx);
     return;
@@ -71,16 +90,17 @@ void AbortStream(JSContext* aCx, JS::Han
   if (!GetOrCreateDOMReflector(aCx, e, &value)) {
     return;
   }
 
   if (!JS::ReadableStreamError(aCx, aStream, value)) {
     aRv.StealExceptionFromJSContext(aCx);
   }
 }
+#endif
 
 }  // namespace
 
 class AbortSignalMainThread final : public AbortSignalImpl {
  public:
   NS_DECL_CYCLE_COLLECTING_ISUPPORTS
   NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(AbortSignalMainThread)
 
@@ -1133,16 +1153,19 @@ template FetchBody<Response>::~FetchBody
 template <class Derived>
 bool FetchBody<Derived>::GetBodyUsed(ErrorResult& aRv) const {
   if (mBodyUsed) {
     return true;
   }
 
   // If this stream is disturbed, return true.
   if (mReadableStreamBody) {
+#ifdef MOZ_DOM_STREAMS
+    return mReadableStreamBody->Disturbed();
+#else
     aRv.MightThrowJSException();
 
     AutoJSAPI jsapi;
     if (!jsapi.Init(mOwner)) {
       aRv.Throw(NS_ERROR_FAILURE);
       return true;
     }
 
@@ -1150,16 +1173,17 @@ bool FetchBody<Derived>::GetBodyUsed(Err
     JS::Rooted<JSObject*> body(cx, mReadableStreamBody);
     bool disturbed;
     if (!JS::ReadableStreamIsDisturbed(cx, body, &disturbed)) {
       aRv.StealExceptionFromJSContext(cx);
       return false;
     }
 
     return disturbed;
+#endif
   }
 
   return false;
 }
 
 template bool FetchBody<Request>::GetBodyUsed(ErrorResult&) const;
 
 template bool FetchBody<Response>::GetBodyUsed(ErrorResult&) const;
@@ -1185,21 +1209,21 @@ void FetchBody<Derived>::SetBodyUsed(JSC
   MOZ_ASSERT(mOwner->EventTargetFor(TaskCategory::Other)->IsOnCurrentThread());
 
   if (mBodyUsed) {
     return;
   }
 
   mBodyUsed = true;
 
+#ifndef MOZ_DOM_STREAMS
   // If we already have a ReadableStreamBody and it has been created by DOM, we
   // have to lock it now because it can have been shared with other objects.
   if (mReadableStreamBody) {
     aRv.MightThrowJSException();
-
     JSAutoRealm ar(aCx, mOwner->GetGlobalJSObject());
 
     JS::Rooted<JSObject*> readableStreamObj(aCx, mReadableStreamBody);
 
     JS::ReadableStreamMode mode;
     if (!JS::ReadableStreamGetMode(aCx, readableStreamObj, &mode)) {
       aRv.StealExceptionFromJSContext(aCx);
       return;
@@ -1218,16 +1242,39 @@ void FetchBody<Derived>::SetBodyUsed(JSC
       mFetchStreamReader->StartConsuming(aCx, readableStreamObj, &reader, aRv);
       if (NS_WARN_IF(aRv.Failed())) {
         return;
       }
 
       mReadableStreamReader = reader;
     }
   }
+#else
+  // If we already have a ReadableStreamBody and it has been created by DOM, we
+  // have to lock it now because it can have been shared with other objects.
+  if (mReadableStreamBody) {
+    if (mReadableStreamBody->HasNativeUnderlyingSource()) {
+      LockStream(aCx, mReadableStreamBody, aRv);
+      if (NS_WARN_IF(aRv.Failed())) {
+        return;
+      }
+    } else {
+      MOZ_ASSERT(mFetchStreamReader);
+      //  Let's activate the FetchStreamReader.
+      RefPtr<ReadableStreamDefaultReader> reader;
+      mFetchStreamReader->StartConsuming(aCx, mReadableStreamBody,
+                                         getter_AddRefs(reader), aRv);
+      if (NS_WARN_IF(aRv.Failed())) {
+        return;
+      }
+
+      mReadableStreamReader = reader.forget();
+    }
+  }
+#endif
 }
 
 template void FetchBody<Request>::SetBodyUsed(JSContext* aCx, ErrorResult& aRv);
 
 template void FetchBody<Response>::SetBodyUsed(JSContext* aCx,
                                                ErrorResult& aRv);
 
 template <class Derived>
@@ -1356,16 +1403,17 @@ const nsAString& FetchBody<Derived>::Bod
 }
 
 template const nsAString& FetchBody<Request>::BodyLocalPath() const;
 
 template const nsAString& FetchBody<Response>::BodyLocalPath() const;
 
 template const nsAString& FetchBody<EmptyBody>::BodyLocalPath() const;
 
+#ifndef MOZ_DOM_STREAMS
 template <class Derived>
 void FetchBody<Derived>::SetReadableStreamBody(JSContext* aCx,
                                                JSObject* aBody) {
   MOZ_ASSERT(!mReadableStreamBody);
   MOZ_ASSERT(aBody);
   mReadableStreamBody = aBody;
 
   RefPtr<AbortSignalImpl> signalImpl = DerivedClass()->GetSignalImpl();
@@ -1386,16 +1434,47 @@ void FetchBody<Derived>::SetReadableStre
   }
 }
 
 template void FetchBody<Request>::SetReadableStreamBody(JSContext* aCx,
                                                         JSObject* aBody);
 
 template void FetchBody<Response>::SetReadableStreamBody(JSContext* aCx,
                                                          JSObject* aBody);
+#else
+template <class Derived>
+void FetchBody<Derived>::SetReadableStreamBody(JSContext* aCx,
+                                               ReadableStream* aBody) {
+  MOZ_ASSERT(!mReadableStreamBody);
+  MOZ_ASSERT(aBody);
+  mReadableStreamBody = aBody;
+
+  RefPtr<AbortSignalImpl> signalImpl = DerivedClass()->GetSignalImpl();
+  if (!signalImpl) {
+    return;
+  }
+
+  bool aborted = signalImpl->Aborted();
+  if (aborted) {
+    IgnoredErrorResult result;
+    AbortStream(aCx, mReadableStreamBody, result);
+    if (NS_WARN_IF(result.Failed())) {
+      return;
+    }
+  } else if (!IsFollowing()) {
+    Follow(signalImpl);
+  }
+}
+
+template void FetchBody<Request>::SetReadableStreamBody(JSContext* aCx,
+                                                        ReadableStream* aBody);
+
+template void FetchBody<Response>::SetReadableStreamBody(JSContext* aCx,
+                                                         ReadableStream* aBody);
+#endif
 
 #ifndef MOZ_DOM_STREAMS
 template <class Derived>
 void FetchBody<Derived>::GetBody(JSContext* aCx,
                                  JS::MutableHandle<JSObject*> aBodyOut,
                                  ErrorResult& aRv) {
   if (mReadableStreamBody) {
     aBodyOut.set(mReadableStreamBody);
@@ -1448,31 +1527,112 @@ void FetchBody<Derived>::GetBody(JSConte
 }
 
 template void FetchBody<Request>::GetBody(JSContext* aCx,
                                           JS::MutableHandle<JSObject*> aMessage,
                                           ErrorResult& aRv);
 
 template void FetchBody<Response>::GetBody(
     JSContext* aCx, JS::MutableHandle<JSObject*> aMessage, ErrorResult& aRv);
+#else
+template <class Derived>
+already_AddRefed<ReadableStream> FetchBody<Derived>::GetBody(JSContext* aCx,
+                                                             ErrorResult& aRv) {
+  if (mReadableStreamBody) {
+    RefPtr<ReadableStream> body(mReadableStreamBody);
+    return body.forget();
+  }
+
+  nsCOMPtr<nsIInputStream> inputStream;
+  DerivedClass()->GetBody(getter_AddRefs(inputStream));
+
+  if (!inputStream) {
+    return nullptr;
+  }
+
+  BodyStream::Create(aCx, this, DerivedClass()->GetParentObject(), inputStream,
+                     aRv);
+  if (NS_WARN_IF(aRv.Failed())) {
+    return nullptr;
+  }
+
+  MOZ_ASSERT(mReadableStreamBody);
+
+  RefPtr<ReadableStream> body(mReadableStreamBody);
+
+  // If the body has been already consumed, we lock the stream.
+  bool bodyUsed = GetBodyUsed(aRv);
+  if (NS_WARN_IF(aRv.Failed())) {
+    return nullptr;
+  }
+  if (bodyUsed) {
+    LockStream(aCx, body, aRv);
+    if (NS_WARN_IF(aRv.Failed())) {
+      return nullptr;
+    }
+  }
+
+  RefPtr<AbortSignalImpl> signalImpl = DerivedClass()->GetSignalImpl();
+  if (signalImpl) {
+    if (signalImpl->Aborted()) {
+      AbortStream(aCx, body, aRv);
+      if (NS_WARN_IF(aRv.Failed())) {
+        return nullptr;
+      }
+    } else if (!IsFollowing()) {
+      Follow(signalImpl);
+    }
+  }
+
+  return body.forget();
+}
+
+template already_AddRefed<ReadableStream> FetchBody<Request>::GetBody(
+    JSContext* aCx, ErrorResult& aRv);
+
+template already_AddRefed<ReadableStream> FetchBody<Response>::GetBody(
+    JSContext* aCx, ErrorResult& aRv);
+
 #endif
 
+#ifdef MOZ_DOM_STREAMS
+template <class Derived>
+void FetchBody<Derived>::LockStream(JSContext* aCx, ReadableStream* aStream,
+                                    ErrorResult& aRv) {
+  // This is native stream, creating a reader will not execute any JS code.
+  RefPtr<ReadableStreamDefaultReader> reader =
+      AcquireReadableStreamDefaultReader(aCx, aStream, aRv);
+  if (aRv.Failed()) {
+    return;
+  }
+
+  mReadableStreamReader = reader;
+}
+
+template void FetchBody<Request>::LockStream(JSContext* aCx,
+                                             ReadableStream* aStream,
+                                             ErrorResult& aRv);
+
+template void FetchBody<Response>::LockStream(JSContext* aCx,
+                                              ReadableStream* aStream,
+                                              ErrorResult& aRv);
+#else
 template <class Derived>
 void FetchBody<Derived>::LockStream(JSContext* aCx, JS::HandleObject aStream,
                                     ErrorResult& aRv) {
   aRv.MightThrowJSException();
 
-#if DEBUG
+#  if DEBUG
   JS::ReadableStreamMode streamMode;
   if (!JS::ReadableStreamGetMode(aCx, aStream, &streamMode)) {
     aRv.StealExceptionFromJSContext(aCx);
     return;
   }
   MOZ_ASSERT(streamMode == JS::ReadableStreamMode::ExternalSource);
-#endif  // DEBUG
+#  endif  // DEBUG
 
   // This is native stream, creating a reader will not execute any JS code.
   JS::Rooted<JSObject*> reader(
       aCx, JS::ReadableStreamGetReader(aCx, aStream,
                                        JS::ReadableStreamReaderMode::Default));
   if (!reader) {
     aRv.StealExceptionFromJSContext(aCx);
     return;
@@ -1483,17 +1643,19 @@ void FetchBody<Derived>::LockStream(JSCo
 
 template void FetchBody<Request>::LockStream(JSContext* aCx,
                                              JS::HandleObject aStream,
                                              ErrorResult& aRv);
 
 template void FetchBody<Response>::LockStream(JSContext* aCx,
                                               JS::HandleObject aStream,
                                               ErrorResult& aRv);
+#endif
 
+#ifndef MOZ_DOM_STREAMS
 template <class Derived>
 void FetchBody<Derived>::MaybeTeeReadableStreamBody(
     JSContext* aCx, JS::MutableHandle<JSObject*> aBodyOut,
     FetchStreamReader** aStreamReader, nsIInputStream** aInputStream,
     ErrorResult& aRv) {
   MOZ_DIAGNOSTIC_ASSERT(aStreamReader);
   MOZ_DIAGNOSTIC_ASSERT(aInputStream);
   MOZ_DIAGNOSTIC_ASSERT(!CheckBodyUsed());
@@ -1547,30 +1709,87 @@ template void FetchBody<Request>::MaybeT
     FetchStreamReader** aStreamReader, nsIInputStream** aInputStream,
     ErrorResult& aRv);
 
 template void FetchBody<Response>::MaybeTeeReadableStreamBody(
     JSContext* aCx, JS::MutableHandle<JSObject*> aMessage,
     FetchStreamReader** aStreamReader, nsIInputStream** aInputStream,
     ErrorResult& aRv);
 
+#else
+
+template <class Derived>
+void FetchBody<Derived>::MaybeTeeReadableStreamBody(
+    JSContext* aCx, ReadableStream** aBodyOut,
+    FetchStreamReader** aStreamReader, nsIInputStream** aInputStream,
+    ErrorResult& aRv) {
+  MOZ_DIAGNOSTIC_ASSERT(aStreamReader);
+  MOZ_DIAGNOSTIC_ASSERT(aInputStream);
+  MOZ_DIAGNOSTIC_ASSERT(!CheckBodyUsed());
+
+  *aBodyOut = nullptr;
+  *aStreamReader = nullptr;
+  *aInputStream = nullptr;
+
+  if (!mReadableStreamBody) {
+    return;
+  }
+
+  // If this is a ReadableStream with an native source, this has been
+  // generated by a Fetch. In this case, Fetch will be able to recreate it
+  // again when GetBody() is called.
+  if (mReadableStreamBody->HasNativeUnderlyingSource()) {
+    *aBodyOut = nullptr;
+    return;
+  }
+
+  nsTArray<RefPtr<ReadableStream> > branches;
+  mReadableStreamBody->Tee(aCx, branches, aRv);
+  if (aRv.Failed()) {
+    return;
+  }
+
+  mReadableStreamBody = branches[0];
+  branches[1].forget(aBodyOut);
+
+  aRv = FetchStreamReader::Create(aCx, mOwner, aStreamReader, aInputStream);
+  if (NS_WARN_IF(aRv.Failed())) {
+    return;
+  }
+}
+
+template void FetchBody<Request>::MaybeTeeReadableStreamBody(
+    JSContext* aCx, ReadableStream** aBodyOut,
+    FetchStreamReader** aStreamReader, nsIInputStream** aInputStream,
+    ErrorResult& aRv);
+
+template void FetchBody<Response>::MaybeTeeReadableStreamBody(
+    JSContext* aCx, ReadableStream** aBodyOut,
+    FetchStreamReader** aStreamReader, nsIInputStream** aInputStream,
+    ErrorResult& aRv);
+#endif
+
 template <class Derived>
 void FetchBody<Derived>::RunAbortAlgorithm() {
   if (!mReadableStreamBody) {
     return;
   }
 
   AutoJSAPI jsapi;
   if (!jsapi.Init(mOwner)) {
     return;
   }
 
   JSContext* cx = jsapi.cx();
 
+#ifdef MOZ_DOM_STREAMS
+  RefPtr<ReadableStream> body(mReadableStreamBody);
+#else
   JS::Rooted<JSObject*> body(cx, mReadableStreamBody);
+#endif
   IgnoredErrorResult result;
   AbortStream(cx, body, result);
 }
 
 template void FetchBody<Request>::RunAbortAlgorithm();
 
 template void FetchBody<Response>::RunAbortAlgorithm();
 
@@ -1579,24 +1798,32 @@ NS_IMPL_RELEASE_INHERITED(EmptyBody, Fet
 
 NS_IMPL_CYCLE_COLLECTION_CLASS(EmptyBody)
 
 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN_INHERITED(EmptyBody, FetchBody<EmptyBody>)
   AbortFollower::Unlink(static_cast<AbortFollower*>(tmp));
   NS_IMPL_CYCLE_COLLECTION_UNLINK(mOwner)
   NS_IMPL_CYCLE_COLLECTION_UNLINK(mAbortSignalImpl)
   NS_IMPL_CYCLE_COLLECTION_UNLINK(mFetchStreamReader)
+#ifdef MOZ_DOM_STREAMS
+  NS_IMPL_CYCLE_COLLECTION_UNLINK(mReadableStreamBody)
+  NS_IMPL_CYCLE_COLLECTION_UNLINK(mReadableStreamReader)
+#endif
 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
 
 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN_INHERITED(EmptyBody,
                                                   FetchBody<EmptyBody>)
   AbortFollower::Traverse(static_cast<AbortFollower*>(tmp), cb);
   NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mOwner)
   NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mAbortSignalImpl)
   NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mFetchStreamReader)
+#ifdef MOZ_DOM_STREAMS
+  NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReadableStreamBody)
+  NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReadableStreamReader)
+#endif
 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
 
 NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN_INHERITED(EmptyBody, FetchBody<EmptyBody>)
 NS_IMPL_CYCLE_COLLECTION_TRACE_END
 
 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(EmptyBody)
 NS_INTERFACE_MAP_END_INHERITING(FetchBody<EmptyBody>)
 
--- a/dom/fetch/Fetch.h
+++ b/dom/fetch/Fetch.h
@@ -13,16 +13,20 @@
 #include "nsString.h"
 
 #include "mozilla/DebugOnly.h"
 #include "mozilla/dom/AbortSignal.h"
 #include "mozilla/dom/BodyConsumer.h"
 #include "mozilla/dom/BodyStream.h"
 #include "mozilla/dom/Promise.h"
 #include "mozilla/dom/FetchStreamReader.h"
+#ifdef MOZ_DOM_STREAMS
+#  include "mozilla/dom/ReadableStream.h"
+#  include "mozilla/dom/ReadableStreamDefaultReaderBinding.h"
+#endif
 #include "mozilla/dom/RequestBinding.h"
 
 class nsIGlobalObject;
 class nsIEventTarget;
 
 namespace mozilla {
 class ErrorResult;
 
@@ -34,17 +38,18 @@ namespace dom {
 
 class BlobOrArrayBufferViewOrArrayBufferOrFormDataOrURLSearchParamsOrUSVString;
 class
     BlobOrArrayBufferViewOrArrayBufferOrFormDataOrURLSearchParamsOrReadableStreamOrUSVString;
 class BlobImpl;
 class InternalRequest;
 class
     OwningBlobOrArrayBufferViewOrArrayBufferOrFormDataOrURLSearchParamsOrUSVString;
-class ReadableStream;
+
+class ReadableStreamDefaultReader;
 class RequestOrUSVString;
 class WorkerPrivate;
 
 enum class CallerType : uint32_t;
 
 already_AddRefed<Promise> FetchRequest(nsIGlobalObject* aGlobal,
                                        const RequestOrUSVString& aInput,
                                        const RequestInit& aInit,
@@ -153,36 +158,43 @@ class FetchBody : public BodyStreamHolde
     return ConsumeBody(aCx, BodyConsumer::CONSUME_JSON, aRv);
   }
 
   already_AddRefed<Promise> Text(JSContext* aCx, ErrorResult& aRv) {
     return ConsumeBody(aCx, BodyConsumer::CONSUME_TEXT, aRv);
   }
 
 #ifdef MOZ_DOM_STREAMS
-  already_AddRefed<ReadableStream> GetBody(ErrorResult& aRv) {
-    MOZ_CRASH("MOZ_DOM_STREAMS:NYI");
-  }
+  already_AddRefed<ReadableStream> GetBody(JSContext* aCx, ErrorResult& aRv);
 #else
   void GetBody(JSContext* aCx, JS::MutableHandle<JSObject*> aBodyOut,
                ErrorResult& aRv);
 #endif
   void GetMimeType(nsACString& aMimeType);
 
   const nsACString& BodyBlobURISpec() const;
 
   const nsAString& BodyLocalPath() const;
 
+#ifdef MOZ_DOM_STREAMS
+  // If the body contains a ReadableStream body object, this method produces a
+  // tee() of it.
+  void MaybeTeeReadableStreamBody(JSContext* aCx, ReadableStream** aBodyOut,
+                                  FetchStreamReader** aStreamReader,
+                                  nsIInputStream** aInputStream,
+                                  ErrorResult& aRv);
+#else
   // If the body contains a ReadableStream body object, this method produces a
   // tee() of it.
   void MaybeTeeReadableStreamBody(JSContext* aCx,
                                   JS::MutableHandle<JSObject*> aBodyOut,
                                   FetchStreamReader** aStreamReader,
                                   nsIInputStream** aInputStream,
                                   ErrorResult& aRv);
+#endif
 
   // Utility public methods accessed by various runnables.
 
   // This method _must_ be called in order to set the body as used. If the body
   // is a ReadableStream, this method will start reading the stream.
   // More in details, this method does:
   // 1) It uses an internal flag to track if the body is used.  This is tracked
   // separately from the ReadableStream disturbed state due to purely native
@@ -205,21 +217,29 @@ class FetchBody : public BodyStreamHolde
 
   // BodyStreamHolder
   void NullifyStream() override {
     mReadableStreamBody = nullptr;
     mReadableStreamReader = nullptr;
     mFetchStreamReader = nullptr;
   }
 
+#ifndef MOZ_DOM_STREAMS
   void SetReadableStreamBody(JSObject* aBody) override {
     mReadableStreamBody = aBody;
   }
-
   JSObject* GetReadableStreamBody() override { return mReadableStreamBody; }
+#else
+  void SetReadableStreamBody(ReadableStream* aBody) override {
+    mReadableStreamBody = aBody;
+  }
+  ReadableStream* GetReadableStreamBody() override {
+    return mReadableStreamBody;
+  }
+#endif
 
   void MarkAsRead() override { mBodyUsed = true; }
 
   virtual AbortSignalImpl* GetSignalImpl() const = 0;
 
   // AbortFollower
   void RunAbortAlgorithm() override;
 
@@ -228,36 +248,54 @@ class FetchBody : public BodyStreamHolde
                                         ErrorResult& aRv);
 
  protected:
   nsCOMPtr<nsIGlobalObject> mOwner;
 
   // Always set whenever the FetchBody is created on the worker thread.
   WorkerPrivate* mWorkerPrivate;
 
+#ifdef MOZ_DOM_STREAMS
+  // This is the ReadableStream exposed to content. It's underlying source is a
+  // BodyStream object. This needs to be traversed by subclasses.
+  RefPtr<ReadableStream> mReadableStreamBody;
+
+  // This is the Reader used to retrieve data from the body. This needs to be
+  // traversed by subclasses.
+  RefPtr<ReadableStreamDefaultReader> mReadableStreamReader;
+#else
   // This is the ReadableStream exposed to content. It's underlying source is a
   // BodyStream object.
   JS::Heap<JSObject*> mReadableStreamBody;
 
   // This is the Reader used to retrieve data from the body.
   JS::Heap<JSObject*> mReadableStreamReader;
+#endif
   RefPtr<FetchStreamReader> mFetchStreamReader;
 
   explicit FetchBody(nsIGlobalObject* aOwner);
 
   virtual ~FetchBody();
 
+#ifdef MOZ_DOM_STREAMS
+  void SetReadableStreamBody(JSContext* aCx, ReadableStream* aBody);
+#else
   void SetReadableStreamBody(JSContext* aCx, JSObject* aBody);
+#endif
 
  private:
   Derived* DerivedClass() const {
     return static_cast<Derived*>(const_cast<FetchBody*>(this));
   }
 
+#ifdef MOZ_DOM_STREAMS
+  void LockStream(JSContext* aCx, ReadableStream* aStream, ErrorResult& aRv);
+#else
   void LockStream(JSContext* aCx, JS::HandleObject aStream, ErrorResult& aRv);
+#endif
 
   bool IsOnTargetThread() { return NS_IsMainThread() == !mWorkerPrivate; }
 
   void AssertIsOnTargetThread() { MOZ_ASSERT(IsOnTargetThread()); }
 
   // Only ever set once, always on target thread.
   bool mBodyUsed;
 
--- a/dom/fetch/FetchStreamReader.cpp
+++ b/dom/fetch/FetchStreamReader.cpp
@@ -3,49 +3,65 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "FetchStreamReader.h"
 #include "InternalResponse.h"
 #include "js/Stream.h"
 #include "mozilla/ConsoleReportCollector.h"
+#include "mozilla/ErrorResult.h"
 #include "mozilla/dom/AutoEntryScript.h"
 #include "mozilla/dom/DOMException.h"
 #include "mozilla/dom/Promise.h"
 #include "mozilla/dom/PromiseBinding.h"
+#ifdef MOZ_DOM_STREAMS
+#  include "mozilla/dom/ReadableStream.h"
+#  include "mozilla/dom/ReadableStreamDefaultController.h"
+#  include "mozilla/dom/ReadableStreamDefaultReader.h"
+#endif
 #include "mozilla/dom/WorkerPrivate.h"
 #include "mozilla/dom/WorkerRef.h"
 #include "mozilla/HoldDropJSObjects.h"
 #include "mozilla/TaskCategory.h"
 #include "nsContentUtils.h"
+#include "nsDebug.h"
 #include "nsIAsyncInputStream.h"
 #include "nsIPipe.h"
 #include "nsIScriptError.h"
 #include "nsPIDOMWindow.h"
 #include "jsapi.h"
 
 namespace mozilla::dom {
 
 NS_IMPL_CYCLE_COLLECTING_ADDREF(FetchStreamReader)
 NS_IMPL_CYCLE_COLLECTING_RELEASE(FetchStreamReader)
 
 NS_IMPL_CYCLE_COLLECTION_CLASS(FetchStreamReader)
 
 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(FetchStreamReader)
   NS_IMPL_CYCLE_COLLECTION_UNLINK(mGlobal)
+#ifdef MOZ_DOM_STREAMS
+  NS_IMPL_CYCLE_COLLECTION_UNLINK(mReader)
+#else
   tmp->mReader = nullptr;
+#endif
 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
 
 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(FetchStreamReader)
   NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mGlobal)
+#ifdef MOZ_DOM_STREAMS
+  NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReader)
+#endif
 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
 
 NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(FetchStreamReader)
+#ifndef MOZ_DOM_STREAMS
   NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mReader)
+#endif
 NS_IMPL_CYCLE_COLLECTION_TRACE_END
 
 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(FetchStreamReader)
   NS_INTERFACE_MAP_ENTRY(nsIOutputStreamCallback)
   NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIOutputStreamCallback)
 NS_INTERFACE_MAP_END
 
 /* static */
@@ -132,22 +148,34 @@ void FetchStreamReader::CloseAndRelease(
 
   RefPtr<FetchStreamReader> kungFuDeathGrip = this;
 
   if (aCx && mReader) {
     RefPtr<DOMException> error = DOMException::Create(aStatus);
 
     JS::Rooted<JS::Value> errorValue(aCx);
     if (ToJSValue(aCx, error, &errorValue)) {
+#ifdef MOZ_DOM_STREAMS
+      IgnoredErrorResult ignoredError;
+      // It's currently safe to cancel an already closed reader because, per the
+      // comments in ReadableStream::cancel() conveying the spec, step 2 of
+      // 3.4.3 that specified ReadableStreamCancel is: If stream.[[state]] is
+      // "closed", return a new promise resolved with undefined.
+      RefPtr<Promise> ignoredResultPromise =
+          mReader->Cancel(aCx, errorValue, ignoredError);
+      NS_WARNING_ASSERTION(!ignoredError.Failed(),
+                           "Failed to cancel stream during close and release");
+#else
       JS::Rooted<JSObject*> reader(aCx, mReader);
       // It's currently safe to cancel an already closed reader because, per the
       // comments in ReadableStream::cancel() conveying the spec, step 2 of
       // 3.4.3 that specified ReadableStreamCancel is: If stream.[[state]] is
       // "closed", return a new promise resolved with undefined.
       JS::ReadableStreamReaderCancel(aCx, reader, errorValue);
+#endif
     }
 
     // We don't want to propagate exceptions during the cleanup.
     JS_ClearPendingException(aCx);
   }
 
   mStreamClosed = true;
 
@@ -157,16 +185,41 @@ void FetchStreamReader::CloseAndRelease(
   mPipeOut = nullptr;
 
   mWorkerRef = nullptr;
 
   mReader = nullptr;
   mBuffer.Clear();
 }
 
+#ifdef MOZ_DOM_STREAMS
+void FetchStreamReader::StartConsuming(JSContext* aCx, ReadableStream* aStream,
+                                       ReadableStreamDefaultReader** aReader,
+                                       ErrorResult& aRv) {
+  MOZ_DIAGNOSTIC_ASSERT(!mReader);
+  MOZ_DIAGNOSTIC_ASSERT(aStream);
+
+  RefPtr<ReadableStreamDefaultReader> reader =
+      AcquireReadableStreamDefaultReader(aCx, aStream, aRv);
+  if (aRv.Failed()) {
+    CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
+    return;
+  }
+
+  mReader = reader;
+  reader.forget(aReader);
+
+  aRv = mPipeOut->AsyncWait(this, 0, 0, mOwningEventTarget);
+  if (NS_WARN_IF(aRv.Failed())) {
+    return;
+  }
+}
+
+#else
+
 void FetchStreamReader::StartConsuming(JSContext* aCx, JS::HandleObject aStream,
                                        JS::MutableHandle<JSObject*> aReader,
                                        ErrorResult& aRv) {
   MOZ_DIAGNOSTIC_ASSERT(!mReader);
   MOZ_DIAGNOSTIC_ASSERT(aStream);
 
   aRv.MightThrowJSException();
 
@@ -188,16 +241,17 @@ void FetchStreamReader::StartConsuming(J
   mReader = reader;
   aReader.set(reader);
 
   aRv = mPipeOut->AsyncWait(this, 0, 0, mOwningEventTarget);
   if (NS_WARN_IF(aRv.Failed())) {
     return;
   }
 }
+#endif
 
 // nsIOutputStreamCallback interface
 
 NS_IMETHODIMP
 FetchStreamReader::OnOutputStreamReady(nsIAsyncOutputStream* aStream) {
   NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
   if (mStreamClosed) {
     return NS_OK;
@@ -211,32 +265,59 @@ FetchStreamReader::OnOutputStreamReady(n
     return WriteBuffer();
   }
 
   // Here we can retrieve data from the reader using any global we want because
   // it is not observable. We want to use the reader's global, which is also the
   // Response's one.
   AutoEntryScript aes(mGlobal, "ReadableStreamReader.read", !mWorkerRef);
 
+#ifdef MOZ_DOM_STREAMS
+  ErrorResult rv;
+
+  // The below very loosely tries to implement the incrementally-read-loop from
+  // the fetch spec: However, because of the structure of the surrounding code,
+  // it makes use of the Read_ReadRequest with one modification: For the
+  // purposes of this read, we use `aForAuthorCode=false` in constructing the
+  // read request. This makes the value resolve have a null prototype, hiding
+  // this code from potential interference via `Object.prototype.then`.
+  RefPtr<Promise> domPromise = Promise::Create(mGlobal, rv);
+  if (NS_WARN_IF(rv.Failed())) {
+    // Let's close the stream.
+    CloseAndRelease(aes.cx(), NS_ERROR_DOM_INVALID_STATE_ERR);
+    return NS_ERROR_FAILURE;
+  }
+
+  RefPtr<ReadRequest> readRequest =
+      new Read_ReadRequest(domPromise, /* aForAuthorCode = */ false);
+
+  ReadableStreamDefaultReaderRead(aes.cx(), mReader, readRequest, rv);
+  if (NS_WARN_IF(rv.Failed())) {
+    // Let's close the stream.
+    CloseAndRelease(aes.cx(), NS_ERROR_DOM_INVALID_STATE_ERR);
+    return NS_ERROR_FAILURE;
+  }
+
+#else
   JS::Rooted<JSObject*> reader(aes.cx(), mReader);
   JS::Rooted<JSObject*> promise(
       aes.cx(), JS::ReadableStreamDefaultReaderRead(aes.cx(), reader));
   if (NS_WARN_IF(!promise)) {
     // Let's close the stream.
     CloseAndRelease(aes.cx(), NS_ERROR_DOM_INVALID_STATE_ERR);
     return NS_ERROR_FAILURE;
   }
 
   RefPtr<Promise> domPromise = Promise::CreateFromExisting(mGlobal, promise);
   if (NS_WARN_IF(!domPromise)) {
     // Let's close the stream.
     CloseAndRelease(aes.cx(), NS_ERROR_DOM_INVALID_STATE_ERR);
     return NS_ERROR_FAILURE;
   }
-
+#endif
   // Let's wait.
   domPromise->AppendNativeHandler(this);
   return NS_OK;
 }
 
 void FetchStreamReader::ResolvedCallback(JSContext* aCx,
                                          JS::Handle<JS::Value> aValue) {
   if (mStreamClosed) {
--- a/dom/fetch/FetchStreamReader.h
+++ b/dom/fetch/FetchStreamReader.h
@@ -12,16 +12,18 @@
 #include "mozilla/dom/FetchBinding.h"
 #include "mozilla/dom/PromiseNativeHandler.h"
 #include "nsIAsyncOutputStream.h"
 #include "nsIGlobalObject.h"
 
 namespace mozilla {
 namespace dom {
 
+class ReadableStream;
+class ReadableStreamDefaultReader;
 class WeakWorkerRef;
 
 class FetchStreamReader final : public nsIOutputStreamCallback,
                                 public PromiseNativeHandler {
  public:
   NS_DECL_CYCLE_COLLECTING_ISUPPORTS
   NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS_AMBIGUOUS(
       FetchStreamReader, nsIOutputStreamCallback)
@@ -38,35 +40,44 @@ class FetchStreamReader final : public n
   void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override;
 
   // Idempotently close the output stream and null out all state. If aCx is
   // provided, the reader will also be canceled.  aStatus must be a DOM error
   // as understood by DOMException because it will be provided as the
   // cancellation reason.
   void CloseAndRelease(JSContext* aCx, nsresult aStatus);
 
+#ifdef MOZ_DOM_STREAMS
+  void StartConsuming(JSContext* aCx, ReadableStream* aStream,
+                      ReadableStreamDefaultReader** aReader, ErrorResult& aRv);
+#else
   void StartConsuming(JSContext* aCx, JS::HandleObject aStream,
                       JS::MutableHandle<JSObject*> aReader, ErrorResult& aRv);
+#endif
 
  private:
   explicit FetchStreamReader(nsIGlobalObject* aGlobal);
   ~FetchStreamReader();
 
   nsresult WriteBuffer();
 
   void ReportErrorToConsole(JSContext* aCx, JS::Handle<JS::Value> aValue);
 
   nsCOMPtr<nsIGlobalObject> mGlobal;
   nsCOMPtr<nsIEventTarget> mOwningEventTarget;
 
   nsCOMPtr<nsIAsyncOutputStream> mPipeOut;
 
   RefPtr<WeakWorkerRef> mWorkerRef;
 
+#ifdef MOZ_DOM_STREAMS
+  RefPtr<ReadableStreamDefaultReader> mReader;
+#else
   JS::Heap<JSObject*> mReader;
+#endif
 
   nsTArray<uint8_t> mBuffer;
   uint32_t mBufferRemaining;
   uint32_t mBufferOffset;
 
   bool mStreamClosed;
 };
 
--- a/dom/fetch/Request.cpp
+++ b/dom/fetch/Request.cpp
@@ -25,34 +25,44 @@
 namespace mozilla::dom {
 
 NS_IMPL_ADDREF_INHERITED(Request, FetchBody<Request>)
 NS_IMPL_RELEASE_INHERITED(Request, FetchBody<Request>)
 
 NS_IMPL_CYCLE_COLLECTION_CLASS(Request)
 
 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN_INHERITED(Request, FetchBody<Request>)
+#ifdef MOZ_DOM_STREAMS
+  NS_IMPL_CYCLE_COLLECTION_UNLINK(mReadableStreamBody)
+  NS_IMPL_CYCLE_COLLECTION_UNLINK(mReadableStreamReader)
+#endif
   NS_IMPL_CYCLE_COLLECTION_UNLINK(mOwner)
   NS_IMPL_CYCLE_COLLECTION_UNLINK(mHeaders)
   NS_IMPL_CYCLE_COLLECTION_UNLINK(mSignal)
   AbortFollower::Unlink(static_cast<AbortFollower*>(tmp));
   NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER
 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
 
 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN_INHERITED(Request, FetchBody<Request>)
+#ifdef MOZ_DOM_STREAMS
+  NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReadableStreamBody)
+  NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReadableStreamReader)
+#endif
   NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mOwner)
   NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mHeaders)
   NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mSignal)
   AbortFollower::Traverse(static_cast<AbortFollower*>(tmp), cb);
 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
 
 NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN_INHERITED(Request, FetchBody<Request>)
+#ifndef MOZ_DOM_STREAMS
   NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mReadableStreamBody)
   MOZ_DIAGNOSTIC_ASSERT(!tmp->mReadableStreamReader);
   NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mReadableStreamReader)
+#endif
   NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER
 NS_IMPL_CYCLE_COLLECTION_TRACE_END
 
 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(Request)
   NS_WRAPPERCACHE_INTERFACE_MAP_ENTRY
 NS_INTERFACE_MAP_END_INHERITING(FetchBody<Request>)
 
 Request::Request(nsIGlobalObject* aOwner, SafeRefPtr<InternalRequest> aRequest,
--- a/dom/fetch/Response.cpp
+++ b/dom/fetch/Response.cpp
@@ -36,34 +36,43 @@ NS_IMPL_RELEASE_INHERITED(Response, Fetc
 NS_IMPL_CYCLE_COLLECTION_CLASS(Response)
 
 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN_INHERITED(Response, FetchBody<Response>)
   AbortFollower::Unlink(static_cast<AbortFollower*>(tmp));
   NS_IMPL_CYCLE_COLLECTION_UNLINK(mOwner)
   NS_IMPL_CYCLE_COLLECTION_UNLINK(mHeaders)
   NS_IMPL_CYCLE_COLLECTION_UNLINK(mSignalImpl)
   NS_IMPL_CYCLE_COLLECTION_UNLINK(mFetchStreamReader)
-
+#ifdef MOZ_DOM_STREAMS
+  NS_IMPL_CYCLE_COLLECTION_UNLINK(mReadableStreamBody)
+  NS_IMPL_CYCLE_COLLECTION_UNLINK(mReadableStreamReader)
+#else
   tmp->mReadableStreamBody = nullptr;
   tmp->mReadableStreamReader = nullptr;
-
+#endif
   NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER
 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
 
 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN_INHERITED(Response, FetchBody<Response>)
   AbortFollower::Traverse(static_cast<AbortFollower*>(tmp), cb);
   NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mOwner)
   NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mHeaders)
   NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mSignalImpl)
   NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mFetchStreamReader)
+#ifdef MOZ_DOM_STREAMS
+  NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReadableStreamBody)
+  NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReadableStreamReader)
+#endif
 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
 
 NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN_INHERITED(Response, FetchBody<Response>)
+#ifndef MOZ_DOM_STREAMS
   NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mReadableStreamBody)
   NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mReadableStreamReader)
+#endif
   NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER
 NS_IMPL_CYCLE_COLLECTION_TRACE_END
 
 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(Response)
   NS_WRAPPERCACHE_INTERFACE_MAP_ENTRY
 NS_INTERFACE_MAP_END_INHERITING(FetchBody<Response>)
 
 Response::Response(nsIGlobalObject* aGlobal,
@@ -276,22 +285,45 @@ already_AddRefed<Response> Response::Con
     }
 
     nsCString contentTypeWithCharset;
     nsCOMPtr<nsIInputStream> bodyStream;
     int64_t bodySize = InternalResponse::UNKNOWN_BODY_SIZE;
 
     const fetch::ResponseBodyInit& body = aBody.Value();
     if (body.IsReadableStream()) {
+      JSContext* cx = aGlobal.Context();
 #ifdef MOZ_DOM_STREAMS
-      MOZ_CRASH("MOZ_DOM_STREAMS:NYI");
+      aRv.MightThrowJSException();
+
+      ReadableStream& readableStream = body.GetAsReadableStream();
+
+      if (readableStream.Locked() || readableStream.Disturbed()) {
+        aRv.ThrowTypeError<MSG_FETCH_BODY_CONSUMED_ERROR>();
+        return nullptr;
+      }
+
+      r->SetReadableStreamBody(cx, &readableStream);
+
+      // If this is a DOM generated ReadableStream, we can extract the
+      // inputStream directly.
+      if (readableStream.HasNativeUnderlyingSource()) {
+        BodyStreamHolder* underlyingSource =
+            readableStream.GetNativeUnderlyingSource();
+        MOZ_ASSERT(underlyingSource);
+
+        aRv = BodyStream::RetrieveInputStream(underlyingSource,
+                                              getter_AddRefs(bodyStream));
+
+        if (NS_WARN_IF(aRv.Failed())) {
+          return nullptr;
+        }
 #else
       aRv.MightThrowJSException();
 
-      JSContext* cx = aGlobal.Context();
       const ReadableStream& readableStream = body.GetAsReadableStream();
 
       JS::Rooted<JSObject*> readableStreamObj(cx, readableStream.Obj());
 
       bool disturbed;
       bool locked;
       if (!JS::ReadableStreamIsDisturbed(cx, readableStreamObj, &disturbed) ||
           !JS::ReadableStreamIsLocked(cx, readableStreamObj, &locked)) {
@@ -330,27 +362,27 @@ already_AddRefed<Response> Response::Con
         if (!JS::ReadableStreamReleaseExternalUnderlyingSource(
                 cx, readableStreamObj)) {
           aRv.StealExceptionFromJSContext(cx);
           return nullptr;
         }
         if (NS_WARN_IF(aRv.Failed())) {
           return nullptr;
         }
+#endif
       } else {
         // If this is a JS-created ReadableStream, let's create a
         // FetchStreamReader.
         aRv = FetchStreamReader::Create(aGlobal.Context(), global,
                                         getter_AddRefs(r->mFetchStreamReader),
                                         getter_AddRefs(bodyStream));
         if (NS_WARN_IF(aRv.Failed())) {
           return nullptr;
         }
       }
-#endif
     } else {
       uint64_t size = 0;
       aRv = ExtractByteStreamFromBody(body, getter_AddRefs(bodyStream),
                                       contentTypeWithCharset, size);
       if (NS_WARN_IF(aRv.Failed())) {
         return nullptr;
       }
 
@@ -378,16 +410,19 @@ already_AddRefed<Response> Response::Con
 
 already_AddRefed<Response> Response::Clone(JSContext* aCx, ErrorResult& aRv) {
   bool bodyUsed = GetBodyUsed(aRv);
   if (NS_WARN_IF(aRv.Failed())) {
     return nullptr;
   }
 
   if (!bodyUsed && mReadableStreamBody) {
+#ifdef MOZ_DOM_STREAMS
+    bool locked = mReadableStreamBody->Locked();
+#else
     aRv.MightThrowJSException();
 
     AutoJSAPI jsapi;
     if (!jsapi.Init(mOwner)) {
       aRv.Throw(NS_ERROR_FAILURE);
       return nullptr;
     }
 
@@ -395,31 +430,38 @@ already_AddRefed<Response> Response::Clo
     JS::Rooted<JSObject*> body(cx, mReadableStreamBody);
     bool locked;
     // We just need to check the 'locked' state because GetBodyUsed() already
     // checked the 'disturbed' state.
     if (!JS::ReadableStreamIsLocked(cx, body, &locked)) {
       aRv.StealExceptionFromJSContext(cx);
       return nullptr;
     }
-
+#endif
     bodyUsed = locked;
   }
 
   if (bodyUsed) {
     aRv.ThrowTypeError<MSG_FETCH_BODY_CONSUMED_ERROR>();
     return nullptr;
   }
 
   RefPtr<FetchStreamReader> streamReader;
   nsCOMPtr<nsIInputStream> inputStream;
 
+#ifdef MOZ_DOM_STREAMS
+  RefPtr<ReadableStream> body;
+  MaybeTeeReadableStreamBody(aCx, getter_AddRefs(body),
+                             getter_AddRefs(streamReader),
+                             getter_AddRefs(inputStream), aRv);
+#else
   JS::Rooted<JSObject*> body(aCx);
   MaybeTeeReadableStreamBody(aCx, &body, getter_AddRefs(streamReader),
                              getter_AddRefs(inputStream), aRv);
+#endif
   if (NS_WARN_IF(aRv.Failed())) {
     return nullptr;
   }
 
   MOZ_ASSERT_IF(body, streamReader);
   MOZ_ASSERT_IF(body, inputStream);
 
   SafeRefPtr<InternalResponse> ir =
@@ -447,19 +489,26 @@ already_AddRefed<Response> Response::Clo
   if (GetBodyUsed(aRv)) {
     aRv.ThrowTypeError<MSG_FETCH_BODY_CONSUMED_ERROR>();
     return nullptr;
   }
 
   RefPtr<FetchStreamReader> streamReader;
   nsCOMPtr<nsIInputStream> inputStream;
 
+#ifdef MOZ_DOM_STREAMS
+  RefPtr<ReadableStream> body;
+  MaybeTeeReadableStreamBody(aCx, getter_AddRefs(body),
+                             getter_AddRefs(streamReader),
+                             getter_AddRefs(inputStream), aRv);
+#else
   JS::Rooted<JSObject*> body(aCx);
   MaybeTeeReadableStreamBody(aCx, &body, getter_AddRefs(streamReader),
                              getter_AddRefs(inputStream), aRv);
+#endif
   if (NS_WARN_IF(aRv.Failed())) {
     return nullptr;
   }
 
   MOZ_ASSERT_IF(body, streamReader);
   MOZ_ASSERT_IF(body, inputStream);
 
   SafeRefPtr<InternalResponse> clone =
--- a/dom/file/Blob.cpp
+++ b/dom/file/Blob.cpp
@@ -5,20 +5,24 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "Blob.h"
 #include "EmptyBlobImpl.h"
 #include "File.h"
 #include "MemoryBlobImpl.h"
 #include "mozilla/dom/BlobBinding.h"
 #include "mozilla/dom/BodyStream.h"
+#ifdef MOZ_DOM_STREAMS
+#  include "mozilla/dom/ReadableStream.h"
+#endif
 #include "mozilla/dom/WorkerCommon.h"
 #include "mozilla/dom/WorkerPrivate.h"
 #include "mozilla/HoldDropJSObjects.h"
 #include "MultipartBlobImpl.h"
+#include "nsCycleCollectionParticipant.h"
 #include "nsIGlobalObject.h"
 #include "nsIInputStream.h"
 #include "nsPIDOMWindow.h"
 #include "StreamBlobImpl.h"
 #include "StringBlobImpl.h"
 #include "js/GCAPI.h"
 
 namespace mozilla::dom {
@@ -305,55 +309,97 @@ class BlobBodyStreamHolder final : publi
                                                          BodyStreamHolder)
 
   BlobBodyStreamHolder() { mozilla::HoldJSObjects(this); }
 
   void NullifyStream() override { mozilla::DropJSObjects(this); }
 
   void MarkAsRead() override {}
 
+#ifdef MOZ_DOM_STREAMS
+  void SetReadableStreamBody(ReadableStream* aBody) override {
+    mStream = aBody;
+  }
+  ReadableStream* GetReadableStreamBody() override { return mStream; }
+
+ private:
+  RefPtr<ReadableStream> mStream;
+#else
   void SetReadableStreamBody(JSObject* aBody) override {
     MOZ_ASSERT(aBody);
     mStream = aBody;
   }
 
   JSObject* GetReadableStreamBody() override { return mStream; }
 
   // Public to make trace happy.
   JS::Heap<JSObject*> mStream;
+#endif
 
  protected:
   virtual ~BlobBodyStreamHolder() { NullifyStream(); }
 };
 
 NS_IMPL_CYCLE_COLLECTION_CLASS(BlobBodyStreamHolder)
 
 NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN_INHERITED(BlobBodyStreamHolder,
                                                BodyStreamHolder)
+#ifndef MOZ_DOM_STREAMS
   NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mStream)
+#endif
 NS_IMPL_CYCLE_COLLECTION_TRACE_END
 
 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN_INHERITED(BlobBodyStreamHolder,
                                                   BodyStreamHolder)
+#ifdef MOZ_DOM_STREAMS
+  NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mStream)
+#endif
 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
 
 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN_INHERITED(BlobBodyStreamHolder,
                                                 BodyStreamHolder)
   tmp->NullifyStream();
+#ifdef MOZ_DOM_STREAMS
+  NS_IMPL_CYCLE_COLLECTION_UNLINK(mStream)
+#endif
 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
 
 NS_IMPL_ADDREF_INHERITED(BlobBodyStreamHolder, BodyStreamHolder)
 NS_IMPL_RELEASE_INHERITED(BlobBodyStreamHolder, BodyStreamHolder)
 
 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(BlobBodyStreamHolder)
 NS_INTERFACE_MAP_END_INHERITING(BodyStreamHolder)
 
 }  // anonymous namespace
 
-#ifndef MOZ_DOM_STREAMS
+#ifdef MOZ_DOM_STREAMS
+already_AddRefed<ReadableStream> Blob::Stream(JSContext* aCx,
+                                              ErrorResult& aRv) {
+  nsCOMPtr<nsIInputStream> stream;
+  CreateInputStream(getter_AddRefs(stream), aRv);
+  if (NS_WARN_IF(aRv.Failed())) {
+    return nullptr;
+  }
+
+  if (NS_WARN_IF(!mGlobal)) {
+    aRv.Throw(NS_ERROR_FAILURE);
+    return nullptr;
+  }
+
+  RefPtr<BlobBodyStreamHolder> holder = new BlobBodyStreamHolder();
+
+  BodyStream::Create(aCx, holder, mGlobal, stream, aRv);
+  if (NS_WARN_IF(aRv.Failed())) {
+    return nullptr;
+  }
+
+  RefPtr<ReadableStream> rStream = holder->GetReadableStreamBody();
+  return rStream.forget();
+}
+#else
 void Blob::Stream(JSContext* aCx, JS::MutableHandle<JSObject*> aStream,
                   ErrorResult& aRv) {
   nsCOMPtr<nsIInputStream> stream;
   CreateInputStream(getter_AddRefs(stream), aRv);
   if (NS_WARN_IF(aRv.Failed())) {
     return;
   }
 
--- a/dom/file/Blob.h
+++ b/dom/file/Blob.h
@@ -118,19 +118,17 @@ class Blob : public nsSupportsWeakRefere
                                ErrorResult& aRv);
 
   size_t GetAllocationSize() const;
 
   nsresult GetSendInfo(nsIInputStream** aBody, uint64_t* aContentLength,
                        nsACString& aContentType, nsACString& aCharset) const;
 
 #ifdef MOZ_DOM_STREAMS
-  already_AddRefed<ReadableStream> Stream(ErrorResult& aRv) {
-    MOZ_CRASH("MOZ_DOM_STREAMS: NYI");
-  }
+  already_AddRefed<ReadableStream> Stream(JSContext* aCx, ErrorResult& aRv);
 #else
   void Stream(JSContext* aCx, JS::MutableHandle<JSObject*> aStream,
               ErrorResult& aRv);
 #endif
   already_AddRefed<Promise> Text(ErrorResult& aRv);
   already_AddRefed<Promise> ArrayBuffer(ErrorResult& aRv);
 
  protected:
--- a/dom/streams/ByteStreamHelpers.cpp
+++ b/dom/streams/ByteStreamHelpers.cpp
@@ -15,22 +15,21 @@ namespace dom {
 
 // https://streams.spec.whatwg.org/#transfer-array-buffer
 // As some parts of the specifcation want to use the abrupt completion value,
 // this function may leave a pending exception if it returns nullptr.
 JSObject* TransferArrayBuffer(JSContext* aCx, JS::Handle<JSObject*> aObject) {
   // Step 1.
   MOZ_ASSERT(!JS::IsDetachedArrayBufferObject(aObject));
 
-  // Step 2+3.
-  uint8_t* bufferData = nullptr;
-  size_t bufferLength = 0;
-  bool isSharedMemory = false;
-  JS::GetArrayBufferLengthAndData(aObject, &bufferLength, &isSharedMemory,
-                                  &bufferData);
+  // Step 3 (Reordered)
+  size_t bufferLength = JS::GetArrayBufferByteLength(aObject);
+
+  // Step 2 (Reordered)
+  void* bufferData = JS::StealArrayBufferContents(aCx, aObject);
 
   // Step 4.
   if (!JS::DetachArrayBuffer(aCx, aObject)) {
     return nullptr;
   }
 
   // Step 5.
   return JS::NewArrayBufferWithContents(aCx, bufferLength, bufferData);
new file mode 100644
--- /dev/null
+++ b/dom/streams/NativeUnderlyingSource.h
@@ -0,0 +1,42 @@
+/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim:set ts=2 sw=2 sts=2 et cindent: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#ifndef mozilla_dom_NativeUnderlyingSource_h
+#define mozilla_dom_NativeUnderlyingSource_h
+
+#include "js/TypeDecls.h"
+#include "mozilla/AlreadyAddRefed.h"
+#include "mozilla/dom/BindingDeclarations.h"
+#include "mozilla/ErrorResult.h"
+#include "nsCycleCollectionParticipant.h"
+
+namespace mozilla::dom {
+class Promise;
+class ReadableStreamController;
+
+// A class implementing NativeUnderlyingSource must be kept alive via some
+// mechanism, but NativeUnderlyingSource -does not provide that mechanism-.
+class NativeUnderlyingSource {
+ public:
+  NS_INLINE_DECL_PURE_VIRTUAL_REFCOUNTING
+
+  virtual already_AddRefed<Promise> PullCallback(
+      JSContext* aCx, ReadableStreamController& aController,
+      ErrorResult& aRv) = 0;
+
+  virtual already_AddRefed<Promise> CancelCallback(
+      JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
+      ErrorResult& aRv) = 0;
+
+  virtual void ErrorCallback() = 0;
+
+ protected:
+  virtual ~NativeUnderlyingSource() = default;
+};
+
+}  // namespace mozilla::dom
+
+#endif
--- a/dom/streams/ReadableByteStreamController.cpp
+++ b/dom/streams/ReadableByteStreamController.cpp
@@ -24,16 +24,17 @@
 #include "mozilla/dom/ReadableStream.h"
 #include "mozilla/dom/ReadableStreamBYOBReader.h"
 #include "mozilla/dom/ReadableStreamBYOBRequest.h"
 #include "mozilla/dom/ReadableStreamController.h"
 #include "mozilla/dom/ReadableStreamDefaultController.h"
 #include "mozilla/dom/ReadableStreamGenericReader.h"
 #include "mozilla/dom/ToJSValue.h"
 #include "mozilla/dom/ScriptSettings.h"
+#include "mozilla/dom/UnderlyingSourceCallbackHelpers.h"
 #include "nsCycleCollectionParticipant.h"
 #include "nsIGlobalObject.h"
 #include "nsISupports.h"
 
 #include <algorithm>  // std::min
 
 namespace mozilla {
 namespace dom {
@@ -1078,17 +1079,17 @@ static void ReadableByteStreamController
   // Step 3. If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s element
   // size, return.
   if (aPullIntoDescriptor->BytesFilled() < aPullIntoDescriptor->ElementSize()) {
     return;
   }
 
   // Step 4. Perform
   // !ReadableByteStreamControllerShiftPendingPullInto(controller).
-  ReadableByteStreamControllerShiftPendingPullInto(aController);
+  (void)ReadableByteStreamControllerShiftPendingPullInto(aController);
 
   // Step 5. Let remainderSize be pullIntoDescriptor’s bytes filled mod
   // pullIntoDescriptor’s element size.
   size_t remainderSize =
       aPullIntoDescriptor->BytesFilled() % aPullIntoDescriptor->ElementSize();
 
   // Step 6. If remainderSize > 0,
   if (remainderSize > 0) {
@@ -1676,18 +1677,18 @@ NS_INTERFACE_MAP_END
 
 // https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller
 void SetUpReadableByteStreamController(
     JSContext* aCx, ReadableStream* aStream,
     ReadableByteStreamController* aController,
     UnderlyingSourceStartCallbackHelper* aStartAlgorithm,
     UnderlyingSourcePullCallbackHelper* aPullAlgorithm,
     UnderlyingSourceCancelCallbackHelper* aCancelAlgorithm,
-    double aHighWaterMark, Maybe<uint64_t> aAutoAllocateChunkSize,
-    ErrorResult& aRv) {
+    UnderlyingSourceErrorCallbackHelper* aErrorAlgorithm, double aHighWaterMark,
+    Maybe<uint64_t> aAutoAllocateChunkSize, ErrorResult& aRv) {
   // Step 1. Assert: stream.[[controller]] is undefined.
   MOZ_ASSERT(!aStream->Controller());
 
   // Step 2. If autoAllocateChunkSize is not undefined,
   if (aAutoAllocateChunkSize) {
     // Step 2.1. Assert: ! IsInteger(autoAllocateChunkSize) is true. Implicit
     // Step 2.2. Assert: autoAllocateChunkSize is positive.
     MOZ_ASSERT(*aAutoAllocateChunkSize >= 0);
@@ -1715,16 +1716,19 @@ void SetUpReadableByteStreamController(
   aController->SetStrategyHWM(aHighWaterMark);
 
   // Step 9. Set controller.[[pullAlgorithm]] to pullAlgorithm.
   aController->SetPullAlgorithm(aPullAlgorithm);
 
   // Step 10. Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
   aController->SetCancelAlgorithm(aCancelAlgorithm);
 
+  // Not Specified.
+  aStream->SetErrorAlgorithm(aErrorAlgorithm);
+
   // Step 11. Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
   aController->SetAutoAllocateChunkSize(aAutoAllocateChunkSize);
 
   // Step 12. Set controller.[[pendingPullIntos]] to a new empty list.
   aController->PendingPullIntos().clear();
 
   // Step 13. Set stream.[[controller]] to controller.
   aStream->SetController(aController);
@@ -1750,10 +1754,62 @@ void SetUpReadableByteStreamController(
   }
   startPromise->MaybeResolve(startResult);
 
   // Step 16+17
   startPromise->AppendNativeHandler(
       new ByteStreamStartPromiseNativeHandler(aController));
 }
 
+// This is gently modelled on the pre-existing
+// SetUpExternalReadableByteStreamController, but specialized to the
+// BodyStreamUnderlyingSource model vs. the External streams of the JS
+// implementation.
+//
+// https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller-from-underlying-source
+MOZ_CAN_RUN_SCRIPT
+void SetUpReadableByteStreamControllerFromUnderlyingSource(
+    JSContext* aCx, ReadableStream* aStream,
+    BodyStreamHolder* aUnderlyingSource, ErrorResult& aRv) {
+  // Step 1.
+  RefPtr<ReadableByteStreamController> controller =
+      new ReadableByteStreamController(aStream->GetParentObject());
+
+  // Step 2.
+  RefPtr<UnderlyingSourceStartCallbackHelper> startAlgorithm;
+
+  // Step 3.
+  RefPtr<UnderlyingSourcePullCallbackHelper> pullAlgorithm;
+
+  // Step 4
+  RefPtr<UnderlyingSourceCancelCallbackHelper> cancelAlgorithm;
+
+  // Not Specified
+  RefPtr<UnderlyingSourceErrorCallbackHelper> errorAlgorithm;
+
+  // Step 5. Intentionally skipped: No startAlgorithm for
+  // BodyStreamUnderlyingSources. Step 6.
+  pullAlgorithm =
+      new BodyStreamUnderlyingSourcePullCallbackHelper(aUnderlyingSource);
+
+  // Step 7.
+  cancelAlgorithm =
+      new BodyStreamUnderlyingSourceCancelCallbackHelper(aUnderlyingSource);
+
+  // Not Specified
+  errorAlgorithm =
+      new BodyStreamUnderlyingSourceErrorCallbackHelper(aUnderlyingSource);
+
+  // Step 8
+  Maybe<double> autoAllocateChunkSize = mozilla::Nothing();
+  // Step 9 (Skipped)
+
+  // Not Specified: Native underlying sources always use 0.0 high water mark.
+  double highWaterMark = 0.0;
+
+  // Step 10.
+  SetUpReadableByteStreamController(
+      aCx, aStream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm,
+      errorAlgorithm, highWaterMark, autoAllocateChunkSize, aRv);
+}
+
 }  // namespace dom
 }  // namespace mozilla
--- a/dom/streams/ReadableByteStreamController.h
+++ b/dom/streams/ReadableByteStreamController.h
@@ -367,14 +367,25 @@ extern void ReadableByteStreamController
     ErrorResult& aRv);
 
 MOZ_CAN_RUN_SCRIPT extern void SetUpReadableByteStreamController(
     JSContext* aCx, ReadableStream* aStream,
     ReadableByteStreamController* aController,
     UnderlyingSourceStartCallbackHelper* aStartAlgorithm,
     UnderlyingSourcePullCallbackHelper* aPullAlgorithm,
     UnderlyingSourceCancelCallbackHelper* aCancelAlgorithm,
-    double aHighWaterMark, Maybe<uint64_t> aAutoAllocateChunkSize,
+    UnderlyingSourceErrorCallbackHelper* aErrorAlgorithm, double aHighWaterMark,
+    Maybe<uint64_t> aAutoAllocateChunkSize, ErrorResult& aRv);
+
+extern void ReadableByteStreamControllerCallPullIfNeeded(
+    JSContext* aCx, ReadableByteStreamController* aController,
     ErrorResult& aRv);
 
+void SetUpReadableByteStreamControllerFromUnderlyingSource(
+    JSContext* aCx, ReadableStream* aStream,
+    BodyStreamHolder* aUnderlyingSource, ErrorResult& aRv);
+
+void ReadableByteStreamControllerClearAlgorithms(
+    ReadableByteStreamController* aController);
+
 }  // namespace mozilla::dom
 
 #endif
--- a/dom/streams/ReadableStream.cpp
+++ b/dom/streams/ReadableStream.cpp
@@ -13,17 +13,19 @@
 #include "mozilla/AlreadyAddRefed.h"
 #include "mozilla/Assertions.h"
 #include "mozilla/Attributes.h"
 #include "mozilla/CycleCollectedJSContext.h"
 #include "mozilla/FloatingPoint.h"
 #include "mozilla/HoldDropJSObjects.h"
 #include "mozilla/dom/BindingCallContext.h"
 #include "mozilla/dom/ByteStreamHelpers.h"
+#include "mozilla/dom/BodyStream.h"
 #include "mozilla/dom/ModuleMapKey.h"
+#include "mozilla/dom/NativeUnderlyingSource.h"
 #include "mozilla/dom/QueueWithSizes.h"
 #include "mozilla/dom/QueuingStrategyBinding.h"
 #include "mozilla/dom/ReadIntoRequest.h"
 #include "mozilla/dom/ReadRequest.h"
 #include "mozilla/dom/ReadableByteStreamController.h"
 #include "mozilla/dom/ReadableStreamBYOBReader.h"
 #include "mozilla/dom/ReadableStreamBinding.h"
 #include "mozilla/dom/ReadableStreamController.h"
@@ -64,23 +66,23 @@ inline void ImplCycleCollectionUnlink(
 
 namespace mozilla {
 namespace dom {
 
 // Only needed for refcounted objects.
 NS_IMPL_CYCLE_COLLECTION_CLASS(ReadableStream)
 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(ReadableStream)
   NS_IMPL_CYCLE_COLLECTION_UNLINK(mGlobal, mController, mReader,
-                                  mErrorAlgorithm)
+                                  mErrorAlgorithm, mNativeUnderlyingSource)
   NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER
   tmp->mStoredError.setNull();
 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(ReadableStream)
   NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mGlobal, mController, mReader,
-                                    mErrorAlgorithm)
+                                    mErrorAlgorithm, mNativeUnderlyingSource)
 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
 
 NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(ReadableStream)
   NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER
   NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mStoredError)
 NS_IMPL_CYCLE_COLLECTION_TRACE_END
 
 NS_IMPL_CYCLE_COLLECTING_ADDREF(ReadableStream)
@@ -140,16 +142,35 @@ bool ReadableStreamHasDefaultReader(Read
     return false;
   }
 
   // Step 3. If reader implements ReadableStreamDefaultReader, return true.
   // Step 4. Return false.
   return reader->IsDefault();
 }
 
+void ReadableStream::SetNativeUnderlyingSource(
+    BodyStreamHolder* aUnderlyingSource) {
+  mNativeUnderlyingSource = aUnderlyingSource;
+}
+
+void ReadableStream::ReleaseObjects() {
+  SetNativeUnderlyingSource(nullptr);
+
+  SetErrorAlgorithm(nullptr);
+
+  if (mController->IsByte()) {
+    ReadableByteStreamControllerClearAlgorithms(mController->AsByte());
+    return;
+  }
+
+  MOZ_ASSERT(mController->IsDefault());
+  ReadableStreamDefaultControllerClearAlgorithms(mController->AsDefault());
+}
+
 // Streams Spec: 4.2.4: https://streams.spec.whatwg.org/#rs-prototype
 /* static */
 already_AddRefed<ReadableStream> ReadableStream::Constructor(
     const GlobalObject& aGlobal,
     const Optional<JS::Handle<JSObject*>>& aUnderlyingSource,
     const QueuingStrategy& aStrategy, ErrorResult& aRv) {
   // Step 1.
   JS::RootedObject underlyingSourceObj(
@@ -842,18 +863,37 @@ already_AddRefed<ReadableStream> CreateR
 
   // Step 3. Let controller be a new ReadableByteStreamController.
   RefPtr<ReadableByteStreamController> controller =
       new ReadableByteStreamController(aGlobal);
 
   // Step 4. Perform ? SetUpReadableByteStreamController(stream, controller,
   // startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined).
   SetUpReadableByteStreamController(aCx, stream, controller, aStartAlgorithm,
-                                    aPullAlgorithm, aCancelAlgorithm, 0,
-                                    mozilla::Nothing(), aRv);
+                                    aPullAlgorithm, aCancelAlgorithm, nullptr,
+                                    0, mozilla::Nothing(), aRv);
+  if (aRv.Failed()) {
+    return nullptr;
+  }
+
+  // Return stream.
+  return stream.forget();
+}
+
+MOZ_CAN_RUN_SCRIPT
+already_AddRefed<ReadableStream> ReadableStream::Create(
+    JSContext* aCx, nsIGlobalObject* aGlobal,
+    BodyStreamHolder* aUnderlyingSource, ErrorResult& aRv) {
+  RefPtr<ReadableStream> stream = new ReadableStream(aGlobal);
+
+  stream->SetNativeUnderlyingSource(aUnderlyingSource);
+
+  SetUpReadableByteStreamControllerFromUnderlyingSource(aCx, stream,
+                                                        aUnderlyingSource, aRv);
+
   if (aRv.Failed()) {
     return nullptr;
   }
 
   // Step 5. Return stream.
   return stream.forget();
 }
 
--- a/dom/streams/ReadableStream.h
+++ b/dom/streams/ReadableStream.h
@@ -7,17 +7,19 @@
 #ifndef mozilla_dom_ReadableStream_h
 #define mozilla_dom_ReadableStream_h
 
 #include "js/TypeDecls.h"
 #include "js/Value.h"
 #include "mozilla/Attributes.h"
 #include "mozilla/ErrorResult.h"
 #include "mozilla/dom/BindingDeclarations.h"
+#include "mozilla/dom/NativeUnderlyingSource.h"
 #include "mozilla/dom/QueuingStrategyBinding.h"
+#include "mozilla/dom/ReadableStreamController.h"
 #include "mozilla/dom/ReadableStreamDefaultController.h"
 #include "nsCycleCollectionParticipant.h"
 #include "nsWrapperCache.h"
 
 #ifndef MOZ_DOM_STREAMS
 #  error "Shouldn't be compiling with this header without MOZ_DOM_STREAMS set"
 #endif
 
@@ -30,16 +32,18 @@ class ReadableStreamDefaultReader;
 class ReadableStreamGenericReader;
 struct ReadableStreamGetReaderOptions;
 struct ReadIntoRequest;
 
 using ReadableStreamReader =
     ReadableStreamDefaultReaderOrReadableStreamBYOBReader;
 using OwningReadableStreamReader =
     OwningReadableStreamDefaultReaderOrReadableStreamBYOBReader;
+class NativeUnderlyingSource;
+class BodyStreamHolder;
 
 class ReadableStream final : public nsISupports, public nsWrapperCache {
  public:
   NS_DECL_CYCLE_COLLECTING_ISUPPORTS
   NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(ReadableStream)
 
  protected:
   ~ReadableStream();
@@ -81,22 +85,34 @@ class ReadableStream final : public nsIS
 
   UnderlyingSourceErrorCallbackHelper* GetErrorAlgorithm() const {
     return mErrorAlgorithm;
   }
   void SetErrorAlgorithm(UnderlyingSourceErrorCallbackHelper* aErrorAlgorithm) {
     mErrorAlgorithm = aErrorAlgorithm;
   }
 
+  void SetNativeUnderlyingSource(BodyStreamHolder* aUnderlyingSource);
+  BodyStreamHolder* GetNativeUnderlyingSource() {
+    return mNativeUnderlyingSource;
+  }
+  bool HasNativeUnderlyingSource() { return mNativeUnderlyingSource; }
+
+  void ReleaseObjects();
+
  public:
   nsIGlobalObject* GetParentObject() const { return mGlobal; }
 
   JSObject* WrapObject(JSContext* aCx,
                        JS::Handle<JSObject*> aGivenProto) override;
 
+  static already_AddRefed<ReadableStream> Create(
+      JSContext* aCx, nsIGlobalObject* aGlobal,
+      BodyStreamHolder* aUnderlyingSource, ErrorResult& aRv);
+
   // IDL Methods
   // TODO: Use MOZ_CAN_RUN_SCRIPT when IDL constructors can use it (bug 1749042)
   MOZ_CAN_RUN_SCRIPT_BOUNDARY static already_AddRefed<ReadableStream>
   Constructor(const GlobalObject& aGlobal,
               const Optional<JS::Handle<JSObject*>>& aUnderlyingSource,
               const QueuingStrategy& aStrategy, ErrorResult& aRv);
 
   bool Locked() const;
@@ -116,16 +132,31 @@ class ReadableStream final : public nsIS
   RefPtr<ReadableStreamController> mController;
   bool mDisturbed = false;
   RefPtr<ReadableStreamGenericReader> mReader;
   ReaderState mState = ReaderState::Readable;
   JS::Heap<JS::Value> mStoredError;
 
   // Optional Callback for erroring a stream.
   RefPtr<UnderlyingSourceErrorCallbackHelper> mErrorAlgorithm;
+
+  // Optional strong reference to an Underlying Source; This
+  // exists because NativeUnderlyingSource callbacks don't hold
+  // a strong reference to the underlying source: So we need
+  // something else to hold onto that. As well, some of the integration
+  // desires the ability to extract the underlying source from the
+  // ReadableStream.
+  //
+  // While theoretically this ought to be some base class type to support
+  // multiple native underlying source types, I'm not sure what base class
+  // makes any sense for BodyStream, and given there's only body streams
+  // as the underlying source right now, I'm going to punt that problem to
+  // the future where we need to provide other native underlying sources
+  // (i.e. perhaps WebTransport.)
+  RefPtr<BodyStreamHolder> mNativeUnderlyingSource;
 };
 
 extern bool IsReadableStreamLocked(ReadableStream* aStream);
 
 extern double ReadableStreamGetNumReadRequests(ReadableStream* aStream);
 
 extern void ReadableStreamError(JSContext* aCx, ReadableStream* aStream,
                                 JS::Handle<JS::Value> aValue, ErrorResult& aRv);
--- a/dom/streams/ReadableStreamDefaultController.cpp
+++ b/dom/streams/ReadableStreamDefaultController.cpp
@@ -4,16 +4,17 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "js/Exception.h"
 #include "js/TypeDecls.h"
 #include "js/Value.h"
 #include "mozilla/AlreadyAddRefed.h"
 #include "mozilla/Attributes.h"
+#include "mozilla/HoldDropJSObjects.h"
 #include "mozilla/dom/Promise.h"
 #include "mozilla/dom/PromiseNativeHandler.h"
 #include "mozilla/dom/ReadableStream.h"
 #include "mozilla/dom/ReadableStreamController.h"
 #include "mozilla/dom/ReadableStreamDefaultController.h"
 #include "mozilla/dom/ReadableStreamDefaultControllerBinding.h"
 #include "mozilla/dom/ReadableStreamDefaultReaderBinding.h"
 #include "mozilla/dom/UnderlyingSourceBinding.h"
@@ -34,16 +35,17 @@ NS_INTERFACE_MAP_END
 // Note: Using the individual macros vs NS_IMPL_CYCLE_COLLECTION_WRAPPERCACHE
 // because I need to specificy a manual implementation of
 // NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN.
 NS_IMPL_CYCLE_COLLECTION_CLASS(ReadableStreamDefaultController)
 
 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(ReadableStreamDefaultController)
   NS_IMPL_CYCLE_COLLECTION_UNLINK(mCancelAlgorithm, mStrategySizeAlgorithm,
                                   mPullAlgorithm, mStream)
+  tmp->mQueue.clear();
   NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER
 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
 
 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN_INHERITED(
     ReadableStreamDefaultController, ReadableStreamController)
   NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mCancelAlgorithm, mStrategySizeAlgorithm,
                                     mPullAlgorithm, mStream)
 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
@@ -64,26 +66,26 @@ NS_IMPL_RELEASE_INHERITED(ReadableStream
 
 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ReadableStreamDefaultController)
   NS_WRAPPERCACHE_INTERFACE_MAP_ENTRY
 NS_INTERFACE_MAP_END_INHERITING(ReadableStreamController)
 
 ReadableStreamDefaultController::ReadableStreamDefaultController(
     nsIGlobalObject* aGlobal)
     : ReadableStreamController(aGlobal) {
-  // Add |MOZ_COUNT_CTOR(ReadableStreamDefaultController);| for a non-refcounted
-  // object.
+  mozilla::HoldJSObjects(this);
 }
 
 ReadableStreamDefaultController::~ReadableStreamDefaultController() {
   // MG:XXX: LinkedLists are required to be empty at destruction, but it seems
   //         it is possible to have a controller be destructed while still
   //         having entries in its queue.
   //
   //         This needs to be verified as not indicating some other issue.
+  mozilla::DropJSObjects(this);
   mQueue.clear();
 }
 
 JSObject* ReadableStreamDefaultController::WrapObject(
     JSContext* aCx, JS::Handle<JSObject*> aGivenProto) {
   return ReadableStreamDefaultController_Binding::Wrap(aCx, this, aGivenProto);
 }
 
@@ -178,17 +180,17 @@ Nullable<double> ReadableStreamDefaultCo
 //       moment,
 //       so the below doesn't quite match the spec, but serves the correct
 //       purpose for disconnecting the algorithms from the object graph to allow
 //       collection.
 //
 //       As far as I know, this isn't currently visible, but we need to keep
 //       this in mind. This is a weakness of this current implementation, and
 //       I'd prefer to have a better answer here eventually.
-static void ReadableStreamDefaultControllerClearAlgorithms(
+void ReadableStreamDefaultControllerClearAlgorithms(
     ReadableStreamDefaultController* aController) {
   // Step 1.
   aController->SetPullAlgorithm(nullptr);
 
   // Step 2.
   aController->SetCancelAlgorithm(nullptr);
 
   // Step 3.
--- a/dom/streams/ReadableStreamDefaultController.h
+++ b/dom/streams/ReadableStreamDefaultController.h
@@ -165,12 +165,15 @@ extern void ReadableStreamDefaultControl
 MOZ_CAN_RUN_SCRIPT extern void ReadableStreamDefaultReaderRead(
     JSContext* aCx, ReadableStreamGenericReader* reader, ReadRequest* aRequest,
     ErrorResult& aRv);
 
 extern void ReadableStreamDefaultControllerError(
     JSContext* aCx, ReadableStreamDefaultController* aController,
     JS::Handle<JS::Value> aValue, ErrorResult& aRv);
 
+extern void ReadableStreamDefaultControllerClearAlgorithms(
+    ReadableStreamDefaultController* aController);
+
 }  // namespace dom
 }  // namespace mozilla
 
 #endif  // mozilla_dom_ReadableStreamDefaultController_h
--- a/dom/streams/ReadableStreamDefaultReader.cpp
+++ b/dom/streams/ReadableStreamDefaultReader.cpp
@@ -124,75 +124,65 @@ ReadableStreamDefaultReader::Constructor
   }
 
   // Step 3.
   reader->mReadRequests.clear();
 
   return reader.forget();
 }
 
-static bool CreateValueDonePair(JSContext* aCx, JS::HandleValue aValue,
-                                bool aDone,
+static bool CreateValueDonePair(JSContext* aCx, bool forAuthorCode,
+                                JS::HandleValue aValue, bool aDone,
                                 JS::MutableHandleValue aReturnValue) {
-  JS::RootedObject obj(aCx, JS_NewPlainObject(aCx));
+  JS::RootedObject obj(
+      aCx, forAuthorCode ? JS_NewPlainObject(aCx)
+                         : JS_NewObjectWithGivenProto(aCx, nullptr, nullptr));
   if (!obj) {
     return false;
   }
   if (!JS_DefineProperty(aCx, obj, "value", aValue, JSPROP_ENUMERATE)) {
     return false;
   }
   JS::RootedValue done(aCx, JS::BooleanValue(aDone));
   if (!JS_DefineProperty(aCx, obj, "done", done, JSPROP_ENUMERATE)) {
     return false;
   }
 
   aReturnValue.setObject(*obj);
   return true;
 }
 
-// https://streams.spec.whatwg.org/#default-reader-read
-struct Read_ReadRequest : public ReadRequest {
- public:
-  NS_DECL_ISUPPORTS_INHERITED
-  NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(Read_ReadRequest, ReadRequest)
-
-  RefPtr<Promise> mPromise;
-
-  explicit Read_ReadRequest(Promise* aPromise) : mPromise(aPromise) {}
-
-  void ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
-                  ErrorResult& aRv) override {
-    // Step 1.
-    JS::RootedValue resolvedValue(aCx);
-    if (!CreateValueDonePair(aCx, aChunk, false, &resolvedValue)) {
-      aRv.StealExceptionFromJSContext(aCx);
-      return;
-    }
-    mPromise->MaybeResolve(resolvedValue);
+void Read_ReadRequest::ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
+                                  ErrorResult& aRv) {
+  // Step 1.
+  JS::RootedValue resolvedValue(aCx);
+  if (!CreateValueDonePair(aCx, mForAuthorCode, aChunk, false,
+                           &resolvedValue)) {
+    aRv.StealExceptionFromJSContext(aCx);
+    return;
   }
+  mPromise->MaybeResolve(resolvedValue);
+}
 
-  void CloseSteps(JSContext* aCx, ErrorResult& aRv) override {
-    // Step 1.
-    JS::RootedValue undefined(aCx, JS::UndefinedValue());
-    JS::RootedValue resolvedValue(aCx);
-    if (!CreateValueDonePair(aCx, undefined, true, &resolvedValue)) {
-      aRv.StealExceptionFromJSContext(aCx);
-      return;
-    }
-    mPromise->MaybeResolve(resolvedValue);
+void Read_ReadRequest::CloseSteps(JSContext* aCx, ErrorResult& aRv) {
+  // Step 1.
+  JS::RootedValue undefined(aCx, JS::UndefinedValue());
+  JS::RootedValue resolvedValue(aCx);
+  if (!CreateValueDonePair(aCx, mForAuthorCode, undefined, true,
+                           &resolvedValue)) {
+    aRv.StealExceptionFromJSContext(aCx);
+    return;
   }
+  mPromise->MaybeResolve(resolvedValue);
+}
 
-  void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> e,
-                  ErrorResult& aRv) override {
-    mPromise->MaybeReject(e);
-  }
-
- protected:
-  virtual ~Read_ReadRequest() = default;
-};
+void Read_ReadRequest::ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> e,
+                                  ErrorResult& aRv) {
+  mPromise->MaybeReject(e);
+}
 
 NS_IMPL_CYCLE_COLLECTION(ReadRequest)
 NS_IMPL_CYCLE_COLLECTION_INHERITED(Read_ReadRequest, ReadRequest, mPromise)
 NS_IMPL_CYCLE_COLLECTING_ADDREF(ReadRequest)
 NS_IMPL_CYCLE_COLLECTING_RELEASE(ReadRequest)
 
 NS_IMPL_ADDREF_INHERITED(Read_ReadRequest, ReadRequest)
 NS_IMPL_RELEASE_INHERITED(Read_ReadRequest, ReadRequest)
--- a/dom/streams/ReadableStreamDefaultReader.h
+++ b/dom/streams/ReadableStreamDefaultReader.h
@@ -19,16 +19,42 @@
 #include "mozilla/LinkedList.h"
 
 namespace mozilla {
 namespace dom {
 
 class Promise;
 class ReadableStream;
 
+// https://streams.spec.whatwg.org/#default-reader-read
+struct Read_ReadRequest : public ReadRequest {
+ public:
+  NS_DECL_ISUPPORTS_INHERITED
+  NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(Read_ReadRequest, ReadRequest)
+
+  RefPtr<Promise> mPromise;
+  /* This allows Gecko Internals to create objects with null prototypes, to hide
+   * promise resolution from Object.prototype.then */
+  bool mForAuthorCode = true;
+
+  explicit Read_ReadRequest(Promise* aPromise, bool aForAuthorCode = true)
+      : mPromise(aPromise), mForAuthorCode(aForAuthorCode) {}
+
+  void ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
+                  ErrorResult& aRv) override;
+
+  void CloseSteps(JSContext* aCx, ErrorResult& aRv) override;
+
+  void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> e,
+                  ErrorResult& aRv) override;
+
+ protected:
+  virtual ~Read_ReadRequest() = default;
+};
+
 class ReadableStreamDefaultReader final : public ReadableStreamGenericReader,
                                           public nsWrapperCache
 
 {
  public:
   NS_DECL_ISUPPORTS_INHERITED
   NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS_INHERITED(
       ReadableStreamDefaultReader, ReadableStreamGenericReader)
--- a/dom/streams/UnderlyingSourceCallbackHelpers.cpp
+++ b/dom/streams/UnderlyingSourceCallbackHelpers.cpp
@@ -1,16 +1,18 @@
 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
 /* vim:set ts=2 sw=2 sts=2 et cindent: */
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
+#include "mozilla/dom/BodyStream.h"
 #include "mozilla/dom/UnderlyingSourceCallbackHelpers.h"
 #include "mozilla/dom/UnderlyingSourceBinding.h"
+#include "mozilla/dom/NativeUnderlyingSource.h"
 
 namespace mozilla::dom {
 
 // UnderlyingSourceStartCallbackHelper
 NS_IMPL_CYCLE_COLLECTION_CLASS(UnderlyingSourceStartCallbackHelper)
 
 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(UnderlyingSourceStartCallbackHelper)
   tmp->mThisObj.set(nullptr);
@@ -65,16 +67,29 @@ NS_IMPL_CYCLE_COLLECTION_TRACE_END
 NS_IMPL_ADDREF_INHERITED(IDLUnderlyingSourcePullCallbackHelper,
                          UnderlyingSourcePullCallbackHelper)
 NS_IMPL_RELEASE_INHERITED(IDLUnderlyingSourcePullCallbackHelper,
                           UnderlyingSourcePullCallbackHelper)
 
 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(IDLUnderlyingSourcePullCallbackHelper)
 NS_INTERFACE_MAP_END_INHERITING(UnderlyingSourcePullCallbackHelper)
 
+// BodyStreamUnderlyingSourcePullCallbackHelper
+NS_IMPL_CYCLE_COLLECTION(BodyStreamUnderlyingSourcePullCallbackHelper,
+                         mUnderlyingSource)
+
+NS_IMPL_ADDREF_INHERITED(BodyStreamUnderlyingSourcePullCallbackHelper,
+                         UnderlyingSourcePullCallbackHelper)
+NS_IMPL_RELEASE_INHERITED(BodyStreamUnderlyingSourcePullCallbackHelper,
+                          UnderlyingSourcePullCallbackHelper)
+
+NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(
+    BodyStreamUnderlyingSourcePullCallbackHelper)
+NS_INTERFACE_MAP_END_INHERITING(UnderlyingSourcePullCallbackHelper)
+
 // UnderlyingSourceCancelCallbackHelper
 NS_IMPL_CYCLE_COLLECTION(UnderlyingSourceCancelCallbackHelper)
 NS_IMPL_CYCLE_COLLECTING_ADDREF(UnderlyingSourceCancelCallbackHelper)
 NS_IMPL_CYCLE_COLLECTING_RELEASE(UnderlyingSourceCancelCallbackHelper)
 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSourceCancelCallbackHelper)
   NS_INTERFACE_MAP_ENTRY(nsISupports)
 NS_INTERFACE_MAP_END
 
@@ -106,16 +121,37 @@ NS_IMPL_CYCLE_COLLECTION_TRACE_END
 NS_IMPL_ADDREF_INHERITED(IDLUnderlyingSourceCancelCallbackHelper,
                          UnderlyingSourceCancelCallbackHelper)
 NS_IMPL_RELEASE_INHERITED(IDLUnderlyingSourceCancelCallbackHelper,
                           UnderlyingSourceCancelCallbackHelper)
 
 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(IDLUnderlyingSourceCancelCallbackHelper)
 NS_INTERFACE_MAP_END_INHERITING(UnderlyingSourceCancelCallbackHelper)
 
+// UnderlyingSourcePullCallbackHelper
+NS_IMPL_CYCLE_COLLECTION(UnderlyingSourceErrorCallbackHelper)
+NS_IMPL_CYCLE_COLLECTING_ADDREF(UnderlyingSourceErrorCallbackHelper)
+NS_IMPL_CYCLE_COLLECTING_RELEASE(UnderlyingSourceErrorCallbackHelper)
+NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSourceErrorCallbackHelper)
+  NS_INTERFACE_MAP_ENTRY(nsISupports)
+NS_INTERFACE_MAP_END
+
+// BodyStreamUnderlyingSourceCancelCallbackHelper
+NS_IMPL_CYCLE_COLLECTION(BodyStreamUnderlyingSourceCancelCallbackHelper,
+                         mUnderlyingSource)
+
+NS_IMPL_ADDREF_INHERITED(BodyStreamUnderlyingSourceCancelCallbackHelper,
+                         UnderlyingSourceCancelCallbackHelper)
+NS_IMPL_RELEASE_INHERITED(BodyStreamUnderlyingSourceCancelCallbackHelper,
+                          UnderlyingSourceCancelCallbackHelper)
+
+NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(
+    BodyStreamUnderlyingSourceCancelCallbackHelper)
+NS_INTERFACE_MAP_END_INHERITING(UnderlyingSourceCancelCallbackHelper)
+
 void UnderlyingSourceStartCallbackHelper::StartCallback(
     JSContext* aCx, ReadableStreamController& aController,
     JS::MutableHandle<JS::Value> aRetVal, ErrorResult& aRv) {
   JS::RootedObject thisObj(aCx, mThisObj);
   RefPtr<UnderlyingSourceStartCallback> callback(mCallback);
 
   ReadableStreamDefaultControllerOrReadableByteStreamController controller;
   if (aController.IsDefault()) {
@@ -145,24 +181,72 @@ already_AddRefed<Promise> IDLUnderlyingS
   RefPtr<UnderlyingSourcePullCallback> callback(mCallback);
   RefPtr<Promise> promise =
       callback->Call(thisObj, controller, aRv, "UnderlyingSource.pull",
                      CallbackFunction::eRethrowExceptions);
 
   return promise.forget();
 }
 
+BodyStreamUnderlyingSourcePullCallbackHelper::
+    BodyStreamUnderlyingSourcePullCallbackHelper(
+        BodyStreamHolder* underlyingSource)
+    : mUnderlyingSource(underlyingSource) {}
+
+already_AddRefed<Promise>
+BodyStreamUnderlyingSourcePullCallbackHelper::PullCallback(
+    JSContext* aCx, ReadableStreamController& aController, ErrorResult& aRv) {
+  RefPtr<BodyStream> bodyStream = mUnderlyingSource->GetBodyStream();
+  return bodyStream->PullCallback(aCx, aController, aRv);
+}
+
 already_AddRefed<Promise>
 IDLUnderlyingSourceCancelCallbackHelper::CancelCallback(
     JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
     ErrorResult& aRv) {
   JS::RootedObject thisObj(aCx, mThisObj);
 
   // Strong Ref
   RefPtr<UnderlyingSourceCancelCallback> callback(mCallback);
   RefPtr<Promise> promise =
       callback->Call(thisObj, aReason, aRv, "UnderlyingSource.cancel",
                      CallbackFunction::eRethrowExceptions);
 
   return promise.forget();
 }
 
+BodyStreamUnderlyingSourceCancelCallbackHelper::
+    BodyStreamUnderlyingSourceCancelCallbackHelper(
+        BodyStreamHolder* aUnderlyingSource)
+    : mUnderlyingSource(aUnderlyingSource) {}
+
+already_AddRefed<Promise>
+BodyStreamUnderlyingSourceCancelCallbackHelper::CancelCallback(
+    JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
+    ErrorResult& aRv) {
+  RefPtr<BodyStream> bodyStream = mUnderlyingSource->GetBodyStream();
+  return bodyStream->CancelCallback(aCx, aReason, aRv);
+}
+
+// BodyStreamUnderlyingSourceErrorCallbackHelper
+NS_IMPL_CYCLE_COLLECTION(BodyStreamUnderlyingSourceErrorCallbackHelper,
+                         mUnderlyingSource)
+
+NS_IMPL_ADDREF_INHERITED(BodyStreamUnderlyingSourceErrorCallbackHelper,
+                         UnderlyingSourceErrorCallbackHelper)
+NS_IMPL_RELEASE_INHERITED(BodyStreamUnderlyingSourceErrorCallbackHelper,
+                          UnderlyingSourceErrorCallbackHelper)
+
+NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(
+    BodyStreamUnderlyingSourceErrorCallbackHelper)
+NS_INTERFACE_MAP_END_INHERITING(UnderlyingSourceErrorCallbackHelper)
+
+BodyStreamUnderlyingSourceErrorCallbackHelper::
+    BodyStreamUnderlyingSourceErrorCallbackHelper(
+        BodyStreamHolder* aUnderlyingSource)
+    : mUnderlyingSource(aUnderlyingSource) {}
+
+void BodyStreamUnderlyingSourceErrorCallbackHelper::Call() {
+  RefPtr<BodyStream> bodyStream = mUnderlyingSource->GetBodyStream();
+  bodyStream->ErrorCallback();
+}
+
 }  // namespace mozilla::dom
--- a/dom/streams/UnderlyingSourceCallbackHelpers.h
+++ b/dom/streams/UnderlyingSourceCallbackHelpers.h
@@ -4,16 +4,17 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef mozilla_dom_UnderlyingSourceCallbackHelpers_h
 #define mozilla_dom_UnderlyingSourceCallbackHelpers_h
 
 #include "mozilla/HoldDropJSObjects.h"
 #include "mozilla/dom/ModuleMapKey.h"
+#include "mozilla/dom/NativeUnderlyingSource.h"
 #include "mozilla/dom/Promise.h"
 #include "mozilla/dom/ReadableStreamDefaultController.h"
 #include "mozilla/dom/UnderlyingSourceBinding.h"
 #include "nsISupports.h"
 #include "nsISupportsImpl.h"
 
 /* Since the streams specification has native descriptions of some callbacks
  * (i.e. described in prose, rather than provided by user code), we need to be
@@ -21,16 +22,18 @@
  * classes That cover the difference between native callback and user-provided.
  *
  * The Streams specification wants us to invoke these callbacks, run through
  * WebIDL as if they were methods. So we have to preserve the underlying object
  * to use as the This value on invocation.
  */
 namespace mozilla::dom {
 
+class BodyStreamHolder;
+
 // Note: Until we need to be able to provide a native implementation of start,
 // I don't distinguish between UnderlyingSourceStartCallbackHelper and  a
 // hypothetical IDLUnderlingSourceStartCallbackHelper
 class UnderlyingSourceStartCallbackHelper : public nsISupports {
  public:
   NS_DECL_CYCLE_COLLECTING_ISUPPORTS
   NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(
       UnderlyingSourceStartCallbackHelper)
@@ -98,16 +101,39 @@ class IDLUnderlyingSourcePullCallbackHel
     mozilla::DropJSObjects(this);
   }
 
  private:
   JS::Heap<JSObject*> mThisObj;
   RefPtr<UnderlyingSourcePullCallback> mCallback;
 };
 
+class BodyStreamUnderlyingSourcePullCallbackHelper final
+    : public UnderlyingSourcePullCallbackHelper {
+ public:
+  NS_DECL_ISUPPORTS_INHERITED
+  NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(
+      BodyStreamUnderlyingSourcePullCallbackHelper,
+      UnderlyingSourcePullCallbackHelper)
+
+  explicit BodyStreamUnderlyingSourcePullCallbackHelper(
+      BodyStreamHolder* underlyingSource);
+
+  MOZ_CAN_RUN_SCRIPT
+  virtual already_AddRefed<Promise> PullCallback(
+      JSContext* aCx, ReadableStreamController& aController,
+      ErrorResult& aRv) override;
+
+ protected:
+  virtual ~BodyStreamUnderlyingSourcePullCallbackHelper() = default;
+
+ private:
+  RefPtr<BodyStreamHolder> mUnderlyingSource;
+};
+
 class UnderlyingSourceCancelCallbackHelper : public nsISupports {
  public:
   NS_DECL_CYCLE_COLLECTING_ISUPPORTS
   NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(
       UnderlyingSourceCancelCallbackHelper)
 
   MOZ_CAN_RUN_SCRIPT
   virtual already_AddRefed<Promise> CancelCallback(
@@ -144,24 +170,66 @@ class IDLUnderlyingSourceCancelCallbackH
     mozilla::DropJSObjects(this);
   }
 
  private:
   JS::Heap<JSObject*> mThisObj;
   RefPtr<UnderlyingSourceCancelCallback> mCallback;
 };
 
+class BodyStreamUnderlyingSourceCancelCallbackHelper final
+    : public UnderlyingSourceCancelCallbackHelper {
+ public:
+  NS_DECL_ISUPPORTS_INHERITED
+  NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(
+      BodyStreamUnderlyingSourceCancelCallbackHelper,
+      UnderlyingSourceCancelCallbackHelper)
+
+  explicit BodyStreamUnderlyingSourceCancelCallbackHelper(
+      BodyStreamHolder* aUnderlyingSource);
+
+  MOZ_CAN_RUN_SCRIPT
+  virtual already_AddRefed<Promise> CancelCallback(
+      JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
+      ErrorResult& aRv) override;
+
+ protected:
+  virtual ~BodyStreamUnderlyingSourceCancelCallbackHelper() = default;
+
+ private:
+  RefPtr<BodyStreamHolder> mUnderlyingSource;
+};
+
 // Callback called when erroring a stream.
 class UnderlyingSourceErrorCallbackHelper : public nsISupports {
  public:
   NS_DECL_CYCLE_COLLECTING_ISUPPORTS
-  NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(
-      UnderlyingSourceErrorCallbackHelper)
+  NS_DECL_CYCLE_COLLECTION_CLASS(UnderlyingSourceErrorCallbackHelper)
 
   virtual void Call() = 0;
 
  protected:
   virtual ~UnderlyingSourceErrorCallbackHelper() = default;
 };
 
+class BodyStreamUnderlyingSourceErrorCallbackHelper final
+    : public UnderlyingSourceErrorCallbackHelper {
+ public:
+  NS_DECL_ISUPPORTS_INHERITED
+  NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(
+      BodyStreamUnderlyingSourceErrorCallbackHelper,
+      UnderlyingSourceErrorCallbackHelper)
+
+  explicit BodyStreamUnderlyingSourceErrorCallbackHelper(
+      BodyStreamHolder* aUnderlyingSource);
+
+  virtual void Call() override;
+
+ protected:
+  virtual ~BodyStreamUnderlyingSourceErrorCallbackHelper() = default;
+
+ private:
+  RefPtr<BodyStreamHolder> mUnderlyingSource;
+};
+
 }  // namespace mozilla::dom
 
 #endif
--- a/dom/streams/moz.build
+++ b/dom/streams/moz.build
@@ -7,16 +7,17 @@
 with Files("**"):
     BUG_COMPONENT = ("Core", "DOM: Streams")
 
 EXPORTS.mozilla.dom += [
     "BaseQueuingStrategy.h",
     "ByteLengthQueuingStrategy.h",
     "ByteStreamHelpers.h",
     "CountQueuingStrategy.h",
+    "NativeUnderlyingSource.h",
     "QueueWithSizes.h",
     "ReadableByteStreamController.h",
     "ReadableStream.h",
     "ReadableStreamBYOBReader.h",
     "ReadableStreamBYOBRequest.h",
     "ReadableStreamController.h",
     "ReadableStreamDefaultController.h",
     "ReadableStreamDefaultReader.h",
new file mode 100644
--- /dev/null
+++ b/dom/streams/test/xpcshell/fetch.js
@@ -0,0 +1,36 @@
+"use strict";
+
+const { AddonTestUtils } = ChromeUtils.import(
+  "resource://testing-common/AddonTestUtils.jsm"
+);
+
+AddonTestUtils.init(this);
+AddonTestUtils.createAppInfo(
+  "xpcshell@tests.mozilla.org",
+  "XPCShell",
+  "42",
+  "42"
+);
+
+Cu.importGlobalProperties(["fetch"]);
+add_task(async function helper() {
+  do_get_profile();
+
+  // The SearchService is also needed in order to construct the initial state,
+  // which means that the AddonManager needs to be available.
+  await AddonTestUtils.promiseStartupManager();
+
+  // The example.com domain will be used to host the dynamic layout JSON and
+  // the top stories JSON.
+  let server = AddonTestUtils.createHttpServer({ hosts: ["example.com"] });
+  server.registerDirectory("/", do_get_cwd());
+
+  Assert.equal(true, fetch instanceof Function);
+  var k = await fetch("http://example.com/");
+  console.log(k);
+  console.log(k.body);
+  var r = k.body.getReader();
+  console.log(r);
+  var v = await r.read();
+  console.log(v);
+});
new file mode 100644
--- /dev/null
+++ b/dom/streams/test/xpcshell/response.js
@@ -0,0 +1,30 @@
+"use strict";
+
+add_task(async function(test) {
+  return new Response(new Blob([], { type: "text/plain" })).body.cancel();
+});
+
+add_task(function(test) {
+  var response = new Response(
+    new Blob(["This is data"], { type: "text/plain" })
+  );
+  var reader = response.body.getReader();
+  reader.read();
+  return reader.cancel();
+});
+
+add_task(function(test) {
+  var response = new Response(new Blob(["T"], { type: "text/plain" }));
+  var reader = response.body.getReader();
+
+  var closedPromise = reader.closed.then(function() {
+    return reader.cancel();
+  });
+  reader.read().then(function readMore({ done, value }) {
+    if (!done) {
+      return reader.read().then(readMore);
+    }
+    return undefined;
+  });
+  return closedPromise;
+});
--- a/dom/streams/test/xpcshell/xpcshell.ini
+++ b/dom/streams/test/xpcshell/xpcshell.ini
@@ -1,14 +1,15 @@
 [DEFAULT]
 head =
 skip-if = toolkit == 'android'
 support-files =
 
-
+[response.js]
+[fetch.js]
 [dom_stream_prototype_test.js]
 [subclassing.js]
 # The following firefox-appdir make sure that this xpcshell test will run
 # with e10s enabled (which is needed to make sure that the test case is
 # going to launch the expected new processes)
 firefox-appdir = browser
 # Disable plugin loading to make it rr able to record and replay this test.
 prefs =