Bug 1347644 - Baldr: shell WebAssembly.compileStreaming and instantiateStreaming (r=till)
authorLuke Wagner <luke@mozilla.com>
Tue, 10 Oct 2017 14:17:50 -0500
changeset 385375 b9053d53c1cafbf876ad84aa0742bb1178514f53
parent 385374 208cf9b36e87238ae9694a74d7ea8b3baec57796
child 385376 2db8ced6f6cc86bf9a21427ee1afac0cc52b1742
push id96001
push userlwagner@mozilla.com
push dateTue, 10 Oct 2017 20:05:10 +0000
treeherdermozilla-inbound@2db8ced6f6cc [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerstill
bugs1347644
milestone58.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1347644 - Baldr: shell WebAssembly.compileStreaming and instantiateStreaming (r=till) MozReview-Commit-ID: 9b6RH95tqUI
js/src/jit-test/tests/wasm/streaming.js
js/src/js.msg
js/src/jsapi.cpp
js/src/jsapi.h
js/src/shell/js.cpp
js/src/vm/MutexIDs.h
js/src/vm/Runtime.cpp
js/src/vm/Runtime.h
js/src/vm/TypedArrayObject.cpp
js/src/vm/TypedArrayObject.h
js/src/wasm/WasmJS.cpp
new file mode 100644
--- /dev/null
+++ b/js/src/jit-test/tests/wasm/streaming.js
@@ -0,0 +1,82 @@
+function testInstantiate(source, importObj, exportName, expectedValue) {
+    var result;
+    WebAssembly.instantiateStreaming(code, importObj).then(r => { result = r });
+    drainJobQueue();
+    assertEq(result !== undefined, true);
+    assertEq(result.module instanceof WebAssembly.Module, true);
+    assertEq(result.instance instanceof WebAssembly.Instance, true);
+    assertEq(result.instance.exports[exportName](), expectedValue);
+}
+function testBoth(source, exportName, expectedValue) {
+    var module;
+    WebAssembly.compileStreaming(code).then(m => { module = m });
+    drainJobQueue();
+    assertEq(module !== undefined, true);
+    assertEq(module instanceof WebAssembly.Module, true);
+    assertEq(new WebAssembly.Instance(module).exports[exportName](), expectedValue);
+
+    testInstantiate(source, undefined, exportName, expectedValue);
+}
+
+function testFailInstantiate(source, importObj, error) {
+    var caught = false;
+    WebAssembly.instantiateStreaming(source).catch(err => {
+        assertEq(err instanceof error, true);
+        caught = true;
+    });
+    drainJobQueue();
+    assertEq(caught, true);
+}
+function testFailBoth(source, error) {
+    var caught = false;
+    WebAssembly.compileStreaming(source).catch(err => {
+        assertEq(err instanceof error, true);
+        caught = true;
+    });
+    drainJobQueue();
+    assertEq(caught, true);
+
+    testFailInstantiate(source, undefined, error);
+}
+
+var code = wasmTextToBinary('(module (func (export "run") (result i32) i32.const 42))');
+testBoth(code, 'run', 42);
+testFailBoth(42, TypeError);
+testBoth(Promise.resolve(code), 'run', 42);
+testFailBoth(Promise.resolve(42), TypeError);
+testFailBoth(Promise.reject(new String("fail")), String);
+testBoth({then(resolve) { resolve(code) }}, 'run', 42);
+testFailBoth({then(resolve) { resolve(42) }}, TypeError);
+testFailBoth(new Promise((resolve, reject) => { reject(new Error("hi")) }), Error);
+testFailBoth(new Promise((resolve, reject) => { reject(new String("hi")) }), String);
+
+var code = wasmTextToBinary('(module (func $im (import "js" "foo") (result i32)) (func (export "run") (result i32) (i32.add (i32.const 1) (call $im))))');
+testInstantiate(code, {js:{foo() { return 42 }}}, 'run', 43);
+testFailInstantiate(code, null, TypeError);
+testFailInstantiate(code, {js:42}, TypeError);
+testFailInstantiate(code, {js:{foo:42}}, TypeError);
+
+var text = `(module\n`;
+text += ` (func (result i32) i32.const 0)\n`;
+for (var i = 1; i <= 100; i++)
+    text += ` (func (result i32) (i32.add (i32.const ${i}) (call ${i-1})))\n`;
+text += ` (func (export "run") (result i32) call 100)\n`;
+text += `)`;
+var code = wasmTextToBinary(text);
+
+assertEq(code.length > 1000, true);
+for ([delayMillis, chunkSize] of [[0, 10], [1, 10], [0, 100], [1, 100], [0, 1000], [1, 1000], [10, 1000]]) {
+    setBufferStreamParams(delayMillis, chunkSize);
+    testBoth(code, 'run', 5050);
+}
+
+setBufferStreamParams(1, 100);
+var arr = [];
+for (var i = 0; i < 10; i++)
+    arr.push(WebAssembly.instantiateStreaming(code));
+var results;
+Promise.all(arr).then(r => results = r);
+drainJobQueue();
+assertEq(results.length === 10, true);
+for (var i = 0; i < 10; i++)
+    assertEq(results[i].instance.exports.run(), 5050);
--- a/js/src/js.msg
+++ b/js/src/js.msg
@@ -387,16 +387,17 @@ MSG_DEF(JSMSG_WASM_BAD_MOD_ARG,        0
 MSG_DEF(JSMSG_WASM_BAD_BUF_MOD_ARG,    0, JSEXN_TYPEERR,     "first argument must be a WebAssembly.Module, ArrayBuffer or typed array object")
 MSG_DEF(JSMSG_WASM_BAD_DESC_ARG,       1, JSEXN_TYPEERR,     "first argument must be a {0} descriptor")
 MSG_DEF(JSMSG_WASM_BAD_ELEMENT,        0, JSEXN_TYPEERR,     "\"element\" property of table descriptor must be \"anyfunc\"")
 MSG_DEF(JSMSG_WASM_BAD_IMPORT_ARG,     0, JSEXN_TYPEERR,     "second argument must be an object")
 MSG_DEF(JSMSG_WASM_BAD_IMPORT_FIELD,   1, JSEXN_TYPEERR,     "import object field '{0}' is not an Object")
 MSG_DEF(JSMSG_WASM_BAD_TABLE_VALUE,    0, JSEXN_TYPEERR,     "can only assign WebAssembly exported functions to Table")
 MSG_DEF(JSMSG_WASM_BAD_I64,            0, JSEXN_TYPEERR,     "cannot pass i64 to or from JS")
 MSG_DEF(JSMSG_WASM_NO_TRANSFER,        0, JSEXN_TYPEERR,     "cannot transfer WebAssembly/asm.js ArrayBuffer")
+MSG_DEF(JSMSG_WASM_STREAM_ERROR,       0, JSEXN_TYPEERR,     "stream error during WebAssembly compilation")
 MSG_DEF(JSMSG_WASM_TEXT_FAIL,          1, JSEXN_SYNTAXERR,   "wasm text error: {0}")
 
 // Proxy
 MSG_DEF(JSMSG_BAD_TRAP_RETURN_VALUE,   2, JSEXN_TYPEERR,"trap {1} for {0} returned a primitive value")
 MSG_DEF(JSMSG_BAD_GETPROTOTYPEOF_TRAP_RETURN,0,JSEXN_TYPEERR,"proxy getPrototypeOf handler returned a non-object, non-null value")
 MSG_DEF(JSMSG_INCONSISTENT_GETPROTOTYPEOF_TRAP,0,JSEXN_TYPEERR,"proxy getPrototypeOf handler didn't return the target object's prototype")
 MSG_DEF(JSMSG_PROXY_SETPROTOTYPEOF_RETURNED_FALSE, 0, JSEXN_TYPEERR, "proxy setPrototypeOf handler returned false")
 MSG_DEF(JSMSG_PROXY_ISEXTENSIBLE_RETURNED_FALSE,0,JSEXN_TYPEERR,"proxy isExtensible handler must return the same extensibility as target")
@@ -627,8 +628,11 @@ MSG_DEF(JSMSG_READABLEBYTESTREAMCONTROLL
 MSG_DEF(JSMSG_READABLEBYTESTREAMCONTROLLER_BAD_CHUNK,    1, JSEXN_TYPEERR, "{0} passed a bad chunk.")
 MSG_DEF(JSMSG_READABLEBYTESTREAMCONTROLLER_CLOSE_PENDING_PULL, 0, JSEXN_TYPEERR, "The ReadableByteStreamController cannot be closed while the buffer is being filled.")
 MSG_DEF(JSMSG_READABLESTREAMBYOBREQUEST_NO_CONTROLLER,   1, JSEXN_TYPEERR, "ReadableStreamBYOBRequest method '{0}' called on a request with no controller.")
 MSG_DEF(JSMSG_READABLESTREAMBYOBREQUEST_RESPOND_CLOSED,  0, JSEXN_TYPEERR, "ReadableStreamBYOBRequest method 'respond' called with non-zero number of bytes with a closed controller.")
 MSG_DEF(JSMSG_READABLESTREAM_METHOD_NOT_IMPLEMENTED,     1, JSEXN_TYPEERR, "ReadableStream method {0} not yet implemented")
 
 // Other Stream-related
 MSG_DEF(JSMSG_STREAM_INVALID_HIGHWATERMARK,             0, JSEXN_RANGEERR, "'highWaterMark' must be a non-negative, non-NaN number.")
+
+// Response-related
+MSG_DEF(JSMSG_BAD_RESPONSE_VALUE,                        0, JSEXN_TYPEERR,  "expected Response or Promise resolving to Response")
--- a/js/src/jsapi.cpp
+++ b/js/src/jsapi.cpp
@@ -5699,16 +5699,22 @@ JS::InitDispatchToEventLoop(JSContext* c
 
 JS_PUBLIC_API(void)
 JS::ShutdownAsyncTasks(JSContext* cx)
 {
     cx->runtime()->offThreadPromiseState.ref().shutdown(cx);
 }
 
 JS_PUBLIC_API(void)
+JS::InitConsumeStreamCallback(JSContext* cx, ConsumeStreamCallback callback)
+{
+    cx->runtime()->consumeStreamCallback = callback;
+}
+
+JS_PUBLIC_API(void)
 JS_RequestInterruptCallback(JSContext* cx)
 {
     cx->requestInterrupt(JSContext::RequestInterruptUrgent);
 }
 
 JS_PUBLIC_API(void)
 JS_RequestInterruptCallbackCanWait(JSContext* cx)
 {
--- a/js/src/jsapi.h
+++ b/js/src/jsapi.h
@@ -4928,16 +4928,58 @@ class JS_PUBLIC_API(Dispatchable)
 
 typedef bool
 (*DispatchToEventLoopCallback)(void* closure, Dispatchable* dispatchable);
 
 extern JS_PUBLIC_API(void)
 InitDispatchToEventLoop(JSContext* cx, DispatchToEventLoopCallback callback, void* closure);
 
 /**
+ * The ConsumeStreamCallback is called from an active JSContext, passing a
+ * StreamConsumer that wishes to consume the given host object as a stream of
+ * bytes with the given MIME type. On failure, the embedding must report the
+ * appropriate error on 'cx'. On success, the embedding must call
+ * consumer->consumeChunk() repeatedly on any thread until exactly one of:
+ *  - consumeChunk() returns false
+ *  - the embedding calls consumer->streamClosed()
+ * before JS_DestroyContext(cx) or JS::ShutdownAsyncTasks(cx) is called.
+ *
+ * Note: consumeChunk() and streamClosed() may be called synchronously by
+ * ConsumeStreamCallback.
+ */
+
+class JS_PUBLIC_API(StreamConsumer)
+{
+  protected:
+    // AsyncStreamConsumers are created and destroyed by SpiderMonkey.
+    StreamConsumer() = default;
+    virtual ~StreamConsumer() = default;
+
+  public:
+    // Called by the embedding as each chunk of bytes becomes available.
+    // If this function returns 'false', the stream must drop all pointers to
+    // this StreamConsumer.
+    virtual bool consumeChunk(const uint8_t* begin, size_t length) = 0;
+
+    // Called by the embedding when the stream is closed according to the
+    // contract described above.
+    enum CloseReason { EndOfFile, Error };
+    virtual void streamClosed(CloseReason reason) = 0;
+};
+
+enum class MimeType { Wasm };
+
+typedef bool
+(*ConsumeStreamCallback)(JSContext* cx, JS::HandleObject obj, MimeType mimeType,
+                         StreamConsumer* consumer);
+
+extern JS_PUBLIC_API(void)
+InitConsumeStreamCallback(JSContext* cx, ConsumeStreamCallback callback);
+
+/**
  * When a JSRuntime is destroyed it implicitly cancels all async tasks in
  * progress, releasing any roots held by the task. However, this is not soon
  * enough for cycle collection, which needs to have roots dropped earlier so
  * that the cycle collector can transitively remove roots for a future GC. For
  * these and other cases, the set of pending async tasks can be canceled
  * with this call earlier than JSRuntime destruction.
  */
 
--- a/js/src/shell/js.cpp
+++ b/js/src/shell/js.cpp
@@ -13,16 +13,17 @@
 #include "mozilla/GuardObjects.h"
 #include "mozilla/IntegerPrintfMacros.h"
 #include "mozilla/mozalloc.h"
 #include "mozilla/PodOperations.h"
 #include "mozilla/ScopeExit.h"
 #include "mozilla/Sprintf.h"
 #include "mozilla/TimeStamp.h"
 
+#include <chrono>
 #ifdef XP_WIN
 # include <direct.h>
 # include <process.h>
 #endif
 #include <errno.h>
 #include <fcntl.h>
 #if defined(XP_WIN)
 # include <io.h>     /* for isatty() */
@@ -33,16 +34,17 @@
 #endif
 #include <math.h>
 #include <signal.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <sys/stat.h>
 #include <sys/types.h>
+#include <thread>
 #ifdef XP_UNIX
 # include <sys/mman.h>
 # include <sys/stat.h>
 # include <sys/wait.h>
 # include <unistd.h>
 #endif
 
 #include "jsapi.h"
@@ -5562,16 +5564,159 @@ SetSharedArrayBuffer(JSContext* cx, unsi
             oldBuffer->dropReference();
         sharedArrayBufferMailbox = newBuffer;
     }
 
     args.rval().setUndefined();
     return true;
 }
 
+struct BufferStreamJob
+{
+    Vector<uint8_t, 0, SystemAllocPolicy> bytes;
+    Thread thread;
+    JS::StreamConsumer* consumer;
+
+    explicit BufferStreamJob(JS::StreamConsumer* consumer)
+      : consumer(consumer)
+    {}
+};
+
+struct BufferStreamState
+{
+    Vector<UniquePtr<BufferStreamJob>, 0, SystemAllocPolicy> jobs;
+    ConditionVariable jobsEmpty;
+    size_t delayMillis;
+    size_t chunkSize;
+    bool shutdown;
+
+    BufferStreamState()
+      : delayMillis(1),
+        chunkSize(10),
+        shutdown(false)
+    {}
+
+    ~BufferStreamState()
+    {
+        MOZ_ASSERT(shutdown);
+        MOZ_ASSERT(jobs.empty());
+    }
+};
+
+ExclusiveData<BufferStreamState> bufferStreamState(mutexid::BufferStreamState);
+
+static void
+BufferStreamMain(BufferStreamJob* job)
+{
+    const uint8_t* const bytes = job->bytes.begin();
+
+    size_t byteOffset = 0;
+    while (true) {
+        if (byteOffset == job->bytes.length()) {
+            job->consumer->streamClosed(JS::StreamConsumer::EndOfFile);
+            break;
+        }
+
+        bool shutdown;
+        size_t delayMillis;
+        size_t chunkSize;
+        {
+            auto state = bufferStreamState.lock();
+            shutdown = state->shutdown;
+            delayMillis = state->delayMillis;
+            chunkSize = state->chunkSize;
+        }
+
+        if (shutdown) {
+            job->consumer->streamClosed(JS::StreamConsumer::Error);
+            break;
+        }
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(delayMillis));
+
+        chunkSize = Min(chunkSize, job->bytes.length() - byteOffset);
+
+        if (!job->consumer->consumeChunk(bytes + byteOffset, chunkSize))
+            break;
+
+        byteOffset += chunkSize;
+    }
+
+    auto state = bufferStreamState.lock();
+    size_t jobIndex = 0;
+    while (state->jobs[jobIndex].get() != job)
+        jobIndex++;
+    job->thread.detach();  // quiet assert in ~Thread() called by erase().
+    state->jobs.erase(state->jobs.begin() + jobIndex);
+    if (state->jobs.empty())
+        state->jobsEmpty.notify_all();
+}
+
+static bool
+ConsumeBufferSource(JSContext* cx, JS::HandleObject obj, JS::MimeType, JS::StreamConsumer* consumer)
+{
+    SharedMem<uint8_t*> dataPointer;
+    size_t byteLength;
+    if (!IsBufferSource(obj, &dataPointer, &byteLength)) {
+        JS_ReportErrorASCII(cx, "shell streaming consumes a buffer source (buffer or view)");
+        return false;
+    }
+
+    auto job = cx->make_unique<BufferStreamJob>(consumer);
+    if (!job || !job->bytes.resize(byteLength))
+        return false;
+
+    memcpy(job->bytes.begin(), dataPointer.unwrap(), byteLength);
+
+    BufferStreamJob* jobPtr = job.get();
+
+    {
+        auto state = bufferStreamState.lock();
+        MOZ_ASSERT(!state->shutdown);
+        if (!state->jobs.append(Move(job)))
+            return false;
+    }
+
+    return jobPtr->thread.init(BufferStreamMain, jobPtr);
+}
+
+static void
+ShutdownBufferStreams()
+{
+    auto state = bufferStreamState.lock();
+    state->shutdown = true;
+    while (!state->jobs.empty())
+        state.wait(state->jobsEmpty);
+}
+
+static bool
+SetBufferStreamParams(JSContext* cx, unsigned argc, Value* vp)
+{
+    CallArgs args = CallArgsFromVp(argc, vp);
+    if (!args.requireAtLeast(cx, "setBufferStreamParams", 2))
+        return false;
+
+    double delayMillis;
+    if (!ToNumber(cx, args[0], &delayMillis))
+        return false;
+
+    double chunkSize;
+    if (!ToNumber(cx, args[1], &chunkSize))
+        return false;
+
+    {
+        auto state = bufferStreamState.lock();
+        state->delayMillis = delayMillis;
+        state->chunkSize = chunkSize;
+    }
+
+    args.rval().setUndefined();
+    return true;
+}
+
 class SprintOptimizationTypeInfoOp : public JS::ForEachTrackedOptimizationTypeInfoOp
 {
     Sprinter* sp;
     bool startedTypes_;
     bool hadError_;
 
   public:
     explicit SprintOptimizationTypeInfoOp(Sprinter* sp)
@@ -6865,16 +7010,21 @@ TestAssertRecoveredOnBailout,
 "  On non-ARM, no-op. On ARM, set the hardware capabilities. The list of \n"
 "  flags is available by calling this function with \"help\" as the flag's name"),
 
     JS_FN_HELP("wasmLoop", WasmLoop, 2, 0,
 "wasmLoop(filename, imports)",
 "  Performs an AFL-style persistent loop reading data from the given file and passing it\n"
 "  to the 'wasmEval' function together with the specified imports object."),
 
+    JS_FN_HELP("setBufferStreamParams", SetBufferStreamParams, 2, 0,
+"setBufferStreamParams(delayMillis, chunkByteSize)",
+"  Set the delay time (between calls to StreamConsumer::consumeChunk) and chunk\n"
+"  size (in bytes)."),
+
     JS_FS_HELP_END
 };
 
 static const JSFunctionSpecWithHelp console_functions[] = {
     JS_FN_HELP("log", Print, 0, 0,
 "log([exp ...])",
 "  Evaluate and print expressions to stdout.\n"
 "  This function is an alias of the print() function."),
@@ -8696,16 +8846,17 @@ main(int argc, char** argv, char** envp)
 
     JS_SetTrustedPrincipals(cx, &ShellPrincipals::fullyTrusted);
     JS_SetSecurityCallbacks(cx, &ShellPrincipals::securityCallbacks);
     JS_InitDestroyPrincipalsCallback(cx, ShellPrincipals::destroy);
 
     JS_AddInterruptCallback(cx, ShellInterruptCallback);
     JS::SetBuildIdOp(cx, ShellBuildId);
     JS::SetAsmJSCacheOps(cx, &asmJSCacheOps);
+    JS::InitConsumeStreamCallback(cx, ConsumeBufferSource);
 
     JS_SetNativeStackQuota(cx, gMaxStackSize);
 
     JS::dbg::SetDebuggerMallocSizeOf(cx, moz_malloc_size_of);
 
     js::UseInternalJobQueues(cx);
 
     if (!JS::InitSelfHostedCode(cx))
@@ -8747,14 +8898,15 @@ main(int argc, char** argv, char** envp)
 
     // Must clear out some of sc's pointer containers before JS_DestroyContext.
     sc->markObservers.reset();
 
     KillWatchdog(cx);
 
     KillWorkerThreads(cx);
 
+    ShutdownBufferStreams();
     DestructSharedArrayBufferMailbox();
 
     JS_DestroyContext(cx);
     JS_ShutDown();
     return result;
 }
--- a/js/src/vm/MutexIDs.h
+++ b/js/src/vm/MutexIDs.h
@@ -36,18 +36,19 @@
   _(Arm64SimulatorLock,          500) \
   _(IonSpewer,                   500) \
   _(PerfSpewer,                  500) \
   _(CacheIRSpewer,               500) \
   _(TraceLoggerThreadState,      500) \
   _(DateTimeInfoMutex,           500) \
   _(IcuTimeZoneStateMutex,       500) \
   _(ProcessExecutableRegion,     500) \
+  _(OffThreadPromiseState,       500) \
+  _(BufferStreamState,           500) \
   _(WasmCodeProfilingLabels,     500) \
-  _(OffThreadPromiseState,       500) \
   _(WasmModuleTieringLock,       500) \
   _(WasmCompileTaskState,        500) \
                                       \
   _(TraceLoggerGraphState,       600) \
   _(VTuneLock,                   600)
 
 namespace js {
 namespace mutexid {
--- a/js/src/vm/Runtime.cpp
+++ b/js/src/vm/Runtime.cpp
@@ -100,16 +100,17 @@ JSRuntime::JSRuntime(JSRuntime* parentRu
     activeContextChangeProhibited_(0),
     singleThreadedExecutionRequired_(0),
     startingSingleThreadedExecution_(false),
     beginSingleThreadedExecutionCallback(nullptr),
     endSingleThreadedExecutionCallback(nullptr),
     profilerSampleBufferGen_(0),
     profilerSampleBufferLapCount_(1),
     telemetryCallback(nullptr),
+    consumeStreamCallback(nullptr),
     readableStreamDataRequestCallback(nullptr),
     readableStreamWriteIntoReadRequestCallback(nullptr),
     readableStreamCancelCallback(nullptr),
     readableStreamClosedCallback(nullptr),
     readableStreamErroredCallback(nullptr),
     readableStreamFinalizeCallback(nullptr),
     hadOutOfMemory(false),
     allowRelazificationForTesting(false),
--- a/js/src/vm/Runtime.h
+++ b/js/src/vm/Runtime.h
@@ -468,16 +468,17 @@ struct JSRuntime : public js::MallocProv
     // absence of usage of a feature on a specific web page and document which
     // the passed JSObject belongs to.
     void setUseCounter(JSObject* obj, JSUseCounter counter);
 
     void setUseCounterCallback(JSRuntime* rt, JSSetUseCounterCallback callback);
 
   public:
     js::UnprotectedData<js::OffThreadPromiseRuntimeState> offThreadPromiseState;
+    js::UnprotectedData<JS::ConsumeStreamCallback> consumeStreamCallback;
 
     JSObject* getIncumbentGlobal(JSContext* cx);
     bool enqueuePromiseJob(JSContext* cx, js::HandleFunction job, js::HandleObject promise,
                            js::HandleObject incumbentGlobal);
     void addUnhandledRejectedPromise(JSContext* cx, js::HandleObject promise);
     void removeUnhandledRejectedPromise(JSContext* cx, js::HandleObject promise);
 
     js::UnprotectedData<JS::RequestReadableStreamDataCallback> readableStreamDataRequestCallback;
--- a/js/src/vm/TypedArrayObject.cpp
+++ b/js/src/vm/TypedArrayObject.cpp
@@ -2134,16 +2134,49 @@ js::IsTypedArrayConstructor(HandleValue 
       case Scalar::Uint8Clamped:
         return IsNativeFunction(v, Uint8ClampedArray::class_constructor);
       case Scalar::MaxTypedArrayViewType:
         break;
     }
     MOZ_CRASH("unexpected typed array type");
 }
 
+bool
+js::IsBufferSource(JSObject* object, SharedMem<uint8_t*>* dataPointer, size_t* byteLength)
+{
+    if (object->is<TypedArrayObject>()) {
+        TypedArrayObject& view = object->as<TypedArrayObject>();
+        *dataPointer = view.viewDataEither().cast<uint8_t*>();
+        *byteLength = view.byteLength();
+        return true;
+    }
+
+    if (object->is<DataViewObject>()) {
+        DataViewObject& view = object->as<DataViewObject>();
+        *dataPointer = view.dataPointerEither().cast<uint8_t*>();
+        *byteLength = view.byteLength();
+    }
+
+    if (object->is<ArrayBufferObject>()) {
+        ArrayBufferObject& buffer = object->as<ArrayBufferObject>();
+        *dataPointer = buffer.dataPointerShared();
+        *byteLength = buffer.byteLength();
+        return true;
+    }
+
+    if (object->is<SharedArrayBufferObject>()) {
+        SharedArrayBufferObject& buffer = object->as<SharedArrayBufferObject>();
+        *dataPointer = buffer.dataPointerShared();
+        *byteLength = buffer.byteLength();
+        return true;
+    }
+
+    return false;
+}
+
 template <typename CharT>
 bool
 js::StringIsTypedArrayIndex(const CharT* s, size_t length, uint64_t* indexp)
 {
     const CharT* end = s + length;
 
     if (s == end)
         return false;
--- a/js/src/vm/TypedArrayObject.h
+++ b/js/src/vm/TypedArrayObject.h
@@ -321,16 +321,21 @@ IsTypedArrayClass(const Class* clasp)
 {
     return &TypedArrayObject::classes[0] <= clasp &&
            clasp < &TypedArrayObject::classes[Scalar::MaxTypedArrayViewType];
 }
 
 bool
 IsTypedArrayConstructor(HandleValue v, uint32_t type);
 
+// In WebIDL terminology, a BufferSource is either an ArrayBuffer or a typed
+// array view. In either case, extract the dataPointer/byteLength.
+bool
+IsBufferSource(JSObject* object, SharedMem<uint8_t*>* dataPointer, size_t* byteLength);
+
 inline Scalar::Type
 TypedArrayObject::type() const
 {
     MOZ_ASSERT(IsTypedArrayClass(getClass()));
     return static_cast<Scalar::Type>(getClass() - &classes[0]);
 }
 
 inline size_t
--- a/js/src/wasm/WasmJS.cpp
+++ b/js/src/wasm/WasmJS.cpp
@@ -827,32 +827,24 @@ static bool
 GetBufferSource(JSContext* cx, JSObject* obj, unsigned errorNumber, MutableBytes* bytecode)
 {
     *bytecode = cx->new_<ShareableBytes>();
     if (!*bytecode)
         return false;
 
     JSObject* unwrapped = CheckedUnwrap(obj);
 
-    size_t byteLength = 0;
-    uint8_t* ptr = nullptr;
-    if (unwrapped && unwrapped->is<TypedArrayObject>()) {
-        TypedArrayObject& view = unwrapped->as<TypedArrayObject>();
-        byteLength = view.byteLength();
-        ptr = (uint8_t*)view.viewDataEither().unwrap();
-    } else if (unwrapped && unwrapped->is<ArrayBufferObject>()) {
-        ArrayBufferObject& buffer = unwrapped->as<ArrayBufferObject>();
-        byteLength = buffer.byteLength();
-        ptr = buffer.dataPointer();
-    } else {
+    SharedMem<uint8_t*> dataPointer;
+    size_t byteLength;
+    if (!unwrapped || !IsBufferSource(unwrapped, &dataPointer, &byteLength)) {
         JS_ReportErrorNumberASCII(cx, GetErrorMessage, nullptr, errorNumber);
         return false;
     }
 
-    if (!(*bytecode)->append(ptr, byteLength)) {
+    if (!(*bytecode)->append(dataPointer.unwrap(), byteLength)) {
         ReportOutOfMemory(cx);
         return false;
     }
 
     return true;
 }
 
 static SharedCompileArgs
@@ -2125,24 +2117,306 @@ WebAssembly_validate(JSContext* cx, unsi
         ReportOutOfMemory(cx);
         return false;
     }
 
     callArgs.rval().setBoolean(validated);
     return true;
 }
 
+static bool
+EnsureStreamSupport(JSContext* cx)
+{
+    if (!EnsurePromiseSupport(cx))
+        return false;
+
+    if (!cx->runtime()->consumeStreamCallback) {
+        JS_ReportErrorASCII(cx, "WebAssembly.compileStreaming not supported in this runtime.");
+        return false;
+    }
+
+    return true;
+}
+
+static bool
+RejectWithErrorNumber(JSContext* cx, uint32_t errorNumber, Handle<PromiseObject*> promise)
+{
+    JS_ReportErrorNumberASCII(cx, GetErrorMessage, nullptr, errorNumber);
+    return RejectWithPendingException(cx, promise);
+}
+
+struct CompileStreamTask : OffThreadPromiseTask, JS::StreamConsumer
+{
+    SharedCompileArgs      compileArgs;
+    MutableBytes           streamBuffer;
+    UniqueChars            compileError;
+    Maybe<uint32_t>        streamError;
+    SharedModule           module;
+    bool                   instantiate;
+    PersistentRootedObject importObj;
+
+    CompileStreamTask(JSContext* cx, Handle<PromiseObject*> promise,
+                      const CompileArgs& compileArgs, bool instantiate,
+                      HandleObject importObj)
+      : OffThreadPromiseTask(cx, promise),
+        compileArgs(&compileArgs),
+        instantiate(instantiate),
+        importObj(cx, importObj)
+    {
+        MOZ_ASSERT_IF(importObj, instantiate);
+    }
+
+    bool init(JSContext* cx) {
+        streamBuffer = cx->new_<ShareableBytes>();
+        if (!streamBuffer)
+            return false;
+        return OffThreadPromiseTask::init(cx);
+    }
+
+    bool consumeChunk(const uint8_t* begin, size_t length) override {
+        if (!streamBuffer->append(begin, length)) {
+            streamError = Some(JSMSG_OUT_OF_MEMORY);
+            dispatchResolveAndDestroy();
+            return false;
+        }
+        return true;
+    }
+
+    void streamClosed(JS::StreamConsumer::CloseReason closeReason) override {
+        switch (closeReason) {
+          case JS::StreamConsumer::EndOfFile:
+            module = CompileInitialTier(*streamBuffer, *compileArgs, &compileError);
+            break;
+          case JS::StreamConsumer::Error:
+            streamError = Some(JSMSG_WASM_STREAM_ERROR);
+            break;
+        }
+        dispatchResolveAndDestroy();
+    }
+
+    bool resolve(JSContext* cx, Handle<PromiseObject*> promise) override {
+        MOZ_ASSERT_IF(module, !streamError && !compileError);
+        MOZ_ASSERT_IF(!module, !!streamError ^ !!compileError);
+        return module
+               ? Resolve(cx, *module, *compileArgs, promise, instantiate, importObj)
+               : streamError
+                 ? RejectWithErrorNumber(cx, *streamError, promise)
+                 : Reject(cx, *compileArgs, Move(compileError), promise);
+    }
+};
+
+// A short-lived object that captures the arguments of a
+// WebAssembly.{compileStreaming,instantiateStreaming} while waiting for
+// the Promise<Response> to resolve to a (hopefully) Promise.
+class ResolveResponseClosure : public NativeObject
+{
+    static const unsigned COMPILE_ARGS_SLOT = 0;
+    static const unsigned PROMISE_OBJ_SLOT = 1;
+    static const unsigned INSTANTIATE_SLOT = 2;
+    static const unsigned IMPORT_OBJ_SLOT = 3;
+    static const ClassOps classOps_;
+
+    static void finalize(FreeOp* fop, JSObject* obj) {
+        obj->as<ResolveResponseClosure>().compileArgs().Release();
+    }
+
+  public:
+    static const unsigned RESERVED_SLOTS = 4;
+    static const Class class_;
+
+    static ResolveResponseClosure* create(JSContext* cx, const CompileArgs& args,
+                                          HandleObject promise, bool instantiate,
+                                          HandleObject importObj)
+    {
+        MOZ_ASSERT_IF(importObj, instantiate);
+
+        AutoSetNewObjectMetadata metadata(cx);
+        auto* obj = NewObjectWithGivenProto<ResolveResponseClosure>(cx, nullptr);
+        if (!obj)
+            return nullptr;
+
+        args.AddRef();
+        obj->setReservedSlot(COMPILE_ARGS_SLOT, PrivateValue(const_cast<CompileArgs*>(&args)));
+        obj->setReservedSlot(PROMISE_OBJ_SLOT, ObjectValue(*promise));
+        obj->setReservedSlot(INSTANTIATE_SLOT, BooleanValue(instantiate));
+        obj->setReservedSlot(IMPORT_OBJ_SLOT, ObjectOrNullValue(importObj));
+        return obj;
+    }
+
+    const CompileArgs& compileArgs() const {
+        return *(const CompileArgs*)getReservedSlot(COMPILE_ARGS_SLOT).toPrivate();
+    }
+    PromiseObject& promise() const {
+        return getReservedSlot(PROMISE_OBJ_SLOT).toObject().as<PromiseObject>();
+    }
+    bool instantiate() const {
+        return getReservedSlot(INSTANTIATE_SLOT).toBoolean();
+    }
+    JSObject* importObj() const {
+        return getReservedSlot(IMPORT_OBJ_SLOT).toObjectOrNull();
+    }
+};
+
+const ClassOps ResolveResponseClosure::classOps_ =
+{
+    nullptr, /* addProperty */
+    nullptr, /* delProperty */
+    nullptr, /* enumerate */
+    nullptr, /* newEnumerate */
+    nullptr, /* resolve */
+    nullptr, /* mayResolve */
+    ResolveResponseClosure::finalize
+};
+
+const Class ResolveResponseClosure::class_ =
+{
+    "WebAssembly ResolveResponseClosure",
+    JSCLASS_DELAY_METADATA_BUILDER |
+    JSCLASS_HAS_RESERVED_SLOTS(ResolveResponseClosure::RESERVED_SLOTS) |
+    JSCLASS_FOREGROUND_FINALIZE,
+    &ResolveResponseClosure::classOps_,
+};
+
+static ResolveResponseClosure*
+ToResolveResponseClosure(CallArgs args)
+{
+    return &args.callee().as<JSFunction>().getExtendedSlot(0).toObject().as<ResolveResponseClosure>();
+}
+
+static bool
+ResolveResponse_OnFulfilled(JSContext* cx, unsigned argc, Value* vp)
+{
+    CallArgs callArgs = CallArgsFromVp(argc, vp);
+
+    Rooted<ResolveResponseClosure*> closure(cx, ToResolveResponseClosure(callArgs));
+    Rooted<PromiseObject*> promise(cx, &closure->promise());
+    const CompileArgs& compileArgs = closure->compileArgs();
+    bool instantiate = closure->instantiate();
+    Rooted<JSObject*> importObj(cx, closure->importObj());
+
+    auto task = cx->make_unique<CompileStreamTask>(cx, promise, compileArgs, instantiate, importObj);
+    if (!task || !task->init(cx))
+        return false;
+
+    if (!callArgs.get(0).isObject())
+        return RejectWithErrorNumber(cx, JSMSG_BAD_RESPONSE_VALUE, promise);
+
+    RootedObject response(cx, &callArgs.get(0).toObject());
+    if (!cx->runtime()->consumeStreamCallback(cx, response, JS::MimeType::Wasm, task.get()))
+        return RejectWithPendingException(cx, promise);
+
+    Unused << task.release();
+
+    callArgs.rval().setUndefined();
+    return true;
+}
+
+static bool
+ResolveResponse_OnRejected(JSContext* cx, unsigned argc, Value* vp)
+{
+    CallArgs args = CallArgsFromVp(argc, vp);
+
+    Rooted<ResolveResponseClosure*> closure(cx, ToResolveResponseClosure(args));
+    Rooted<PromiseObject*> promise(cx, &closure->promise());
+
+    if (!PromiseObject::reject(cx, promise, args.get(0)))
+        return false;
+
+    args.rval().setUndefined();
+    return true;
+}
+
+static bool
+ResolveResponse(JSContext* cx, CallArgs callArgs, Handle<PromiseObject*> promise,
+                bool instantiate = false, HandleObject importObj = nullptr)
+{
+    MOZ_ASSERT_IF(importObj, instantiate);
+
+    SharedCompileArgs compileArgs = InitCompileArgs(cx);
+    if (!compileArgs)
+        return false;
+
+    RootedObject closure(cx, ResolveResponseClosure::create(cx, *compileArgs, promise,
+                                                            instantiate, importObj));
+    if (!closure)
+        return false;
+
+    RootedFunction onResolved(cx, NewNativeFunction(cx, ResolveResponse_OnFulfilled, 1, nullptr,
+                                                    gc::AllocKind::FUNCTION_EXTENDED));
+    if (!onResolved)
+        return false;
+
+    RootedFunction onRejected(cx, NewNativeFunction(cx, ResolveResponse_OnRejected, 1, nullptr,
+                                                    gc::AllocKind::FUNCTION_EXTENDED));
+    if (!onResolved)
+        return false;
+
+    onResolved->setExtendedSlot(0, ObjectValue(*closure));
+    onRejected->setExtendedSlot(0, ObjectValue(*closure));
+
+    RootedObject resolve(cx, PromiseObject::unforgeableResolve(cx, callArgs.get(0)));
+    if (!resolve)
+        return false;
+
+    return JS::AddPromiseReactions(cx, resolve, onResolved, onRejected);
+}
+
+static bool
+WebAssembly_compileStreaming(JSContext* cx, unsigned argc, Value* vp)
+{
+    if (!EnsureStreamSupport(cx))
+        return false;
+
+    Rooted<PromiseObject*> promise(cx, PromiseObject::createSkippingExecutor(cx));
+    if (!promise)
+        return false;
+
+    CallArgs callArgs = CallArgsFromVp(argc, vp);
+
+    if (!ResolveResponse(cx, callArgs, promise))
+        return RejectWithPendingException(cx, promise, callArgs);
+
+    callArgs.rval().setObject(*promise);
+    return true;
+}
+
+static bool
+WebAssembly_instantiateStreaming(JSContext* cx, unsigned argc, Value* vp)
+{
+    if (!EnsureStreamSupport(cx))
+        return false;
+
+    Rooted<PromiseObject*> promise(cx, PromiseObject::createSkippingExecutor(cx));
+    if (!promise)
+        return false;
+
+    CallArgs callArgs = CallArgsFromVp(argc, vp);
+
+    RootedObject firstArg(cx);
+    RootedObject importObj(cx);
+    if (!GetInstantiateArgs(cx, callArgs, &firstArg, &importObj))
+        return RejectWithPendingException(cx, promise, callArgs);
+
+    if (!ResolveResponse(cx, callArgs, promise, true, importObj))
+        return RejectWithPendingException(cx, promise, callArgs);
+
+    callArgs.rval().setObject(*promise);
+    return true;
+}
+
 static const JSFunctionSpec WebAssembly_static_methods[] =
 {
 #if JS_HAS_TOSOURCE
     JS_FN(js_toSource_str, WebAssembly_toSource, 0, 0),
 #endif
     JS_FN("compile", WebAssembly_compile, 1, 0),
     JS_FN("instantiate", WebAssembly_instantiate, 1, 0),
     JS_FN("validate", WebAssembly_validate, 1, 0),
+    JS_FN("compileStreaming", WebAssembly_compileStreaming, 1, 0),
+    JS_FN("instantiateStreaming", WebAssembly_instantiateStreaming, 1, 0),
     JS_FS_END
 };
 
 const Class js::WebAssemblyClass =
 {
     js_WebAssembly_str,
     JSCLASS_HAS_CACHED_PROTO(JSProto_WebAssembly)
 };