Bug 1582348 - Fill out the rest of |WritableStreamDefaultControllerAdvanceQueueIfNeeded| and implement |WritableStreamDefaultControllerProcess{Close,Write}| as required by it. r=arai
authorJeff Walden <jwalden@mit.edu>
Tue, 05 Nov 2019 05:35:30 +0000
changeset 500531 61afdffa6c61f15c997c560fb966716c0f4c00ed
parent 500530 0695bdca115cc0ac2596e5edf90669d7c95be168
child 500532 9515a29ca7471d96cdac1dbb28e2ddd88468510b
push id114166
push userapavel@mozilla.com
push dateThu, 07 Nov 2019 10:04:01 +0000
treeherdermozilla-inbound@d271c572a9bc [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersarai
bugs1582348
milestone72.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1582348 - Fill out the rest of |WritableStreamDefaultControllerAdvanceQueueIfNeeded| and implement |WritableStreamDefaultControllerProcess{Close,Write}| as required by it. r=arai Differential Revision: https://phabricator.services.mozilla.com/D50100
js/src/builtin/streams/QueueWithSizes-inl.h
js/src/builtin/streams/QueueWithSizes.h
js/src/builtin/streams/WritableStreamDefaultControllerOperations.cpp
js/src/builtin/streams/WritableStreamDefaultControllerOperations.h
--- a/js/src/builtin/streams/QueueWithSizes-inl.h
+++ b/js/src/builtin/streams/QueueWithSizes-inl.h
@@ -60,11 +60,18 @@ inline void QueueRemoveFirstValueAndSize
 inline MOZ_MUST_USE bool QueueAppendValueAndSize(
     JSContext* cx, JS::Handle<ListObject*> unwrappedQueue,
     JS::Handle<JS::Value> value, double size) {
   return unwrappedQueue->appendValueAndSize(cx, value, size);
 }
 
 }  // namespace detail
 
+/**
+ * Streams spec, 6.2.3. PeekQueueValue ( container ) nothrow
+ */
+inline JS::Value PeekQueueValue(ListObject* queue) {
+  return detail::QueueFirstValue(queue);
+}
+
 }  // namespace js
 
 #endif  // builtin_streams_QueueWithSizes_inl_h
--- a/js/src/builtin/streams/QueueWithSizes.h
+++ b/js/src/builtin/streams/QueueWithSizes.h
@@ -44,11 +44,22 @@ extern MOZ_MUST_USE bool EnqueueValueWit
     JS::Handle<JS::Value> value, JS::Handle<JS::Value> sizeVal);
 
 /**
  * Streams spec, 6.2.4. ResetQueue ( container ) nothrow
  */
 extern MOZ_MUST_USE bool ResetQueue(
     JSContext* cx, JS::Handle<StreamController*> unwrappedContainer);
 
+inline bool QueueIsEmpty(ListObject* unwrappedQueue) {
+  if (unwrappedQueue->isEmpty()) {
+    return true;
+  }
+
+  MOZ_ASSERT((unwrappedQueue->length() % 2) == 0,
+             "queue-with-sizes must consist of (value, size) element pairs and "
+             "so must have even length");
+  return false;
+}
+
 }  // namespace js
 
 #endif  // builtin_streams_QueueWithSizes_h
--- a/js/src/builtin/streams/WritableStreamDefaultControllerOperations.cpp
+++ b/js/src/builtin/streams/WritableStreamDefaultControllerOperations.cpp
@@ -10,50 +10,59 @@
 
 #include "mozilla/Assertions.h"  // MOZ_ASSERT
 #include "mozilla/Attributes.h"  // MOZ_MUST_USE
 
 #include "jsapi.h"  // JS_ReportErrorASCII
 
 #include "builtin/Promise.h"  // js::PromiseObject
 #include "builtin/streams/MiscellaneousOperations.h"  // js::CreateAlgorithmFromUnderlyingMethod, js::InvokeOrNoop
-#include "builtin/streams/QueueWithSizes.h"  // js::{EnqueueValueWithSize,ResetQueue}
+#include "builtin/streams/QueueWithSizes.h"  // js::{EnqueueValueWithSize,QueueIsEmpty,ResetQueue}
 #include "builtin/streams/WritableStream.h"  // js::WritableStream
 #include "builtin/streams/WritableStreamDefaultController.h"  // js::WritableStreamDefaultController
-#include "builtin/streams/WritableStreamOperations.h"  // js::WritableStream{DealWithRejection,{Start,Finish}Erroring,UpdateBackpressure}
+#include "builtin/streams/WritableStreamOperations.h"  // js::WritableStream{CloseQueuedOrInFlight,DealWithRejection,{Start,Finish}Erroring,UpdateBackpressure,Mark{Close,FirstWrite}RequestInFlight}
 #include "js/CallArgs.h"    // JS::CallArgs{,FromVp}
+#include "js/Promise.h"     // JS::AddPromiseReactions
 #include "js/RootingAPI.h"  // JS::Handle, JS::Rooted
 #include "js/Value.h"  // JS::{,Int32,Magic,Object}Value, JS::UndefinedHandleValue, JS_WRITABLESTREAM_CLOSE_RECORD
 #include "vm/Compartment.h"  // JS::Compartment
 #include "vm/JSContext.h"   // JSContext
 #include "vm/JSObject.h"    // JSObject
 #include "vm/List.h"        // js::ListObject
 #include "vm/Runtime.h"     // JSAtomState
 
 #include "builtin/streams/HandlerFunction-inl.h"  // js::TargetFromHandler
 #include "builtin/streams/MiscellaneousOperations-inl.h"  // js::PromiseCall
+#include "builtin/streams/QueueWithSizes-inl.h"           // js::PeekQueueValue
 #include "vm/Compartment-inl.h"                   // JS::Compartment::wrap
 #include "vm/JSContext-inl.h"                     // JSContext::check
 #include "vm/JSObject-inl.h"  // js::NewBuiltinClassInstance, js::NewObjectWithClassProto
 #include "vm/Realm-inl.h"     // js::AutoRealm
 
 using JS::CallArgs;
 using JS::CallArgsFromVp;
 using JS::Handle;
 using JS::Int32Value;
 using JS::MagicValue;
 using JS::ObjectValue;
 using JS::Rooted;
 using JS::UndefinedHandleValue;
 using JS::Value;
 
 using js::ListObject;
+using js::NewHandler;
+using js::PeekQueueValue;
+using js::TargetFromHandler;
 using js::WritableStream;
+using js::WritableStreamCloseQueuedOrInFlight;
 using js::WritableStreamDefaultController;
 using js::WritableStreamFinishErroring;
+using js::WritableStreamMarkCloseRequestInFlight;
+using js::WritableStreamMarkFirstWriteRequestInFlight;
+using js::WritableStreamUpdateBackpressure;
 
 /*** 4.7. Writable stream default controller internal methods ***************/
 
 /**
  * Streams spec, 4.7.5.1.
  *      [[AbortSteps]]( reason )
  */
 JSObject* js::WritableStreamControllerAbortSteps(
@@ -515,16 +524,20 @@ bool js::WritableStreamDefaultController
  * Streams spec, 4.8.7.
  *      WritableStreamDefaultControllerGetDesiredSize ( controller )
  */
 double js::WritableStreamDefaultControllerGetDesiredSize(
     const WritableStreamDefaultController* controller) {
   return controller->strategyHWM() - controller->queueTotalSize();
 }
 
+static MOZ_MUST_USE bool WritableStreamDefaultControllerProcessIfNeeded(
+    JSContext* cx,
+    Handle<WritableStreamDefaultController*> unwrappedController);
+
 /**
  * Streams spec, 4.8.9.
  *      WritableStreamDefaultControllerAdvanceQueueIfNeeded ( controller )
  */
 MOZ_MUST_USE bool WritableStreamDefaultControllerAdvanceQueueIfNeeded(
     JSContext* cx,
     Handle<WritableStreamDefaultController*> unwrappedController) {
   // Step 2: If controller.[[started]] is false, return.
@@ -547,31 +560,24 @@ MOZ_MUST_USE bool WritableStreamDefaultC
   MOZ_ASSERT(!unwrappedStream->errored());
   if (unwrappedStream->erroring()) {
     // Step 6a: Perform ! WritableStreamFinishErroring(stream).
     // Step 6b: Return.
     return WritableStreamFinishErroring(cx, unwrappedStream);
   }
 
   // Step 7: If controller.[[queue]] is empty, return.
-  Rooted<ListObject*> unwrappedQueue(cx, unwrappedController->queue());
-  MOZ_ASSERT((unwrappedQueue->length() % 2) == 0);
-  if (unwrappedQueue->isEmpty()) {
-    return true;
-  }
-
   // Step 8: Let writeRecord be ! PeekQueueValue(controller).
   // Step 9: If writeRecord is "close", perform
   //         ! WritableStreamDefaultControllerProcessClose(controller).
   // Step 10: Otherwise, perform
   //          ! WritableStreamDefaultControllerProcessWrite(
   //              controller, writeRecord.[[chunk]]).
-  // XXX jwalden fill me in!
-  JS_ReportErrorASCII(cx, "nope");
-  return false;
+  return WritableStreamDefaultControllerProcessIfNeeded(cx,
+                                                        unwrappedController);
 }
 
 /**
  * Streams spec, 4.8.10.
  *      WritableStreamDefaultControllerErrorIfNeeded ( controller, error )
  */
 bool js::WritableStreamDefaultControllerErrorIfNeeded(
     JSContext* cx, Handle<WritableStreamDefaultController*> unwrappedController,
@@ -584,16 +590,249 @@ bool js::WritableStreamDefaultController
     if (!WritableStreamDefaultControllerError(cx, unwrappedController, error)) {
       return false;
     }
   }
 
   return true;
 }
 
+static MOZ_MUST_USE JSObject* PerformCloseAlgorithm(
+    JSContext* cx,
+    Handle<WritableStreamDefaultController*> unwrappedController) {
+  // XXX jwalden fill me in!
+  JS_ReportErrorASCII(cx, "boo");
+  return nullptr;
+}
+
+static MOZ_MUST_USE JSObject* PerformWriteAlgorithm(
+    JSContext* cx, Handle<WritableStreamDefaultController*> unwrappedController,
+    Handle<Value> chunk) {
+  cx->check(chunk);
+
+  // XXX jwalden fill me in!
+  JS_ReportErrorASCII(cx, "boo");
+  return nullptr;
+}
+
+/**
+ * Streams spec, 4.8.11 step 7:
+ * Upon fulfillment of sinkClosePromise,
+ */
+static MOZ_MUST_USE bool WritableStreamCloseHandler(JSContext* cx,
+                                                    unsigned argc, Value* vp) {
+  CallArgs args = CallArgsFromVp(argc, vp);
+
+  Rooted<WritableStream*> unwrappedStream(
+      cx, TargetFromHandler<WritableStream>(args));
+
+  // Step 7.a: Perform ! WritableStreamFinishInFlightClose(stream).
+  if (!WritableStreamFinishInFlightClose(cx, unwrappedStream)) {
+    return false;
+  }
+
+  args.rval().setUndefined();
+  return true;
+}
+
+/**
+ * Streams spec, 4.8.11 step 8:
+ * Upon rejection of sinkClosePromise with reason reason,
+ */
+static MOZ_MUST_USE bool WritableStreamCloseFailedHandler(JSContext* cx,
+                                                          unsigned argc,
+                                                          Value* vp) {
+  CallArgs args = CallArgsFromVp(argc, vp);
+
+  Rooted<WritableStream*> unwrappedStream(
+      cx, TargetFromHandler<WritableStream>(args));
+
+  // Step 8.a: Perform
+  //           ! WritableStreamFinishInFlightCloseWithError(stream, reason).
+  if (!WritableStreamFinishInFlightCloseWithError(cx, unwrappedStream,
+                                                  args.get(0))) {
+    return false;
+  }
+
+  args.rval().setUndefined();
+  return true;
+}
+
+/**
+ * Streams spec, 4.8.12 step 4:
+ * Upon fulfillment of sinkWritePromise,
+ */
+static MOZ_MUST_USE bool WritableStreamWriteHandler(JSContext* cx,
+                                                    unsigned argc, Value* vp) {
+  CallArgs args = CallArgsFromVp(argc, vp);
+
+  Rooted<WritableStream*> unwrappedStream(
+      cx, TargetFromHandler<WritableStream>(args));
+
+  // Step 4.a: Perform ! WritableStreamFinishInFlightWrite(stream).
+  if (!WritableStreamFinishInFlightWrite(cx, unwrappedStream)) {
+    return false;
+  }
+
+  // Step 4.b: Let state be stream.[[state]].
+  // Step 4.c: Assert: state is "writable" or "erroring".
+  MOZ_ASSERT(unwrappedStream->writable() ^ unwrappedStream->erroring());
+
+  // Step 4.d: Perform ! DequeueValue(controller).
+  DequeueValue(unwrappedStream->controller(), cx);
+
+  // Step 4.e: If ! WritableStreamCloseQueuedOrInFlight(stream) is false and
+  //           state is "writable",
+  if (!WritableStreamCloseQueuedOrInFlight(unwrappedStream) &&
+      unwrappedStream->writable()) {
+    // Step 4.e.i: Let backpressure be
+    //             ! WritableStreamDefaultControllerGetBackpressure(
+    //                   controller).
+    bool backpressure = WritableStreamDefaultControllerGetBackpressure(
+        unwrappedStream->controller());
+
+    // Step 4.e.ii: Perform
+    //              ! WritableStreamUpdateBackpressure(stream, backpressure).
+    if (!WritableStreamUpdateBackpressure(cx, unwrappedStream, backpressure)) {
+      return false;
+    }
+  }
+
+  // Step 4.f: Perform
+  //           ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(
+  //                 controller).
+  Rooted<WritableStreamDefaultController*> unwrappedController(
+      cx, unwrappedStream->controller());
+  if (!WritableStreamDefaultControllerAdvanceQueueIfNeeded(
+          cx, unwrappedController)) {
+    return false;
+  }
+
+  args.rval().setUndefined();
+  return true;
+}
+
+/**
+ * Streams spec, 4.8.12 step 5:
+ * Upon rejection of sinkWritePromise with reason,
+ */
+static MOZ_MUST_USE bool WritableStreamWriteFailedHandler(JSContext* cx,
+                                                          unsigned argc,
+                                                          Value* vp) {
+  CallArgs args = CallArgsFromVp(argc, vp);
+
+  Rooted<WritableStream*> unwrappedStream(
+      cx, TargetFromHandler<WritableStream>(args));
+
+  // Step 5.a: If stream.[[state]] is "writable", perform
+  //           ! WritableStreamDefaultControllerClearAlgorithms(controller).
+  if (unwrappedStream->writable()) {
+    WritableStreamDefaultControllerClearAlgorithms(
+        unwrappedStream->controller());
+  }
+
+  // Step 5.b: Perform
+  //           ! WritableStreamFinishInFlightWriteWithError(stream, reason).
+  if (!WritableStreamFinishInFlightWriteWithError(cx, unwrappedStream,
+                                                  args.get(0))) {
+    return false;
+  }
+
+  args.rval().setUndefined();
+  return true;
+}
+
+/**
+ * Streams spec, 4.8.9 (steps 7-10),
+ *      WritableStreamDefaultControllerAdvanceQueueIfNeeded ( controller )
+ * Streams spec, 4.8.11.
+ *      WritableStreamDefaultControllerProcessClose ( controller )
+ * Streams spec, 4.8.12.
+ *      WritableStreamDefaultControllerProcessWrite ( controller, chunk )
+ */
+bool WritableStreamDefaultControllerProcessIfNeeded(
+    JSContext* cx,
+    Handle<WritableStreamDefaultController*> unwrappedController) {
+  // Step 7: If controller.[[queue]] is empty, return.
+  ListObject* unwrappedQueue = unwrappedController->queue();
+  if (QueueIsEmpty(unwrappedQueue)) {
+    return true;
+  }
+
+  // Step 8: Let writeRecord be ! PeekQueueValue(controller).
+  // Step 9: If writeRecord is "close", perform
+  //         ! WritableStreamDefaultControllerProcessClose(controller).
+  // Step 10: Otherwise, perform
+  //          ! WritableStreamDefaultControllerProcessWrite(
+  //                controller, writeRecord.[[chunk]]).
+  Rooted<JSObject*> sinkWriteOrClosePromise(cx);
+  JSNative onFulfilledFunc, onRejectedFunc;
+  if (PeekQueueValue(unwrappedQueue).isMagic(JS_WRITABLESTREAM_CLOSE_RECORD)) {
+    MOZ_ASSERT(unwrappedQueue->length() == 2);
+
+    onFulfilledFunc = WritableStreamCloseHandler;
+    onRejectedFunc = WritableStreamCloseFailedHandler;
+
+    // 4.8.11 step 1: Let stream be controller.[[controlledWritableStream]].
+    // 4.8.11 step 2: Perform ! WritableStreamMarkCloseRequestInFlight(stream).
+    WritableStreamMarkCloseRequestInFlight(unwrappedController->stream());
+
+    // 4.8.11 step 3: Perform ! DequeueValue(controller).
+    DequeueValue(unwrappedController, cx);
+
+    // 4.8.11 step 4: Assert: controller.[[queue]] is empty.
+    MOZ_ASSERT(unwrappedQueue->isEmpty());
+
+    // 4.8.11 step 5: Let sinkClosePromise be the result of performing
+    //         controller.[[closeAlgorithm]].
+    sinkWriteOrClosePromise = PerformCloseAlgorithm(cx, unwrappedController);
+  } else {
+    onFulfilledFunc = WritableStreamWriteHandler;
+    onRejectedFunc = WritableStreamWriteFailedHandler;
+
+    Rooted<Value> chunk(cx, PeekQueueValue(unwrappedQueue));
+    if (!cx->compartment()->wrap(cx, &chunk)) {
+      return false;
+    }
+
+    // 4.8.12 step 1: Let stream be controller.[[controlledWritableStream]].
+    // 4.8.12 step 2: Perform
+    //                ! WritableStreamMarkFirstWriteRequestInFlight(stream).
+    WritableStreamMarkFirstWriteRequestInFlight(unwrappedController->stream());
+
+    // 4.8.12 step 3: Let sinkWritePromise be the result of performing
+    //                controller.[[writeAlgorithm]], passing in chunk.
+    sinkWriteOrClosePromise =
+        PerformWriteAlgorithm(cx, unwrappedController, chunk);
+  }
+  if (!sinkWriteOrClosePromise) {
+    return false;
+  }
+
+  Rooted<JSObject*> stream(cx, unwrappedController->stream());
+  if (!cx->compartment()->wrap(cx, &stream)) {
+    return false;
+  }
+
+  // Step 7: Upon fulfillment of sinkClosePromise,
+  // Step 4: Upon fulfillment of sinkWritePromise,
+  // Step 8: Upon rejection of sinkClosePromise with reason reason,
+  // Step 5: Upon rejection of sinkWritePromise with reason,
+  Rooted<JSObject*> onFulfilled(cx, NewHandler(cx, onFulfilledFunc, stream));
+  if (!onFulfilled) {
+    return false;
+  }
+  Rooted<JSObject*> onRejected(cx, NewHandler(cx, onRejectedFunc, stream));
+  if (!onRejected) {
+    return false;
+  }
+  return JS::AddPromiseReactions(cx, sinkWriteOrClosePromise, onFulfilled,
+                                 onRejected);
+}
+
 /**
  * Streams spec, 4.8.13.
  *      WritableStreamDefaultControllerGetBackpressure ( controller )
  */
 bool js::WritableStreamDefaultControllerGetBackpressure(
     const WritableStreamDefaultController* unwrappedController) {
   return WritableStreamDefaultControllerGetDesiredSize(unwrappedController) <=
          0.0;
--- a/js/src/builtin/streams/WritableStreamDefaultControllerOperations.h
+++ b/js/src/builtin/streams/WritableStreamDefaultControllerOperations.h
@@ -76,16 +76,25 @@ extern MOZ_MUST_USE bool WritableStreamD
 extern double WritableStreamDefaultControllerGetDesiredSize(
     const WritableStreamDefaultController* controller);
 
 extern MOZ_MUST_USE bool WritableStreamDefaultControllerErrorIfNeeded(
     JSContext* cx,
     JS::Handle<WritableStreamDefaultController*> unwrappedController,
     JS::Handle<JS::Value> error);
 
+extern MOZ_MUST_USE bool WritableStreamDefaultControllerProcessClose(
+    JSContext* cx,
+    JS::Handle<WritableStreamDefaultController*> unwrappedController);
+
+extern MOZ_MUST_USE bool WritableStreamDefaultControllerProcessWrite(
+    JSContext* cx,
+    JS::Handle<WritableStreamDefaultController*> unwrappedController,
+    JS::Handle<JS::Value> chunk);
+
 extern bool WritableStreamDefaultControllerGetBackpressure(
     const WritableStreamDefaultController* unwrappedController);
 
 extern MOZ_MUST_USE bool WritableStreamDefaultControllerError(
     JSContext* cx,
     JS::Handle<WritableStreamDefaultController*> unwrappedController,
     JS::Handle<JS::Value> error);