Bug 1507248 - Use virtual methods for external readable stream callbacks. r=baku,jwalden
authorJason Orendorff <jorendorff@mozilla.com>
Thu, 29 Nov 2018 22:33:43 +0000
changeset 505302 5808b57453825fe1185e94fd4e7e2e551bdf878f
parent 505301 0ceb5c6cc6b076e9b79f9d5f70ae901f8fd9644e
child 505303 7180b12513ad5f0a0d1edf3188610a848b3b6ebf
push id10290
push userffxbld-merge
push dateMon, 03 Dec 2018 16:23:23 +0000
treeherdermozilla-beta@700bed2445e6 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersbaku, jwalden
bugs1507248
milestone65.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1507248 - Use virtual methods for external readable stream callbacks. r=baku,jwalden Differential Revision: https://phabricator.services.mozilla.com/D11926
dom/fetch/FetchStream.cpp
dom/fetch/FetchStream.h
dom/fetch/Response.cpp
js/public/Stream.h
js/src/builtin/Stream.cpp
js/src/builtin/Stream.h
js/src/jsapi-tests/testReadableStream.cpp
js/src/vm/Runtime.cpp
js/src/vm/Runtime.h
--- a/dom/fetch/FetchStream.cpp
+++ b/dom/fetch/FetchStream.cpp
@@ -91,240 +91,200 @@ FetchStream::Create(JSContext* aCx, Fetc
     }
 
     // Note, this will create a ref-cycle between the holder and the stream.
     // The cycle is broken when the stream is closed or the worker begins
     // shutting down.
     stream->mWorkerRef = workerRef.forget();
   }
 
-  if (!JS::HasReadableStreamCallbacks(aCx)) {
-    JS::SetReadableStreamCallbacks(aCx,
-                                   &FetchStream::RequestDataCallback,
-                                   &FetchStream::WriteIntoReadRequestCallback,
-                                   &FetchStream::CancelCallback,
-                                   &FetchStream::ClosedCallback,
-                                   &FetchStream::ErroredCallback,
-                                   &FetchStream::FinalizeCallback);
-  }
-
   aRv.MightThrowJSException();
   JS::Rooted<JSObject*> body(aCx,
     JS::NewReadableExternalSourceStreamObject(aCx, stream));
   if (!body) {
     aRv.StealExceptionFromJSContext(aCx);
     return;
   }
 
   // This will be released in FetchStream::FinalizeCallback().  We are
   // guaranteed the jsapi will call FinalizeCallback when ReadableStream
   // js object is finalized.
   NS_ADDREF(stream.get());
 
   aStream.set(body);
 }
 
-/* static */ void
-FetchStream::RequestDataCallback(JSContext* aCx,
-                                 JS::HandleObject aStream,
-                                 void* aUnderlyingSource,
-                                 size_t aDesiredSize)
+void
+FetchStream::requestData(JSContext* aCx,
+                         JS::HandleObject aStream,
+                         size_t aDesiredSize)
 {
-  MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
 #if MOZ_DIAGNOSTIC_ASSERT_ENABLED
   bool disturbed;
   if (!JS::ReadableStreamIsDisturbed(aCx, aStream, &disturbed)) {
     JS_ClearPendingException(aCx);
   } else {
     MOZ_DIAGNOSTIC_ASSERT(disturbed);
   }
 #endif
 
-  RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource);
-  stream->AssertIsOnOwningThread();
+  AssertIsOnOwningThread();
 
-  MutexAutoLock lock(stream->mMutex);
+  MutexAutoLock lock(mMutex);
 
-  MOZ_DIAGNOSTIC_ASSERT(stream->mState == eInitializing ||
-                        stream->mState == eWaiting ||
-                        stream->mState == eChecking ||
-                        stream->mState == eReading);
+  MOZ_DIAGNOSTIC_ASSERT(mState == eInitializing ||
+                        mState == eWaiting ||
+                        mState == eChecking ||
+                        mState == eReading);
 
-  if (stream->mState == eReading) {
+  if (mState == eReading) {
     // We are already reading data.
     return;
   }
 
-  if (stream->mState == eChecking) {
+  if (mState == eChecking) {
     // If we are looking for more data, there is nothing else we should do:
     // let's move this checking operation in a reading.
-    MOZ_ASSERT(stream->mInputStream);
-    stream->mState = eReading;
+    MOZ_ASSERT(mInputStream);
+    mState = eReading;
     return;
   }
 
-  if (stream->mState == eInitializing) {
+  if (mState == eInitializing) {
     // The stream has been used for the first time.
-    stream->mStreamHolder->MarkAsRead();
+    mStreamHolder->MarkAsRead();
   }
 
-  stream->mState = eReading;
+  mState = eReading;
 
-  if (!stream->mInputStream) {
+  if (!mInputStream) {
     // This is the first use of the stream. Let's convert the
     // mOriginalInputStream into an nsIAsyncInputStream.
-    MOZ_ASSERT(stream->mOriginalInputStream);
+    MOZ_ASSERT(mOriginalInputStream);
 
     nsCOMPtr<nsIAsyncInputStream> asyncStream;
     nsresult rv =
-      NS_MakeAsyncNonBlockingInputStream(stream->mOriginalInputStream.forget(),
+      NS_MakeAsyncNonBlockingInputStream(mOriginalInputStream.forget(),
                                          getter_AddRefs(asyncStream));
     if (NS_WARN_IF(NS_FAILED(rv))) {
-      stream->ErrorPropagation(aCx, lock, aStream, rv);
+      ErrorPropagation(aCx, lock, aStream, rv);
       return;
     }
 
-    stream->mInputStream = asyncStream;
-    stream->mOriginalInputStream = nullptr;
+    mInputStream = asyncStream;
+    mOriginalInputStream = nullptr;
   }
 
-  MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream);
-  MOZ_DIAGNOSTIC_ASSERT(!stream->mOriginalInputStream);
+  MOZ_DIAGNOSTIC_ASSERT(mInputStream);
+  MOZ_DIAGNOSTIC_ASSERT(!mOriginalInputStream);
 
-  nsresult rv =
-    stream->mInputStream->AsyncWait(stream, 0, 0,
-                                    stream->mOwningEventTarget);
+  nsresult rv = mInputStream->AsyncWait(this, 0, 0, mOwningEventTarget);
   if (NS_WARN_IF(NS_FAILED(rv))) {
-    stream->ErrorPropagation(aCx, lock, aStream, rv);
+    ErrorPropagation(aCx, lock, aStream, rv);
     return;
   }
 
   // All good.
 }
 
-/* static */ void
-FetchStream::WriteIntoReadRequestCallback(JSContext* aCx,
-                                          JS::HandleObject aStream,
-                                          void* aUnderlyingSource,
-                                          void* aBuffer, size_t aLength,
-                                          size_t* aByteWritten)
+void
+FetchStream::writeIntoReadRequestBuffer(JSContext* aCx,
+                                        JS::HandleObject aStream,
+                                        void* aBuffer, size_t aLength,
+                                        size_t* aByteWritten)
 {
-  MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
   MOZ_DIAGNOSTIC_ASSERT(aBuffer);
   MOZ_DIAGNOSTIC_ASSERT(aByteWritten);
 
-  RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource);
-  stream->AssertIsOnOwningThread();
+  AssertIsOnOwningThread();
+
+  MutexAutoLock lock(mMutex);
 
-  MutexAutoLock lock(stream->mMutex);
-
-  MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream);
-  MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWriting);
-  stream->mState = eChecking;
+  MOZ_DIAGNOSTIC_ASSERT(mInputStream);
+  MOZ_DIAGNOSTIC_ASSERT(mState == eWriting);
+  mState = eChecking;
 
   uint32_t written;
   nsresult rv =
-    stream->mInputStream->Read(static_cast<char*>(aBuffer), aLength, &written);
+    mInputStream->Read(static_cast<char*>(aBuffer), aLength, &written);
   if (NS_WARN_IF(NS_FAILED(rv))) {
-    stream->ErrorPropagation(aCx, lock, aStream, rv);
+    ErrorPropagation(aCx, lock, aStream, rv);
     return;
   }
 
   *aByteWritten = written;
 
   if (written == 0) {
-    stream->CloseAndReleaseObjects(aCx, lock, aStream);
+    CloseAndReleaseObjects(aCx, lock, aStream);
     return;
   }
 
-  rv = stream->mInputStream->AsyncWait(stream, 0, 0,
-                                       stream->mOwningEventTarget);
+  rv = mInputStream->AsyncWait(this, 0, 0, mOwningEventTarget);
   if (NS_WARN_IF(NS_FAILED(rv))) {
-    stream->ErrorPropagation(aCx, lock, aStream, rv);
+    ErrorPropagation(aCx, lock, aStream, rv);
     return;
   }
 
   // All good.
 }
 
-/* static */ JS::Value
-FetchStream::CancelCallback(JSContext* aCx, JS::HandleObject aStream,
-                            void* aUnderlyingSource, JS::HandleValue aReason)
+JS::Value
+FetchStream::cancel(JSContext* aCx, JS::HandleObject aStream, JS::HandleValue aReason)
 {
-  MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
+  AssertIsOnOwningThread();
 
-  // This is safe because we created an extra reference in FetchStream::Create()
-  // that won't be released until FetchStream::FinalizeCallback() is called.
-  // We are guaranteed that won't happen until the js ReadableStream object
-  // is finalized.
-  FetchStream* stream = static_cast<FetchStream*>(aUnderlyingSource);
-  stream->AssertIsOnOwningThread();
-
-  if (stream->mState == eInitializing) {
+  if (mState == eInitializing) {
     // The stream has been used for the first time.
-    stream->mStreamHolder->MarkAsRead();
+    mStreamHolder->MarkAsRead();
   }
 
-  if (stream->mInputStream) {
-    stream->mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
+  if (mInputStream) {
+    mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
   }
 
   // It could be that we don't have mInputStream yet, but we still have the
   // original stream. We need to close that too.
-  if (stream->mOriginalInputStream) {
-    MOZ_ASSERT(!stream->mInputStream);
-    stream->mOriginalInputStream->Close();
+  if (mOriginalInputStream) {
+    MOZ_ASSERT(!mInputStream);
+    mOriginalInputStream->Close();
   }
 
-  stream->ReleaseObjects();
+  ReleaseObjects();
   return JS::UndefinedValue();
 }
 
-/* static */ void
-FetchStream::ClosedCallback(JSContext* aCx, JS::HandleObject aStream,
-                            void* aUnderlyingSource)
-{
-  MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
-}
-
-/* static */ void
-FetchStream::ErroredCallback(JSContext* aCx, JS::HandleObject aStream,
-                             void* aUnderlyingSource, JS::HandleValue aReason)
+void
+FetchStream::onClosed(JSContext* aCx, JS::HandleObject aStream)
 {
-  MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
-
-  // This is safe because we created an extra reference in FetchStream::Create()
-  // that won't be released until FetchStream::FinalizeCallback() is called.
-  // We are guaranteed that won't happen until the js ReadableStream object
-  // is finalized.
-  FetchStream* stream = static_cast<FetchStream*>(aUnderlyingSource);
-  stream->AssertIsOnOwningThread();
-
-  if (stream->mState == eInitializing) {
-    // The stream has been used for the first time.
-    stream->mStreamHolder->MarkAsRead();
-  }
-
-  if (stream->mInputStream) {
-    stream->mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
-  }
-
-  stream->ReleaseObjects();
 }
 
 void
-FetchStream::FinalizeCallback(void* aUnderlyingSource)
+FetchStream::onErrored(JSContext* aCx, JS::HandleObject aStream, JS::HandleValue aReason)
 {
-  MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
+  AssertIsOnOwningThread();
+
+  if (mState == eInitializing) {
+    // The stream has been used for the first time.
+    mStreamHolder->MarkAsRead();
+  }
 
+  if (mInputStream) {
+    mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
+  }
+
+  ReleaseObjects();
+}
+
+void
+FetchStream::finalize()
+{
   // This can be called in any thread.
 
   // This takes ownership of the ref created in FetchStream::Create().
-  RefPtr<FetchStream> stream =
-    dont_AddRef(static_cast<FetchStream*>(aUnderlyingSource));
+  RefPtr<FetchStream> stream = dont_AddRef(this);
 
   stream->ReleaseObjects();
 }
 
 FetchStream::FetchStream(nsIGlobalObject* aGlobal,
                          FetchStreamHolder* aStreamHolder,
                          nsIInputStream* aInputStream)
   : mMutex("FetchStream::mMutex")
@@ -428,17 +388,17 @@ FetchStream::OnInputStreamReady(nsIAsync
 
   // The WriteInto callback changes mState to eChecking.
   MOZ_DIAGNOSTIC_ASSERT(mState == eChecking);
 
   return NS_OK;
 }
 
 /* static */ nsresult
-FetchStream::RetrieveInputStream(void* aUnderlyingReadableStreamSource,
+FetchStream::RetrieveInputStream(JS::ReadableStreamUnderlyingSource* aUnderlyingReadableStreamSource,
                                  nsIInputStream** aInputStream)
 {
   MOZ_ASSERT(aUnderlyingReadableStreamSource);
   MOZ_ASSERT(aInputStream);
 
   RefPtr<FetchStream> stream =
     static_cast<FetchStream*>(aUnderlyingReadableStreamSource);
   stream->AssertIsOnOwningThread();
--- a/dom/fetch/FetchStream.h
+++ b/dom/fetch/FetchStream.h
@@ -23,71 +23,68 @@ namespace mozilla {
 namespace dom {
 
 class FetchStreamHolder;
 class WeakWorkerRef;
 
 class FetchStream final : public nsIInputStreamCallback
                         , public nsIObserver
                         , public nsSupportsWeakReference
+                        , private JS::ReadableStreamUnderlyingSource
 {
 public:
   NS_DECL_THREADSAFE_ISUPPORTS
   NS_DECL_NSIINPUTSTREAMCALLBACK
   NS_DECL_NSIOBSERVER
 
   static void
   Create(JSContext* aCx, FetchStreamHolder* aStreamHolder,
          nsIGlobalObject* aGlobal, nsIInputStream* aInputStream,
          JS::MutableHandle<JSObject*> aStream, ErrorResult& aRv);
 
   void
   Close();
 
   static nsresult
-  RetrieveInputStream(void* aUnderlyingReadableStreamSource,
+  RetrieveInputStream(JS::ReadableStreamUnderlyingSource* aUnderlyingReadableStreamSource,
                       nsIInputStream** aInputStream);
 
 private:
   FetchStream(nsIGlobalObject* aGlobal, FetchStreamHolder* aStreamHolder,
               nsIInputStream* aInputStream);
   ~FetchStream();
 
 #ifdef DEBUG
   void
   AssertIsOnOwningThread();
 #else
   void
   AssertIsOnOwningThread() {}
 #endif
 
-  static void
-  RequestDataCallback(JSContext* aCx, JS::HandleObject aStream,
-                      void* aUnderlyingSource, size_t aDesiredSize);
+  void
+  requestData(JSContext* aCx, JS::HandleObject aStream, size_t aDesiredSize) override;
 
-  static void
-  WriteIntoReadRequestCallback(JSContext* aCx, JS::HandleObject aStream,
-                               void* aUnderlyingSource,
-                               void* aBuffer, size_t aLength,
-                               size_t* aByteWritten);
+  void
+  writeIntoReadRequestBuffer(JSContext* aCx, JS::HandleObject aStream,
+                             void* aBuffer, size_t aLength,
+                             size_t* aBytesWritten) override;
 
-  static JS::Value
-  CancelCallback(JSContext* aCx, JS::HandleObject aStream,
-                 void* aUnderlyingSource, JS::HandleValue aReason);
+  JS::Value
+  cancel(JSContext* aCx, JS::HandleObject aStream, JS::HandleValue aReason) override;
+
+  void
+  onClosed(JSContext* aCx, JS::HandleObject aStream) override;
 
-  static void
-  ClosedCallback(JSContext* aCx, JS::HandleObject aStream,
-                 void* aUnderlyingSource);
+  void
+  onErrored(JSContext* aCx, JS::HandleObject aStream, JS::HandleValue aReason) override;
 
-  static void
-  ErroredCallback(JSContext* aCx, JS::HandleObject aStream,
-                  void* aUnderlyingSource, JS::HandleValue reason);
+  void
+  finalize() override;
 
-  static void
-  FinalizeCallback(void* aUnderlyingSource);
 
   void
   ErrorPropagation(JSContext* aCx,
                    const MutexAutoLock& aProofOfLock,
                    JS::HandleObject aStream, nsresult aRv);
 
   void
   CloseAndReleaseObjects(JSContext* aCx,
--- a/dom/fetch/Response.cpp
+++ b/dom/fetch/Response.cpp
@@ -263,17 +263,17 @@ Response::Constructor(const GlobalObject
       JS::ReadableStreamMode streamMode;
       if (!JS::ReadableStreamGetMode(cx, readableStreamObj, &streamMode)) {
         aRv.StealExceptionFromJSContext(cx);
         return nullptr;
       }
       if (streamMode == JS::ReadableStreamMode::ExternalSource) {
         // If this is a DOM generated ReadableStream, we can extract the
         // inputStream directly.
-        void* underlyingSource = nullptr;
+        JS::ReadableStreamUnderlyingSource* underlyingSource = nullptr;
         if (!JS::ReadableStreamGetExternalUnderlyingSource(cx,
                                                            readableStreamObj,
                                                            &underlyingSource)) {
           aRv.StealExceptionFromJSContext(cx);
           return nullptr;
         }
 
         MOZ_ASSERT(underlyingSource);
--- a/js/public/Stream.h
+++ b/js/public/Stream.h
@@ -6,200 +6,183 @@
 
 /*
  * JSAPI functions and callbacks related to WHATWG Stream objects.
  *
  * Much of the API here mirrors the standard algorithms and standard JS methods
  * of the objects defined in the Streams standard. One difference is that the
  * functionality of the JS controller object is exposed to C++ as functions
  * taking ReadableStream instances instead, for convenience.
- *
- * ## External streams
- *
- * Embeddings can create ReadableStreams that read from custom C++ data
- * sources. Such streams are always byte streams: the chunks they produce are
- * typed arrays (and they will support ReadableStreamBYOBReader once we have
- * it).
- *
- * When creating an "external readable stream" using
- * JS::NewReadableExternalSourceStreamObject, an underlying source can be
- * passed to be stored on the stream. The underlying source is treated as an
- * opaque void* pointer by the JS engine: it's purely meant as a reference to
- * be used by the embedding to identify whatever actual source it uses to
- * supply data for the stream.
- *
- * External readable streams are optimized to allow the embedding to interact
- * with them with a minimum of overhead: chunks aren't enqueued as individual
- * typed arrays; instead, the embedding only updates the amount of data
- * available using ReadableStreamUpdateDataAvailableFromSource. When JS
- * requests data from a reader, WriteIntoReadRequestBufferCallback is invoked,
- * asking the embedding to write data directly into the buffer we're about to
- * hand to JS.
- *
- * Additionally, ReadableStreamGetExternalUnderlyingSource can be used to get
- * the void* pointer to the underlying source. This locks the stream until it
- * is released again using JS::ReadableStreamReleaseExternalUnderlyingSource.
- *
- * Embeddings can use this to optimize away the JS `ReadableStream` overhead
- * when an embedding-defined C++ stream is passed to an embedding-defined C++
- * consumer. For example, consider a ServiceWorker piping a `fetch` Response
- * body to a TextDecoder. Instead of copying chunks of data into JS typed array
- * buffers and creating a Promise per chunk, only to immediately resolve the
- * Promises and read the data out again, the embedding can directly feed the
- * incoming data to the TextDecoder.
  */
 
 #ifndef js_Stream_h
 #define js_Stream_h
 
 #include <stddef.h>
 
 #include "jstypes.h"
 
 #include "js/RootingAPI.h"
 #include "js/TypeDecls.h"
 
 namespace JS {
 
 /**
- * ## Readable stream callbacks
+ * Abstract base class for external underlying sources.
+ *
+ * The term "underlying source" is defined in the Streams spec:
+ *   https://streams.spec.whatwg.org/#underlying-source
+ *
+ * A `ReadableStreamUnderlyingSource` is an underlying source that is
+ * implemented in C++ rather than JS. It can be passed to
+ * `JS::NewReadableExternalSourceStreamObject` to create a custom,
+ * embedding-defined ReadableStream.
+ *
+ * There are several API difference between this class and the standard API for
+ * underlying sources implemented in JS:
+ *
+ * -   JS underlying sources can be either byte sources or non-byte sources.
+ *     External underlying source are always byte sources.
+ *
+ * -   The C++ API does not bother with controller objects. Instead of using
+ *     controller methods, the underlying source directly calls API functions
+ *     like JS::ReadableStream{UpdateDataAvailableFromSource,Close,Error}.
  *
- * Compartment safety: All callbacks (except Finalize) receive `cx` and
+ * -   External readable streams are optimized to allow the embedding to
+ *     interact with them with a minimum of overhead: chunks aren't enqueued as
+ *     individual typed arrays; instead, the embedding only updates the amount
+ *     of data available using
+ *     JS::ReadableStreamUpdateDataAvailableFromSource. When JS requests data
+ *     from a reader, writeIntoReadRequestBuffer is invoked, asking the
+ *     embedding to write data directly into the buffer we're about to hand to
+ *     JS.
+ *
+ * -   The C++ API provides extra callbacks onClosed() and onErrored().
+ *
+ * -   This class has a `finalize()` method, because C++ cares about lifetimes.
+ *
+ * Additionally, ReadableStreamGetExternalUnderlyingSource can be used to get
+ * the pointer to the underlying source. This locks the stream until it is
+ * released again using JS::ReadableStreamReleaseExternalUnderlyingSource.
+ *
+ * Embeddings can use this to optimize away the JS `ReadableStream` overhead
+ * when an embedding-defined C++ stream is passed to an embedding-defined C++
+ * consumer. For example, consider a ServiceWorker piping a `fetch` Response
+ * body to a TextDecoder. Instead of copying chunks of data into JS typed array
+ * buffers and creating a Promise per chunk, only to immediately resolve the
+ * Promises and read the data out again, the embedding can directly feed the
+ * incoming data to the TextDecoder.
+ *
+ * Compartment safety: All methods (except `finalize`) receive `cx` and
  * `stream` arguments. SpiderMonkey enters the realm of the stream object
- * before invoking these callbacks, so `stream` is never a wrapper. Other
+ * before invoking these methods, so `stream` is never a wrapper. Other
  * arguments may be wrappers.
  */
+class JS_PUBLIC_API ReadableStreamUnderlyingSource
+{
+  public:
+    virtual ~ReadableStreamUnderlyingSource() {}
 
-/**
- * Invoked whenever a reader desires more data from a ReadableStream's
- * embedding-provided underlying source.
- *
- * The given |desiredSize| is the absolute size, not a delta from the previous
- * desired size.
- */
-typedef void
-(* RequestReadableStreamDataCallback)(JSContext* cx, HandleObject stream,
-                                      void* underlyingSource, size_t desiredSize);
+    /**
+     * Invoked whenever a reader desires more data from this source.
+     *
+     * The given `desiredSize` is the absolute size, not a delta from the
+     * previous desired size.
+     */
+    virtual void requestData(JSContext* cx, HandleObject stream, size_t desiredSize) = 0;
 
-/**
- * Invoked to cause the embedding to fill the given |buffer| with data from
- * the given embedding-provided underlying source.
- *
- * This can only happen after the embedding has updated the amount of data
- * available using JS::ReadableStreamUpdateDataAvailableFromSource. If at
- * least one read request is pending when
- * JS::ReadableStreamUpdateDataAvailableFromSource is called,
- * the WriteIntoReadRequestBufferCallback is invoked immediately from under
- * the call to JS::WriteIntoReadRequestBufferCallback. If not, it is invoked
- * if and when a new read request is made.
- *
- * Note: This callback *must not cause GC*, because that could potentially
- * invalidate the |buffer| pointer.
- */
-typedef void
-(* WriteIntoReadRequestBufferCallback)(JSContext* cx, HandleObject stream,
-                                       void* underlyingSource, void* buffer, size_t length,
-                                       size_t* bytesWritten);
+    /**
+     * Invoked to cause the embedding to fill the given `buffer` with data from
+     * this underlying source.
+     *
+     * This is called only after the embedding has updated the amount of data
+     * available using JS::ReadableStreamUpdateDataAvailableFromSource. If at
+     * least one read request is pending when
+     * JS::ReadableStreamUpdateDataAvailableFromSource is called, this method
+     * is invoked immediately from under the call to
+     * JS::ReadableStreamUpdateDataAvailableFromSource. If not, it is invoked
+     * if and when a new read request is made.
+     *
+     * Note: This method *must not cause GC*, because that could potentially
+     * invalidate the `buffer` pointer.
+     */
+    virtual void writeIntoReadRequestBuffer(JSContext* cx, HandleObject stream,
+                                            void* buffer, size_t length,
+                                            size_t* bytesWritten) = 0;
 
-/**
- * Invoked in reaction to the ReadableStream being canceled to allow the
- * embedding to free the underlying source.
- *
- * This is equivalent to calling |cancel| on non-external underlying sources
- * provided to the ReadableStream constructor in JavaScript.
- *
- * The given |reason| is the JS::Value that was passed as an argument to
- * ReadableStream#cancel().
- *
- * The returned JS::Value will be used to resolve the Promise returned by
- * ReadableStream#cancel().
- */
-typedef Value
-(* CancelReadableStreamCallback)(JSContext* cx, HandleObject stream,
-                                 void* underlyingSource, HandleValue reason);
-
-/**
- * Invoked in reaction to a ReadableStream with an embedding-provided
- * underlying source being closed.
- */
-typedef void
-(* ReadableStreamClosedCallback)(JSContext* cx, HandleObject stream, void* underlyingSource);
+    /**
+     * Invoked in reaction to the ReadableStream being canceled. This is
+     * equivalent to the `cancel` method on non-external underlying sources
+     * provided to the ReadableStream constructor in JavaScript.
+     *
+     * The underlying source may free up some resources in this method, but
+     * `*this` must not be destroyed until `finalize()` is called.
+     *
+     * The given `reason` is the JS::Value that was passed as an argument to
+     * ReadableStream#cancel().
+     *
+     * The returned JS::Value will be used to resolve the Promise returned by
+     * ReadableStream#cancel().
+     */
+    virtual Value cancel(JSContext* cx, HandleObject stream, HandleValue reason) = 0;
 
-/**
- * Invoked in reaction to a ReadableStream with an embedding-provided
- * underlying source being errored with the
- * given reason.
- */
-typedef void
-(* ReadableStreamErroredCallback)(JSContext* cx, HandleObject stream, void* underlyingSource,
-                                  HandleValue reason);
+    /**
+     * Invoked when the associated ReadableStream becomes closed.
+     *
+     * The underlying source may free up some resources in this method, but
+     * `*this` must not be destroyed until `finalize()` is called.
+     */
+    virtual void onClosed(JSContext* cx, HandleObject stream) = 0;
 
-/**
- * Invoked in reaction to a ReadableStream with an embedding-provided
- * underlying source being finalized. Only the underlying source is passed
- * as an argument, while the ReadableStream itself is not to prevent the
- * embedding from operating on a JSObject that might not be in a valid state
- * anymore.
- *
- * Note: the ReadableStream might be finalized on a background thread. That
- * means this callback might be invoked from an arbitrary thread, which the
- * embedding must be able to handle.
- */
-typedef void
-(* ReadableStreamFinalizeCallback)(void* underlyingSource);
+    /**
+     * Invoked when the associated ReadableStream becomes errored.
+     *
+     * The underlying source may free up some resources in this method, but
+     * `*this` must not be destroyed until `finalize()` is called.
+     */
+    virtual void onErrored(JSContext* cx, HandleObject stream, HandleValue reason) = 0;
 
-/**
- * Sets runtime-wide callbacks to use for interacting with embedding-provided
- * hooks for operating on ReadableStream instances.
- *
- * See the documentation for the individual callback types for details.
- */
-extern JS_PUBLIC_API void
-SetReadableStreamCallbacks(JSContext* cx,
-                           RequestReadableStreamDataCallback dataRequestCallback,
-                           WriteIntoReadRequestBufferCallback writeIntoReadRequestCallback,
-                           CancelReadableStreamCallback cancelCallback,
-                           ReadableStreamClosedCallback closedCallback,
-                           ReadableStreamErroredCallback erroredCallback,
-                           ReadableStreamFinalizeCallback finalizeCallback);
-
-extern JS_PUBLIC_API bool
-HasReadableStreamCallbacks(JSContext* cx);
+    /**
+     * Invoked when the associated ReadableStream object is finalized. The
+     * stream object is not passed as an argument, as it might not be in a
+     * valid state anymore.
+     *
+     * Note: Finalization can happen on a background thread, so the embedding
+     * must be prepared for `finalize()` to be invoked from any thread.
+     */
+    virtual void finalize() = 0;
+};
 
 /**
  * Returns a new instance of the ReadableStream builtin class in the current
  * compartment, configured as a default stream.
  * If a |proto| is passed, that gets set as the instance's [[Prototype]]
  * instead of the original value of |ReadableStream.prototype|.
  */
 extern JS_PUBLIC_API JSObject*
 NewReadableDefaultStreamObject(JSContext* cx, HandleObject underlyingSource = nullptr,
                                HandleFunction size = nullptr, double highWaterMark = 1,
                                HandleObject proto = nullptr);
 
 /**
  * Returns a new instance of the ReadableStream builtin class in the current
- * compartment, with the right slot layout. If a |proto| is passed, that gets
- * set as the instance's [[Prototype]] instead of the original value of
- * |ReadableStream.prototype|.
+ * compartment. If a |proto| is passed, that gets set as the instance's
+ * [[Prototype]] instead of the original value of |ReadableStream.prototype|.
  *
  * The instance is optimized for operating as a byte stream backed by an
- * embedding-provided underlying source, using the callbacks set via
- * |JS::SetReadableStreamCallbacks|.
+ * embedding-provided underlying source, using the virtual methods of
+ * |underlyingSource| as callbacks.
  *
- * Note: the embedding is responsible for ensuring that the pointer to the
- * underlying source stays valid as long as the stream can be read from.
- * The underlying source can be freed if the tree is canceled or errored.
- * It can also be freed if the stream is destroyed. The embedding is notified
- * of that using ReadableStreamFinalizeCallback.
- *
- * Note: |underlyingSource| must have an even address.
+ * Note: The embedding must ensure that |*underlyingSource| lives as long as
+ * the new stream object. The JS engine will call the finalize() method when
+ * the stream object is destroyed.
  */
 extern JS_PUBLIC_API JSObject*
-NewReadableExternalSourceStreamObject(JSContext* cx, void* underlyingSource,
+NewReadableExternalSourceStreamObject(JSContext* cx,
+                                      ReadableStreamUnderlyingSource* underlyingSource,
                                       HandleObject proto = nullptr);
 
 /**
  * Returns the embedding-provided underlying source of the given |stream|.
  *
  * Can be used to optimize operations if both the underlying source and the
  * intended sink are embedding-provided. In that case it might be
  * preferrable to pipe data directly from source to sink without interacting
@@ -217,17 +200,18 @@ NewReadableExternalSourceStreamObject(JS
  * have a Promise to resolve/reject, which a reader provides.
  *
  * Asserts that |stream| is a ReadableStream object or an unwrappable wrapper
  * for one.
  *
  * Asserts that the stream has an embedding-provided underlying source.
  */
 extern JS_PUBLIC_API bool
-ReadableStreamGetExternalUnderlyingSource(JSContext* cx, HandleObject stream, void** source);
+ReadableStreamGetExternalUnderlyingSource(JSContext* cx, HandleObject stream,
+                                          ReadableStreamUnderlyingSource** source);
 
 /**
  * Releases the embedding-provided underlying source of the given |stream|,
  * returning the stream into an unlocked state.
  *
  * Asserts that the stream was locked through
  * ReadableStreamGetExternalUnderlyingSource.
  *
@@ -240,17 +224,17 @@ extern JS_PUBLIC_API bool
 ReadableStreamReleaseExternalUnderlyingSource(JSContext* cx, HandleObject stream);
 
 /**
  * Update the amount of data available at the underlying source of the given
  * |stream|.
  *
  * Can only be used for streams with an embedding-provided underlying source.
  * The JS engine will use the given value to satisfy read requests for the
- * stream by invoking the JS::WriteIntoReadRequestBuffer callback.
+ * stream by invoking the writeIntoReadRequestBuffer method.
  *
  * Asserts that |stream| is a ReadableStream object or an unwrappable wrapper
  * for one.
  */
 extern JS_PUBLIC_API bool
 ReadableStreamUpdateDataAvailableFromSource(JSContext* cx, HandleObject stream,
                                             uint32_t availableData);
 
@@ -371,23 +355,22 @@ ReadableStreamTee(JSContext* cx, HandleO
  *
  * Asserts that |stream| is a ReadableStream object or an unwrappable wrapper
  * for one.
  */
 extern JS_PUBLIC_API bool
 ReadableStreamGetDesiredSize(JSContext* cx, JSObject* stream, bool* hasValue, double* value);
 
 /**
- * Closes the given ReadableStream.
- *
- * Throws a TypeError and returns false if the closing operation fails.
+ * Close the given ReadableStream. This is equivalent to `controller.close()`
+ * in JS.
  *
- * Note: This is semantically equivalent to the |close| method on
- * the stream controller's prototype in JS. We expose it with the stream
- * itself as a target for simplicity.
+ * This can fail with or without an exception pending under a variety of
+ * circumstances. On failure, the stream may or may not be closed, and
+ * downstream consumers may or may not have been notified.
  *
  * Asserts that |stream| is a ReadableStream object or an unwrappable wrapper
  * for one.
  */
 extern JS_PUBLIC_API bool
 ReadableStreamClose(JSContext* cx, HandleObject stream);
 
 /**
--- a/js/src/builtin/Stream.cpp
+++ b/js/src/builtin/Stream.cpp
@@ -484,29 +484,30 @@ static MOZ_MUST_USE bool
 SetUpReadableStreamDefaultController(JSContext* cx,
                                      Handle<ReadableStream*> stream,
                                      HandleValue underlyingSource,
                                      double highWaterMarkVal,
                                      HandleValue size);
 
 static MOZ_MUST_USE ReadableByteStreamController*
 CreateExternalReadableByteStreamController(JSContext* cx, Handle<ReadableStream*> stream,
-                                           void* underlyingSource);
+                                           JS::ReadableStreamUnderlyingSource* source);
 
 ReadableStream*
-ReadableStream::createExternalSourceStream(JSContext* cx, void* underlyingSource,
+ReadableStream::createExternalSourceStream(JSContext* cx,
+                                           JS::ReadableStreamUnderlyingSource* source,
                                            HandleObject proto /* = nullptr */)
 {
     Rooted<ReadableStream*> stream(cx, create(cx, proto));
     if (!stream) {
         return nullptr;
     }
 
     Rooted<ReadableStreamController*> controller(cx);
-    controller = CreateExternalReadableByteStreamController(cx, stream, underlyingSource);
+    controller = CreateExternalReadableByteStreamController(cx, stream, source);
     if (!controller) {
         return nullptr;
     }
 
     stream->setController(controller);
     return stream;
 }
 
@@ -1512,24 +1513,21 @@ ReadableStreamCloseInternal(JSContext* c
     RootedObject closedPromise(cx, unwrappedReader->closedPromise());
     if (!cx->compartment()->wrap(cx, &closedPromise)) {
         return false;
     }
     if (!ResolvePromise(cx, closedPromise, UndefinedHandleValue)) {
         return false;
     }
 
-    if (unwrappedStream->mode() == JS::ReadableStreamMode::ExternalSource &&
-        cx->runtime()->readableStreamClosedCallback)
-    {
+    if (unwrappedStream->mode() == JS::ReadableStreamMode::ExternalSource) {
         // Make sure we're in the stream's compartment.
         AutoRealm ar(cx, unwrappedStream);
-        ReadableStreamController* controller = unwrappedStream->controller();
-        void* source = controller->underlyingSource().toPrivate();
-        cx->runtime()->readableStreamClosedCallback(cx, unwrappedStream, source);
+        JS::ReadableStreamUnderlyingSource* source = unwrappedStream->controller()->externalSource();
+        source->onClosed(cx, unwrappedStream);
     }
 
     return true;
 }
 
 /**
  * Streams spec, 3.4.5. ReadableStreamCreateReadResult ( value, done, forAuthorCode )
  */
@@ -1637,32 +1635,28 @@ ReadableStreamErrorInternal(JSContext* c
     RootedObject closedPromise(cx, unwrappedReader->closedPromise());
     if (!cx->compartment()->wrap(cx, &closedPromise)) {
         return false;
     }
     if (!RejectPromise(cx, closedPromise, e)) {
         return false;
     }
 
-    if (unwrappedStream->mode() == JS::ReadableStreamMode::ExternalSource &&
-        cx->runtime()->readableStreamErroredCallback)
-    {
+    if (unwrappedStream->mode() == JS::ReadableStreamMode::ExternalSource) {
         // Make sure we're in the stream's compartment.
         AutoRealm ar(cx, unwrappedStream);
-        ReadableStreamController* controller = unwrappedStream->controller();
-        void* source = controller->underlyingSource().toPrivate();
+        JS::ReadableStreamUnderlyingSource* source = unwrappedStream->controller()->externalSource();
 
         // Ensure that the embedding doesn't have to deal with
         // mixed-compartment arguments to the callback.
         RootedValue error(cx, e);
         if (!cx->compartment()->wrap(cx, &error)) {
             return false;
         }
-
-        cx->runtime()->readableStreamErroredCallback(cx, unwrappedStream, source, error);
+        source->onErrored(cx, unwrappedStream, error);
     }
 
     return true;
 }
 
 /**
  * Streams spec, 3.4.7.
  *      ReadableStreamFulfillReadIntoRequest( stream, chunk, done )
@@ -2563,25 +2557,25 @@ ReadableStreamControllerCancelSteps(JSCo
         return ReadableStreamTee_Cancel(cx, unwrappedteeState, unwrappedDefaultController,
                                         reason);
     }
 
     if (unwrappedController->hasExternalSource()) {
         RootedValue rval(cx);
         {
             AutoRealm ar(cx, unwrappedController);
+            JS::ReadableStreamUnderlyingSource* source = unwrappedController->externalSource();
             Rooted<ReadableStream*> stream(cx, unwrappedController->stream());
-            void* source = unwrappedUnderlyingSource.toPrivate();
             RootedValue wrappedReason(cx, reason);
             if (!cx->compartment()->wrap(cx, &wrappedReason)) {
                 return nullptr;
             }
 
             cx->check(stream, wrappedReason);
-            rval = cx->runtime()->readableStreamCancelCallback(cx, stream, source, wrappedReason);
+            rval = source->cancel(cx, stream, wrappedReason);
         }
 
         if (!cx->compartment()->wrap(cx, &rval)) {
             return nullptr;
         }
         return PromiseObject::unforgeableResolve(cx, rval);
     }
 
@@ -2795,20 +2789,20 @@ ReadableStreamControllerCallPullIfNeeded
         MOZ_ASSERT(unwrappedUnderlyingSource.toObject().is<TeeState>(),
                    "tee streams and controllers are always same-compartment with the TeeState object");
         Rooted<TeeState*> unwrappedTeeState(cx,
             &unwrappedUnderlyingSource.toObject().as<TeeState>());
         pullPromise = ReadableStreamTee_Pull(cx, unwrappedTeeState);
     } else if (unwrappedController->hasExternalSource()) {
         {
             AutoRealm ar(cx, unwrappedController);
+            JS::ReadableStreamUnderlyingSource* source = unwrappedController->externalSource();
             Rooted<ReadableStream*> stream(cx, unwrappedController->stream());
-            void* source = unwrappedUnderlyingSource.toPrivate();
             double desiredSize = ReadableStreamControllerGetDesiredSizeUnchecked(unwrappedController);
-            cx->runtime()->readableStreamDataRequestCallback(cx, stream, source, desiredSize);
+            source->requestData(cx, stream, desiredSize);
         }
         pullPromise = PromiseObject::unforgeableResolve(cx, UndefinedHandleValue);
     } else {
         RootedValue underlyingSource(cx, unwrappedUnderlyingSource);
         if (!cx->compartment()->wrap(cx, &underlyingSource)) {
             return false;
         }
         pullPromise = PromiseInvokeOrNoop(cx, underlyingSource, cx->names().pull, controllerVal);
@@ -3315,32 +3309,33 @@ ReadableByteStreamController::constructo
 
 /**
  * Version of the ReadableByteStreamConstructor that's specialized for
  * handling external, embedding-provided, underlying sources.
  */
 static MOZ_MUST_USE ReadableByteStreamController*
 CreateExternalReadableByteStreamController(JSContext* cx,
                                            Handle<ReadableStream*> stream,
-                                           void* underlyingSource)
+                                           JS::ReadableStreamUnderlyingSource* source)
 {
     Rooted<ReadableByteStreamController*> controller(cx,
         NewBuiltinClassInstance<ReadableByteStreamController>(cx));
     if (!controller) {
         return nullptr;
     }
 
     // Step 3: Set this.[[controlledReadableStream]] to stream.
     controller->setStream(stream);
 
     // Step 4: Set this.[[underlyingByteSource]] to underlyingByteSource.
-    controller->setUnderlyingSource(PrivateValue(underlyingSource));
-
-    // Step 5: Set this.[[pullAgain]], and this.[[pulling]] to false.
-    controller->setFlags(ReadableStreamController::Flag_ExternalSource);
+    controller->setExternalSource(source);
+
+    // Step 5: Set this.[[pullAgain]] and this.[[pulling]] to false (implicit).
+    MOZ_ASSERT(!controller->pullAgain());
+    MOZ_ASSERT(!controller->pulling());
 
     // Step 6: Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
     // Omitted.
 
     // Step 7: Perform ! ResetQueue(this).
     controller->setQueueTotalSize(0);
 
     // Step 8: Set this.[[started]] and this.[[closeRequested]] to false.
@@ -3403,18 +3398,17 @@ ReadableByteStreamControllerFinalize(Fre
     if (controller.getFixedSlot(ReadableStreamController::Slot_Flags).isUndefined()) {
         return;
     }
 
     if (!controller.hasExternalSource()) {
         return;
     }
 
-    void* underlyingSource = controller.underlyingSource().toPrivate();
-    obj->runtimeFromAnyThread()->readableStreamFinalizeCallback(underlyingSource);
+    controller.externalSource()->finalize();
 }
 
 static const ClassOps ReadableByteStreamControllerClassOps = {
     nullptr,        /* addProperty */
     nullptr,        /* delProperty */
     nullptr,        /* enumerate */
     nullptr,        /* newEnumerate */
     nullptr,        /* resolve */
@@ -3460,34 +3454,33 @@ ReadableByteStreamControllerPullSteps(JS
     double queueTotalSize = unwrappedController->queueTotalSize();
     if (queueTotalSize > 0) {
         // Step 3.a: Assert: ! ReadableStreamGetNumReadRequests(_stream_) is 0.
         MOZ_ASSERT(ReadableStreamGetNumReadRequests(unwrappedStream) == 0);
 
         RootedObject view(cx);
 
         if (unwrappedStream->mode() == JS::ReadableStreamMode::ExternalSource) {
-            void* underlyingSource = unwrappedController->underlyingSource().toPrivate();
+            JS::ReadableStreamUnderlyingSource* source = unwrappedController->externalSource();
 
             view = JS_NewUint8Array(cx, queueTotalSize);
             if (!view) {
                 return nullptr;
             }
 
             size_t bytesWritten;
             {
                 AutoRealm ar(cx, unwrappedStream);
                 JS::AutoSuppressGCAnalysis suppressGC(cx);
                 JS::AutoCheckCannotGC noGC;
                 bool dummy;
                 void* buffer = JS_GetArrayBufferViewData(view, &dummy, noGC);
 
-                auto cb = cx->runtime()->readableStreamWriteIntoReadRequestCallback;
-                MOZ_ASSERT(cb);
-                cb(cx, unwrappedStream, underlyingSource, buffer, queueTotalSize, &bytesWritten);
+                source->writeIntoReadRequestBuffer(cx, unwrappedStream, buffer, queueTotalSize,
+                                                   &bytesWritten);
             }
 
             queueTotalSize = queueTotalSize - bytesWritten;
         } else {
             // Step 3.b: Let entry be the first element of this.[[queue]].
             // Step 3.c: Remove entry from this.[[queue]], shifting all other
             //           elements downward (so that the second becomes the
             //           first, and so on).
@@ -4209,55 +4202,16 @@ JS_FRIEND_API JSObject*
 js::UnwrapReadableStream(JSObject* obj)
 {
     if (JSObject* unwrapped = CheckedUnwrap(obj)) {
         return unwrapped->is<ReadableStream>() ? unwrapped : nullptr;
     }
     return nullptr;
 }
 
-extern JS_PUBLIC_API void
-JS::SetReadableStreamCallbacks(JSContext* cx,
-                               JS::RequestReadableStreamDataCallback dataRequestCallback,
-                               JS::WriteIntoReadRequestBufferCallback writeIntoReadRequestCallback,
-                               JS::CancelReadableStreamCallback cancelCallback,
-                               JS::ReadableStreamClosedCallback closedCallback,
-                               JS::ReadableStreamErroredCallback erroredCallback,
-                               JS::ReadableStreamFinalizeCallback finalizeCallback)
-{
-    MOZ_ASSERT(dataRequestCallback);
-    MOZ_ASSERT(writeIntoReadRequestCallback);
-    MOZ_ASSERT(cancelCallback);
-    MOZ_ASSERT(closedCallback);
-    MOZ_ASSERT(erroredCallback);
-    MOZ_ASSERT(finalizeCallback);
-
-    JSRuntime* rt = cx->runtime();
-
-    MOZ_ASSERT(!rt->readableStreamDataRequestCallback);
-    MOZ_ASSERT(!rt->readableStreamWriteIntoReadRequestCallback);
-    MOZ_ASSERT(!rt->readableStreamCancelCallback);
-    MOZ_ASSERT(!rt->readableStreamClosedCallback);
-    MOZ_ASSERT(!rt->readableStreamErroredCallback);
-    MOZ_ASSERT(!rt->readableStreamFinalizeCallback);
-
-    rt->readableStreamDataRequestCallback = dataRequestCallback;
-    rt->readableStreamWriteIntoReadRequestCallback = writeIntoReadRequestCallback;
-    rt->readableStreamCancelCallback = cancelCallback;
-    rt->readableStreamClosedCallback = closedCallback;
-    rt->readableStreamErroredCallback = erroredCallback;
-    rt->readableStreamFinalizeCallback = finalizeCallback;
-}
-
-JS_PUBLIC_API bool
-JS::HasReadableStreamCallbacks(JSContext* cx)
-{
-    return cx->runtime()->readableStreamDataRequestCallback;
-}
-
 JS_PUBLIC_API JSObject*
 JS::NewReadableDefaultStreamObject(JSContext* cx,
                                    JS::HandleObject underlyingSource /* = nullptr */,
                                    JS::HandleFunction size /* = nullptr */,
                                    double highWaterMark /* = 1 */,
                                    JS::HandleObject proto /* = nullptr */)
 {
     MOZ_ASSERT(!cx->zone()->isAtomsZone());
@@ -4275,34 +4229,26 @@ JS::NewReadableDefaultStreamObject(JSCon
     }
     RootedValue sourceVal(cx, ObjectValue(*source));
     RootedValue sizeVal(cx, size ? ObjectValue(*size) : UndefinedValue());
     return CreateReadableStream(cx, sourceVal, highWaterMark, sizeVal);
 }
 
 JS_PUBLIC_API JSObject*
 JS::NewReadableExternalSourceStreamObject(JSContext* cx,
-                                          void* underlyingSource,
+                                          JS::ReadableStreamUnderlyingSource* underlyingSource,
                                           HandleObject proto /* = nullptr */)
 {
     MOZ_ASSERT(!cx->zone()->isAtomsZone());
     AssertHeapIsIdle();
     CHECK_THREAD(cx);
+    MOZ_ASSERT(underlyingSource);
     MOZ_ASSERT((uintptr_t(underlyingSource) & 1) == 0,
                "external underlying source pointers must be aligned");
     cx->check(proto);
-#ifdef DEBUG
-    JSRuntime* rt = cx->runtime();
-    MOZ_ASSERT(rt->readableStreamDataRequestCallback);
-    MOZ_ASSERT(rt->readableStreamWriteIntoReadRequestCallback);
-    MOZ_ASSERT(rt->readableStreamCancelCallback);
-    MOZ_ASSERT(rt->readableStreamClosedCallback);
-    MOZ_ASSERT(rt->readableStreamErroredCallback);
-    MOZ_ASSERT(rt->readableStreamFinalizeCallback);
-#endif // DEBUG
 
     return ReadableStream::createExternalSourceStream(cx, underlyingSource, proto);
 }
 
 JS_PUBLIC_API bool
 JS::IsReadableStream(JSObject* obj)
 {
     return obj->canUnwrapAs<ReadableStream>();
@@ -4405,17 +4351,18 @@ JS::ReadableStreamGetReader(JSContext* c
     }
 
     JSObject* result = CreateReadableStreamDefaultReader(cx, unwrappedStream);
     MOZ_ASSERT_IF(result, IsObjectInContextCompartment(result, cx));
     return result;
 }
 
 JS_PUBLIC_API bool
-JS::ReadableStreamGetExternalUnderlyingSource(JSContext* cx, HandleObject streamObj, void** source)
+JS::ReadableStreamGetExternalUnderlyingSource(JSContext* cx, HandleObject streamObj,
+                                              JS::ReadableStreamUnderlyingSource** source)
 {
     AssertHeapIsIdle();
     CHECK_THREAD(cx);
 
     Rooted<ReadableStream*> unwrappedStream(cx,
         APIUnwrapAndDowncast<ReadableStream>(cx, streamObj));
     if (!unwrappedStream) {
         return false;
@@ -4430,17 +4377,17 @@ JS::ReadableStreamGetExternalUnderlyingS
         JS_ReportErrorNumberASCII(cx, GetErrorMessage, nullptr,
                                   JSMSG_READABLESTREAMCONTROLLER_NOT_READABLE,
                                   "ReadableStreamGetExternalUnderlyingSource");
         return false;
     }
 
     auto unwrappedController = &unwrappedStream->controller()->as<ReadableByteStreamController>();
     unwrappedController->setSourceLocked();
-    *source = unwrappedController->underlyingSource().toPrivate();
+    *source = unwrappedController->externalSource();
     return true;
 }
 
 JS_PUBLIC_API bool
 JS::ReadableStreamReleaseExternalUnderlyingSource(JSContext* cx, HandleObject streamObj)
 {
     ReadableStream* unwrappedStream = APIUnwrapAndDowncast<ReadableStream>(cx, streamObj);
     if (!unwrappedStream) {
@@ -4528,28 +4475,27 @@ JS::ReadableStreamUpdateDataAvailableFro
         if (!viewObj) {
             return false;
         }
         Rooted<ArrayBufferViewObject*> transferredView(cx, &viewObj->as<ArrayBufferViewObject>());
         if (!transferredView) {
             return false;
         }
 
-        void* underlyingSource = unwrappedController->underlyingSource().toPrivate();
+        JS::ReadableStreamUnderlyingSource* source = unwrappedController->externalSource();
 
         size_t bytesWritten;
         {
             AutoRealm ar(cx, unwrappedStream);
             JS::AutoSuppressGCAnalysis suppressGC(cx);
             JS::AutoCheckCannotGC noGC;
             bool dummy;
             void* buffer = JS_GetArrayBufferViewData(transferredView, &dummy, noGC);
-            auto cb = cx->runtime()->readableStreamWriteIntoReadRequestCallback;
-            MOZ_ASSERT(cb);
-            cb(cx, unwrappedStream, underlyingSource, buffer, availableData, &bytesWritten);
+            source->writeIntoReadRequestBuffer(cx, unwrappedStream, buffer, availableData,
+                                               &bytesWritten);
         }
 
         // Step iii: Perform ! ReadableStreamFulfillReadRequest(stream,
         //                                                      transferredView,
         //                                                      false).
         RootedValue chunk(cx, ObjectValue(*transferredView));
         if (!ReadableStreamFulfillReadOrReadIntoRequest(cx, unwrappedStream, chunk, false)) {
             return false;
--- a/js/src/builtin/Stream.h
+++ b/js/src/builtin/Stream.h
@@ -93,17 +93,18 @@ class ReadableStream : public NativeObje
     Value storedError() const { return getFixedSlot(Slot_StoredError); }
     void setStoredError(HandleValue value) { setFixedSlot(Slot_StoredError, value); }
 
     JS::ReadableStreamMode mode() const;
 
     bool locked() const;
 
     static MOZ_MUST_USE ReadableStream* create(JSContext* cx, HandleObject proto = nullptr);
-    static ReadableStream* createExternalSourceStream(JSContext* cx, void* underlyingSource,
+    static ReadableStream* createExternalSourceStream(JSContext* cx,
+                                                      JS::ReadableStreamUnderlyingSource* source,
                                                       HandleObject proto = nullptr);
 
     static bool constructor(JSContext* cx, unsigned argc, Value* vp);
     static const ClassSpec classSpec_;
     static const Class class_;
     static const ClassSpec protoClassSpec_;
     static const Class protoClass_;
 };
@@ -263,16 +264,28 @@ class ReadableStreamController : public 
     ReadableStream* stream() const {
         return &getFixedSlot(Slot_Stream).toObject().as<ReadableStream>();
     }
     void setStream(ReadableStream* stream) { setFixedSlot(Slot_Stream, ObjectValue(*stream)); }
     Value underlyingSource() const { return getFixedSlot(Slot_UnderlyingSource); }
     void setUnderlyingSource(const Value& underlyingSource) {
         setFixedSlot(Slot_UnderlyingSource, underlyingSource);
     }
+    JS::ReadableStreamUnderlyingSource* externalSource() const {
+        static_assert(alignof(JS::ReadableStreamUnderlyingSource) >= 2,
+                      "External underling sources are stored as PrivateValues, "
+                      "so they must have even addresses");
+        MOZ_ASSERT(hasExternalSource());
+        return static_cast<JS::ReadableStreamUnderlyingSource*>(underlyingSource().toPrivate());
+    }
+    void setExternalSource(JS::ReadableStreamUnderlyingSource* underlyingSource) {
+        MOZ_ASSERT(getFixedSlot(Slot_Flags).isUndefined());
+        setUnderlyingSource(JS::PrivateValue(underlyingSource));
+        setFlags(Flag_ExternalSource);
+    }
     double strategyHWM() const { return getFixedSlot(Slot_StrategyHWM).toNumber(); }
     void setStrategyHWM(double highWaterMark) {
         setFixedSlot(Slot_StrategyHWM, NumberValue(highWaterMark));
     }
     uint32_t flags() const { return getFixedSlot(Slot_Flags).toInt32(); }
     void setFlags(uint32_t flags) { setFixedSlot(Slot_Flags, Int32Value(flags)); }
     void addFlags(uint32_t flags) { setFlags(this->flags() | flags); }
     void removeFlags(uint32_t flags) { setFlags(this->flags() & ~flags); }
@@ -325,18 +338,19 @@ class ReadableStreamDefaultController : 
 
 class ReadableByteStreamController : public ReadableStreamController
 {
   public:
     /**
      * Memory layout for ReadableByteStreamControllers, starting after the
      * slots shared among all types of controllers.
      *
-     * PendingPullIntos is guaranteed to be in the  same compartment as the
-     * controller, but might contain wrappers for objects from other compartments.
+     * PendingPullIntos is guaranteed to be in the same compartment as the
+     * controller, but might contain wrappers for objects from other
+     * compartments.
      *
      * AutoAllocateSize is a primitive (numeric) value.
      */
     enum Slots {
         Slot_BYOBRequest = ReadableStreamController::SlotCount,
         Slot_PendingPullIntos,
         Slot_AutoAllocateSize,
         SlotCount
--- a/js/src/jsapi-tests/testReadableStream.cpp
+++ b/js/src/jsapi-tests/testReadableStream.cpp
@@ -7,159 +7,149 @@
 
 #include "jsapi.h"
 #include "jsfriendapi.h"
 #include "js/Stream.h"
 #include "jsapi-tests/tests.h"
 
 using namespace JS;
 
-struct StubExternalUnderlyingSource {
-    void* buffer;
+char testBufferData[] = "1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+
+struct StubExternalUnderlyingSource : public JS::ReadableStreamUnderlyingSource {
+    void* buffer = testBufferData;
+    bool dataRequestCBCalled = false;
+    bool writeIntoRequestBufferCBCalled = false;
+    bool cancelStreamCBCalled = false;
+    Value cancelStreamReason;
+    bool streamClosedCBCalled = false;
+    Value streamClosedReason;
+    bool streamErroredCBCalled = false;
+    Value streamErroredReason;
+    bool finalizeStreamCBCalled = false;
+    void* finalizedStreamUnderlyingSource;
+
+    static StubExternalUnderlyingSource instance;
+
+    void requestData(JSContext* cx, HandleObject stream, size_t desiredSize) override {
+        js::AssertSameCompartment(cx, stream);
+        MOZ_RELEASE_ASSERT(!dataRequestCBCalled, "Invalid test setup");
+        dataRequestCBCalled = true;
+    }
+
+    void writeIntoReadRequestBuffer(JSContext* cx, HandleObject stream,
+                                    void* buffer, size_t length, size_t* bytesWritten) override
+    {
+        js::AssertSameCompartment(cx, stream);
+        MOZ_RELEASE_ASSERT(!writeIntoRequestBufferCBCalled, "Invalid test setup");
+        writeIntoRequestBufferCBCalled = true;
+
+        MOZ_RELEASE_ASSERT(this == &StubExternalUnderlyingSource::instance);
+        MOZ_RELEASE_ASSERT(StubExternalUnderlyingSource::instance.buffer == testBufferData);
+        MOZ_RELEASE_ASSERT(length <= sizeof(testBufferData));
+        memcpy(buffer, testBufferData, length);
+        *bytesWritten = length;
+    }
+
+    Value cancel(JSContext* cx, HandleObject stream, HandleValue reason) override {
+        js::AssertSameCompartment(cx, stream);
+        js::AssertSameCompartment(cx, reason);
+        MOZ_RELEASE_ASSERT(!cancelStreamCBCalled, "Invalid test setup");
+        cancelStreamCBCalled = true;
+        cancelStreamReason = reason;
+        return reason;
+    }
+
+    void onClosed(JSContext* cx, HandleObject stream) override {
+        js::AssertSameCompartment(cx, stream);
+        MOZ_RELEASE_ASSERT(!streamClosedCBCalled, "Invalid test setup");
+        streamClosedCBCalled = true;
+    }
+
+    void onErrored(JSContext* cx, HandleObject stream, HandleValue reason) override {
+        js::AssertSameCompartment(cx, stream);
+        js::AssertSameCompartment(cx, reason);
+        MOZ_RELEASE_ASSERT(!streamErroredCBCalled, "Invalid test setup");
+        streamErroredCBCalled = true;
+        streamErroredReason = reason;
+    }
+
+    void finalize() override {
+        MOZ_RELEASE_ASSERT(!finalizeStreamCBCalled, "Invalid test setup");
+        finalizeStreamCBCalled = true;
+        finalizedStreamUnderlyingSource = this;
+    }
+
+    void reset() {
+        dataRequestCBCalled = false;
+        writeIntoRequestBufferCBCalled = false;
+        cancelStreamReason = UndefinedValue();
+        cancelStreamCBCalled = false;
+        streamClosedCBCalled = false;
+        streamErroredCBCalled = false;
+        finalizeStreamCBCalled = false;
+    }
 };
 
-char testBufferData[] = "1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+StubExternalUnderlyingSource StubExternalUnderlyingSource::instance;
 
-StubExternalUnderlyingSource stubExternalUnderlyingSource = {
-    testBufferData
-};
 
 static_assert(MOZ_ALIGNOF(StubExternalUnderlyingSource) > 1,
               "UnderlyingSource pointers must not have the low bit set");
 
 static JSObject*
 NewDefaultStream(JSContext* cx, HandleObject source = nullptr, HandleFunction size = nullptr,
                  double highWaterMark = 1, HandleObject proto = nullptr)
 {
     RootedObject stream(cx, NewReadableDefaultStreamObject(cx, source, size, highWaterMark,
                                                            proto));
-    MOZ_ASSERT_IF(stream, IsReadableStream(stream));
+    if (stream) {
+        MOZ_RELEASE_ASSERT(IsReadableStream(stream));
+    }
     return stream;
 }
 
-static bool dataRequestCBCalled = false;
-static void
-DataRequestCB(JSContext* cx, HandleObject stream, void* underlyingSource,
-              size_t desiredSize)
-{
-    js::AssertSameCompartment(cx, stream);
-    MOZ_ASSERT(!dataRequestCBCalled, "Invalid test setup");
-    dataRequestCBCalled = true;
-}
-
-static bool writeIntoRequestBufferCBCalled = false;
-static void
-WriteIntoRequestBufferCB(JSContext* cx, HandleObject stream, void* underlyingSource,
-                         void* buffer, size_t length, size_t* bytesWritten)
-{
-    js::AssertSameCompartment(cx, stream);
-    MOZ_ASSERT(!writeIntoRequestBufferCBCalled, "Invalid test setup");
-    writeIntoRequestBufferCBCalled = true;
-
-    MOZ_ASSERT(underlyingSource == &stubExternalUnderlyingSource);
-    MOZ_ASSERT(stubExternalUnderlyingSource.buffer == testBufferData);
-    MOZ_ASSERT(length <= sizeof(testBufferData));
-    memcpy(buffer, testBufferData, length);
-    *bytesWritten = length;
-}
-
-static bool cancelStreamCBCalled = false;
-static Value cancelStreamReason;
-static Value
-CancelStreamCB(JSContext* cx, HandleObject stream, void* underlyingSource,
-               HandleValue reason)
-{
-    js::AssertSameCompartment(cx, stream);
-    js::AssertSameCompartment(cx, reason);
-    MOZ_ASSERT(!cancelStreamCBCalled, "Invalid test setup");
-    cancelStreamCBCalled = true;
-    cancelStreamReason = reason;
-    return reason;
-}
-
-static bool streamClosedCBCalled = false;
-static Value streamClosedReason;
-static void
-StreamClosedCB(JSContext* cx, HandleObject stream, void* underlyingSource)
-{
-    js::AssertSameCompartment(cx, stream);
-    MOZ_ASSERT(!streamClosedCBCalled, "Invalid test setup");
-    streamClosedCBCalled = true;
-}
-
-static bool streamErroredCBCalled = false;
-static Value streamErroredReason;
-static void
-StreamErroredCB(JSContext* cx, HandleObject stream, void* underlyingSource,
-                HandleValue reason)
-{
-    js::AssertSameCompartment(cx, stream);
-    js::AssertSameCompartment(cx, reason);
-    MOZ_ASSERT(!streamErroredCBCalled, "Invalid test setup");
-    streamErroredCBCalled = true;
-    streamErroredReason = reason;
-}
-
-static bool finalizeStreamCBCalled = false;
-static void* finalizedStreamUnderlyingSource;
-static void
-FinalizeStreamCB(void* underlyingSource)
-{
-    MOZ_ASSERT(!finalizeStreamCBCalled, "Invalid test setup");
-    finalizeStreamCBCalled = true;
-    finalizedStreamUnderlyingSource = underlyingSource;
-}
-
-static void
-ResetCallbacks()
-{
-    dataRequestCBCalled = false;
-    writeIntoRequestBufferCBCalled = false;
-    cancelStreamReason = UndefinedValue();
-    cancelStreamCBCalled = false;
-    streamClosedCBCalled = false;
-    streamErroredCBCalled = false;
-    finalizeStreamCBCalled = false;
-}
 
 static bool
 GetIterResult(JSContext* cx, HandleObject promise, MutableHandleValue value, bool* done)
 {
     RootedObject iterResult(cx, &GetPromiseResult(promise).toObject());
 
     bool found;
     if (!JS_HasProperty(cx, iterResult, "value", &found)) {
         return false;
     }
-    MOZ_ASSERT(found);
+    MOZ_RELEASE_ASSERT(found);
     if (!JS_HasProperty(cx, iterResult, "done", &found)) {
         return false;
     }
-    MOZ_ASSERT(found);
+    MOZ_RELEASE_ASSERT(found);
 
     RootedValue doneVal(cx);
     if (!JS_GetProperty(cx, iterResult, "value", value)) {
         return false;
     }
     if (!JS_GetProperty(cx, iterResult, "done", &doneVal)) {
         return false;
     }
 
     *done = doneVal.toBoolean();
-    MOZ_ASSERT_IF(*done, value.isUndefined());
+    if (*done) {
+        MOZ_RELEASE_ASSERT(value.isUndefined());
+    }
 
     return true;
 }
 
 static JSObject*
 GetReadChunk(JSContext* cx, HandleObject readRequest)
 {
-    MOZ_ASSERT(GetPromiseState(readRequest) == PromiseState::Fulfilled);
+    MOZ_RELEASE_ASSERT(GetPromiseState(readRequest) == PromiseState::Fulfilled);
     RootedValue resultVal(cx, GetPromiseResult(readRequest));
-    MOZ_ASSERT(resultVal.isObject());
+    MOZ_RELEASE_ASSERT(resultVal.isObject());
     RootedObject result(cx, &resultVal.toObject());
     RootedValue chunkVal(cx);
     JS_GetProperty(cx, result, "value", &chunkVal);
     return &chunkVal.toObject();
 }
 
 struct StreamTestFixture : public JSAPITest
 {
@@ -261,19 +251,16 @@ BEGIN_FIXTURE_TEST(StreamTestFixture,
     return true;
 }
 END_FIXTURE_TEST(StreamTestFixture,
                  testReadableStream_ReadableStreamDefaultReaderRead)
 
 BEGIN_FIXTURE_TEST(StreamTestFixture,
                    testReadableStream_ReadableStreamDefaultReaderClose)
 {
-    SetReadableStreamCallbacks(cx, &DataRequestCB, &WriteIntoRequestBufferCB,
-                               &CancelStreamCB, &StreamClosedCB, &StreamErroredCB,
-                               &FinalizeStreamCB);
     RootedObject stream(cx, NewDefaultStream(cx));
     CHECK(stream);
     RootedObject reader(cx, ReadableStreamGetReader(cx, stream, ReadableStreamReaderMode::Default));
     CHECK(reader);
 
     RootedObject request(cx, ReadableStreamDefaultReaderRead(cx, reader));
     CHECK(request);
     CHECK(IsPromiseObject(request));
@@ -284,30 +271,27 @@ BEGIN_FIXTURE_TEST(StreamTestFixture,
     bool done;
     RootedValue value(cx);
     CHECK(GetPromiseState(request) == PromiseState::Fulfilled);
     CHECK(GetIterResult(cx, request, &value, &done));
     CHECK(value.isUndefined());
     CHECK(done);
 
     // The callbacks are only invoked for external streams.
-    CHECK(!streamClosedCBCalled);
+    CHECK(!StubExternalUnderlyingSource::instance.streamClosedCBCalled);
 
     return true;
 }
 END_FIXTURE_TEST(StreamTestFixture,
                  testReadableStream_ReadableStreamDefaultReaderClose)
 
 BEGIN_FIXTURE_TEST(StreamTestFixture,
                    testReadableStream_ReadableStreamDefaultReaderError)
 {
-    ResetCallbacks();
-    SetReadableStreamCallbacks(cx, &DataRequestCB, &WriteIntoRequestBufferCB,
-                               &CancelStreamCB, &StreamClosedCB, &StreamErroredCB,
-                               &FinalizeStreamCB);
+    StubExternalUnderlyingSource::instance.reset();
     RootedObject stream(cx, NewDefaultStream(cx));
     CHECK(stream);
     RootedObject reader(cx, ReadableStreamGetReader(cx, stream, ReadableStreamReaderMode::Default));
     CHECK(reader);
 
     RootedObject request(cx, ReadableStreamDefaultReaderRead(cx, reader));
     CHECK(request);
     CHECK(IsPromiseObject(request));
@@ -323,97 +307,83 @@ BEGIN_FIXTURE_TEST(StreamTestFixture,
     CHECK(ReadableStreamError(cx, stream, error));
 
     CHECK(GetPromiseState(request) == PromiseState::Rejected);
     RootedValue reason(cx, GetPromiseResult(request));
     CHECK(reason.isInt32());
     CHECK(reason.toInt32() == 42);
 
     // The callbacks are only invoked for external streams.
-    CHECK(!streamErroredCBCalled);
+    CHECK(!StubExternalUnderlyingSource::instance.streamErroredCBCalled);
 
     return true;
 }
 END_FIXTURE_TEST(StreamTestFixture,
                  testReadableStream_ReadableStreamDefaultReaderError)
 
 static JSObject*
-NewExternalSourceStream(JSContext* cx, void* underlyingSource,
-                        RequestReadableStreamDataCallback dataRequestCallback,
-                        WriteIntoReadRequestBufferCallback writeIntoReadRequestCallback,
-                        CancelReadableStreamCallback cancelCallback,
-                        ReadableStreamClosedCallback closedCallback,
-                        ReadableStreamErroredCallback erroredCallback,
-                        ReadableStreamFinalizeCallback finalizeCallback)
+NewExternalSourceStream(JSContext* cx, ReadableStreamUnderlyingSource* source)
 {
-    SetReadableStreamCallbacks(cx, dataRequestCallback, writeIntoReadRequestCallback,
-                               cancelCallback, closedCallback, erroredCallback,
-                               finalizeCallback);
-    RootedObject stream(cx, NewReadableExternalSourceStreamObject(cx, underlyingSource));
-    MOZ_ASSERT_IF(stream, IsReadableStream(stream));
+    RootedObject stream(cx, NewReadableExternalSourceStreamObject(cx, source));
+    if (stream) {
+        MOZ_RELEASE_ASSERT(IsReadableStream(stream));
+    }
     return stream;
 }
 
 static JSObject*
 NewExternalSourceStream(JSContext* cx)
 {
-    return NewExternalSourceStream(cx,
-                                   &stubExternalUnderlyingSource,
-                                   &DataRequestCB,
-                                   &WriteIntoRequestBufferCB,
-                                   &CancelStreamCB,
-                                   &StreamClosedCB,
-                                   &StreamErroredCB,
-                                   &FinalizeStreamCB);
+    return NewExternalSourceStream(cx, &StubExternalUnderlyingSource::instance);
 }
 
 BEGIN_FIXTURE_TEST(StreamTestFixture,
                    testReadableStream_CreateReadableByteStreamWithExternalSource)
 {
-    ResetCallbacks();
+    StubExternalUnderlyingSource::instance.reset();
 
     RootedObject stream(cx, NewExternalSourceStream(cx));
     CHECK(stream);
     ReadableStreamMode mode;
     CHECK(ReadableStreamGetMode(cx, stream, &mode));
     CHECK(mode == ReadableStreamMode::ExternalSource);
-    void* underlyingSource;
+    ReadableStreamUnderlyingSource* underlyingSource;
     CHECK(ReadableStreamGetExternalUnderlyingSource(cx, stream, &underlyingSource));
-    CHECK(underlyingSource == &stubExternalUnderlyingSource);
+    CHECK(underlyingSource == &StubExternalUnderlyingSource::instance);
     bool locked;
     CHECK(ReadableStreamIsLocked(cx, stream, &locked));
     CHECK(locked);
     CHECK(ReadableStreamReleaseExternalUnderlyingSource(cx, stream));
 
     return true;
 }
 END_FIXTURE_TEST(StreamTestFixture,
                  testReadableStream_CreateReadableByteStreamWithExternalSource)
 
 BEGIN_FIXTURE_TEST(StreamTestFixture,
                    testReadableStream_ExternalSourceCancel)
 {
-    ResetCallbacks();
+    StubExternalUnderlyingSource::instance.reset();
 
     RootedObject stream(cx, NewExternalSourceStream(cx));
     CHECK(stream);
     RootedValue reason(cx, Int32Value(42));
     CHECK(ReadableStreamCancel(cx, stream, reason));
-    CHECK(cancelStreamCBCalled);
-    CHECK(cancelStreamReason == reason);
+    CHECK(StubExternalUnderlyingSource::instance.cancelStreamCBCalled);
+    CHECK(StubExternalUnderlyingSource::instance.cancelStreamReason == reason);
 
     return true;
 }
 END_FIXTURE_TEST(StreamTestFixture,
                  testReadableStream_ExternalSourceCancel)
 
 BEGIN_FIXTURE_TEST(StreamTestFixture,
                    testReadableStream_ExternalSourceGetReader)
 {
-    ResetCallbacks();
+    StubExternalUnderlyingSource::instance.reset();
 
     RootedObject stream(cx, NewExternalSourceStream(cx));
     CHECK(stream);
 
     RootedValue streamVal(cx, ObjectValue(*stream));
     CHECK(JS_SetProperty(cx, global, "stream", streamVal));
     RootedValue rval(cx);
     EVAL("stream.getReader()", &rval);
@@ -470,51 +440,51 @@ struct ReadFromExternalSourceFixture : p
         return true;
     }
 
     bool readWithoutDataAvailable(CompartmentMode compartmentMode,
                                   const char* evalSrc,
                                   const char* evalSrc2,
                                   uint32_t writtenLength)
     {
-        ResetCallbacks();
+        StubExternalUnderlyingSource::instance.reset();
         definePrint();
 
         // Create the stream.
         RootedObject streamGlobal(cx);
         RootedObject stream(cx);  // can be a wrapper
         CHECK(createExternalSourceStream(compartmentMode, &streamGlobal, &stream));
         js::RunJobs(cx);
 
         // GetExternalUnderlyingSource locks the stream.
-        void* underlyingSource;
+        ReadableStreamUnderlyingSource* underlyingSource;
         CHECK(ReadableStreamGetExternalUnderlyingSource(cx, stream, &underlyingSource));
-        CHECK(underlyingSource == &stubExternalUnderlyingSource);
+        CHECK(underlyingSource == &StubExternalUnderlyingSource::instance);
         bool locked;
         CHECK(ReadableStreamIsLocked(cx, stream, &locked));
         CHECK(locked);
         CHECK(ReadableStreamReleaseExternalUnderlyingSource(cx, stream));
 
         // Run caller-supplied JS code to read from the stream.
         RootedValue streamVal(cx, ObjectValue(*stream));
         CHECK(JS_SetProperty(cx, global, "stream", streamVal));
         RootedValue rval(cx);
         EVAL(evalSrc, &rval);
-        CHECK(dataRequestCBCalled);
-        CHECK(!writeIntoRequestBufferCBCalled);
+        CHECK(StubExternalUnderlyingSource::instance.dataRequestCBCalled);
+        CHECK(!StubExternalUnderlyingSource::instance.writeIntoRequestBufferCBCalled);
         CHECK(rval.isObject());
         RootedObject unwrappedPromise(cx, js::CheckedUnwrap(&rval.toObject()));
         CHECK(unwrappedPromise);
         CHECK(IsPromiseObject(unwrappedPromise));
         CHECK(GetPromiseState(unwrappedPromise) == PromiseState::Pending);
 
         // Stream in some data; this resolves the read() result promise.
         size_t length = sizeof(testBufferData);
         CHECK(ReadableStreamUpdateDataAvailableFromSource(cx, stream, length));
-        CHECK(writeIntoRequestBufferCBCalled);
+        CHECK(StubExternalUnderlyingSource::instance.writeIntoRequestBufferCBCalled);
         CHECK(GetPromiseState(unwrappedPromise) == PromiseState::Fulfilled);
         RootedObject chunk(cx);
         {
             JSAutoRealm ar(cx, unwrappedPromise);
             RootedValue iterVal(cx);
             bool done;
             if (!GetIterResult(cx, unwrappedPromise, &iterVal, &done)) {
                 return false;
@@ -528,56 +498,56 @@ struct ReadFromExternalSourceFixture : p
         {
             JS::AutoCheckCannotGC noGC(cx);
             bool dummy;
             void* buffer = JS_GetArrayBufferViewData(chunk, &dummy, noGC);
             CHECK(!memcmp(buffer, testBufferData, writtenLength));
         }
 
         // Check the callbacks fired by calling read() again.
-        dataRequestCBCalled = false;
-        writeIntoRequestBufferCBCalled = false;
+        StubExternalUnderlyingSource::instance.dataRequestCBCalled = false;
+        StubExternalUnderlyingSource::instance.writeIntoRequestBufferCBCalled = false;
         EVAL(evalSrc2, &rval);
-        CHECK(dataRequestCBCalled);
-        CHECK(!writeIntoRequestBufferCBCalled);
+        CHECK(StubExternalUnderlyingSource::instance.dataRequestCBCalled);
+        CHECK(!StubExternalUnderlyingSource::instance.writeIntoRequestBufferCBCalled);
 
         return true;
     }
 
     bool readWithDataAvailable(CompartmentMode compartmentMode,
                                const char* evalSrc,
                                uint32_t writtenLength)
     {
-        ResetCallbacks();
+        StubExternalUnderlyingSource::instance.reset();
         definePrint();
 
         // Create a stream.
         RootedObject streamGlobal(cx);
         RootedObject stream(cx);
         CHECK(createExternalSourceStream(compartmentMode, &streamGlobal, &stream));
 
         // Getting the underlying source locks the stream.
-        void* underlyingSource;
+        ReadableStreamUnderlyingSource* underlyingSource;
         CHECK(ReadableStreamGetExternalUnderlyingSource(cx, stream, &underlyingSource));
-        CHECK(underlyingSource == &stubExternalUnderlyingSource);
+        CHECK(underlyingSource == &StubExternalUnderlyingSource::instance);
         bool locked;
         CHECK(ReadableStreamIsLocked(cx, stream, &locked));
         CHECK(locked);
         CHECK(ReadableStreamReleaseExternalUnderlyingSource(cx, stream));
 
         // Make some data available.
         size_t length = sizeof(testBufferData);
         CHECK(ReadableStreamUpdateDataAvailableFromSource(cx, stream, length));
 
         // Read from the stream.
         RootedValue streamVal(cx, ObjectValue(*stream));
         CHECK(JS_SetProperty(cx, global, "stream", streamVal));
         RootedValue rval(cx);
         EVAL(evalSrc, &rval);
-        CHECK(writeIntoRequestBufferCBCalled);
+        CHECK(StubExternalUnderlyingSource::instance.writeIntoRequestBufferCBCalled);
         CHECK(rval.isObject());
         RootedObject unwrappedPromise(cx, js::CheckedUnwrap(&rval.toObject()));
         CHECK(unwrappedPromise);
         CHECK(IsPromiseObject(unwrappedPromise));
         CHECK(GetPromiseState(unwrappedPromise) == PromiseState::Fulfilled);
         RootedObject chunk(cx);
         {
             JSAutoRealm ar(cx, unwrappedPromise);
@@ -668,17 +638,17 @@ BEGIN_FIXTURE_TEST(ReadFromExternalSourc
     CHECK(IsPromiseObject(request));
     CHECK(GetPromiseState(request) == PromiseState::Pending);
 
     CHECK(JS_GetProperty(cx, global, "stream", &val));
     RootedObject stream(cx, &val.toObject());
     ReadableStreamClose(cx, stream);
 
     val = GetPromiseResult(request);
-    MOZ_ASSERT(val.isObject());
+    CHECK(val.isObject());
     RootedObject result(cx, &val.toObject());
 
     JS_GetProperty(cx, result, "done", &val);
     CHECK(val.isBoolean());
     CHECK(val.toBoolean() == true);
 
     JS_GetProperty(cx, result, "value", &val);
     CHECK(val.isUndefined());
@@ -765,33 +735,33 @@ BEGIN_FIXTURE_TEST(StreamTestFixture,
     return true;
 }
 END_FIXTURE_TEST(StreamTestFixture,
                  testReadableStream_ReadableStreamOtherGlobalDefaultReaderRead)
 
 BEGIN_FIXTURE_TEST(StreamTestFixture,
                    testReadableStream_ReadableStreamGetExternalUnderlyingSource)
 {
-    ResetCallbacks();
+    StubExternalUnderlyingSource::instance.reset();
 
     RootedObject stream(cx, NewExternalSourceStream(cx));
     CHECK(stream);
-    void* source;
+    ReadableStreamUnderlyingSource* source;
     CHECK(ReadableStreamGetExternalUnderlyingSource(cx, stream, &source));
-    CHECK(source == &stubExternalUnderlyingSource);
+    CHECK(source == &StubExternalUnderlyingSource::instance);
     CHECK(ReadableStreamReleaseExternalUnderlyingSource(cx, stream));
 
     RootedObject otherGlobal(cx, createGlobal());
     CHECK(otherGlobal);
     {
         JSAutoRealm ar(cx, otherGlobal);
         CHECK(JS_WrapObject(cx, &stream));
-        void* source;
+        ReadableStreamUnderlyingSource* source;
         CHECK(ReadableStreamGetExternalUnderlyingSource(cx, stream, &source));
-        CHECK(source == &stubExternalUnderlyingSource);
+        CHECK(source == &StubExternalUnderlyingSource::instance);
         CHECK(ReadableStreamReleaseExternalUnderlyingSource(cx, stream));
     }
 
     return true;
 }
 END_FIXTURE_TEST(StreamTestFixture,
                  testReadableStream_ReadableStreamGetExternalUnderlyingSource)
 
--- a/js/src/vm/Runtime.cpp
+++ b/js/src/vm/Runtime.cpp
@@ -97,22 +97,16 @@ JSRuntime::JSRuntime(JSRuntime* parentRu
     updateChildRuntimeCount(parentRuntime),
     initialized_(false),
 #endif
     mainContext_(nullptr),
     profilerSampleBufferRangeStart_(0),
     telemetryCallback(nullptr),
     consumeStreamCallback(nullptr),
     reportStreamErrorCallback(nullptr),
-    readableStreamDataRequestCallback(nullptr),
-    readableStreamWriteIntoReadRequestCallback(nullptr),
-    readableStreamCancelCallback(nullptr),
-    readableStreamClosedCallback(nullptr),
-    readableStreamErroredCallback(nullptr),
-    readableStreamFinalizeCallback(nullptr),
     hadOutOfMemory(false),
     allowRelazificationForTesting(false),
     destroyCompartmentCallback(nullptr),
     sizeOfIncludingThisCompartmentCallback(nullptr),
     destroyRealmCallback(nullptr),
     realmNameCallback(nullptr),
     externalStringSizeofCallback(nullptr),
     securityCallbacks(&NullSecurityCallbacks),
--- a/js/src/vm/Runtime.h
+++ b/js/src/vm/Runtime.h
@@ -344,23 +344,16 @@ struct JSRuntime : public js::MallocProv
     js::UnprotectedData<JS::ReportStreamErrorCallback> reportStreamErrorCallback;
 
     js::GlobalObject* getIncumbentGlobal(JSContext* cx);
     bool enqueuePromiseJob(JSContext* cx, js::HandleFunction job, js::HandleObject promise,
                            js::Handle<js::GlobalObject*> incumbentGlobal);
     void addUnhandledRejectedPromise(JSContext* cx, js::HandleObject promise);
     void removeUnhandledRejectedPromise(JSContext* cx, js::HandleObject promise);
 
-    js::UnprotectedData<JS::RequestReadableStreamDataCallback> readableStreamDataRequestCallback;
-    js::UnprotectedData<JS::WriteIntoReadRequestBufferCallback> readableStreamWriteIntoReadRequestCallback;
-    js::UnprotectedData<JS::CancelReadableStreamCallback> readableStreamCancelCallback;
-    js::UnprotectedData<JS::ReadableStreamClosedCallback> readableStreamClosedCallback;
-    js::UnprotectedData<JS::ReadableStreamErroredCallback> readableStreamErroredCallback;
-    js::UnprotectedData<JS::ReadableStreamFinalizeCallback> readableStreamFinalizeCallback;
-
     /* Had an out-of-memory error which did not populate an exception. */
     mozilla::Atomic<bool, mozilla::SequentiallyConsistent,
                     mozilla::recordreplay::Behavior::DontPreserve> hadOutOfMemory;
 
     /*
      * Allow relazifying functions in compartments that are active. This is
      * only used by the relazifyFunctions() testing function.
      */