Bug 1445587 - Port Fetch to WorkerRef - part 2 - FetchStream, r=smaug
authorAndrea Marchesini <amarchesini@mozilla.com>
Fri, 16 Mar 2018 16:52:29 +0100
changeset 408609 a5ef5edfe67a3c11f335c0f1ee5116cedf1e607a
parent 408608 947cf2126bb33164d21e0c45ce392c4372d25972
child 408610 5c7105dd8159160f30cff39223d6d9d0739f8b69
push id33649
push userbtara@mozilla.com
push dateSat, 17 Mar 2018 10:29:43 +0000
treeherdermozilla-central@97160a734959 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerssmaug
bugs1445587
milestone61.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1445587 - Port Fetch to WorkerRef - part 2 - FetchStream, r=smaug
dom/fetch/FetchStream.cpp
dom/fetch/FetchStream.h
--- a/dom/fetch/FetchStream.cpp
+++ b/dom/fetch/FetchStream.cpp
@@ -14,70 +14,28 @@
 #define FETCH_STREAM_FLAG 0
 
 static NS_DEFINE_CID(kStreamTransportServiceCID,
                      NS_STREAMTRANSPORTSERVICE_CID);
 
 namespace mozilla {
 namespace dom {
 
-namespace {
-
-class FetchStreamWorkerHolder final : public WorkerHolder
+class FetchStream::WorkerShutdown final : public WorkerControlRunnable
 {
 public:
-  explicit FetchStreamWorkerHolder(FetchStream* aStream)
-    : WorkerHolder("FetchStreamWorkerHolder",
-                   WorkerHolder::Behavior::AllowIdleShutdownStart)
+  WorkerShutdown(WorkerPrivate* aWorkerPrivate, RefPtr<FetchStream> aStream)
+    : WorkerControlRunnable(aWorkerPrivate, WorkerThreadUnchangedBusyCount)
     , mStream(aStream)
-    , mWasNotified(false)
-  {}
-
-  bool Notify(WorkerStatus aStatus) override
-  {
-    if (!mWasNotified) {
-      mWasNotified = true;
-      mStream->Close();
-    }
-
-    return true;
-  }
-
-  WorkerPrivate* GetWorkerPrivate() const
-  {
-    return mWorkerPrivate;
-  }
-
-private:
-  RefPtr<FetchStream> mStream;
-  bool mWasNotified;
-};
-
-class FetchStreamWorkerHolderShutdown final : public WorkerControlRunnable
-{
-public:
-  FetchStreamWorkerHolderShutdown(WorkerPrivate* aWorkerPrivate,
-                                  UniquePtr<WorkerHolder>&& aHolder,
-                                  nsCOMPtr<nsIGlobalObject>&& aGlobal,
-                                  RefPtr<FetchStreamHolder>&& aStreamHolder)
-    : WorkerControlRunnable(aWorkerPrivate, WorkerThreadUnchangedBusyCount)
-    , mHolder(Move(aHolder))
-    , mGlobal(Move(aGlobal))
-    , mStreamHolder(Move(aStreamHolder))
   {}
 
   bool
   WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
   {
-    mHolder = nullptr;
-    mGlobal = nullptr;
-
-    mStreamHolder->NullifyStream();
-    mStreamHolder = nullptr;
-
+    mStream->ReleaseObjects();
     return true;
   }
 
   // This runnable starts from a JS Thread. We need to disable a couple of
   // assertions overring the following methods.
 
   bool
   PreDispatch(WorkerPrivate* aWorkerPrivate) override
@@ -85,23 +43,19 @@ public:
     return true;
   }
 
   void
   PostDispatch(WorkerPrivate* aWorkerPrivate, bool aDispatchResult) override
   {}
 
 private:
-  UniquePtr<WorkerHolder> mHolder;
-  nsCOMPtr<nsIGlobalObject> mGlobal;
-  RefPtr<FetchStreamHolder> mStreamHolder;
+  RefPtr<FetchStream> mStream;
 };
 
-} // anonymous
-
 NS_IMPL_ISUPPORTS(FetchStream, nsIInputStreamCallback, nsIObserver,
                   nsISupportsWeakReference)
 
 /* static */ void
 FetchStream::Create(JSContext* aCx, FetchStreamHolder* aStreamHolder,
                     nsIGlobalObject* aGlobal, nsIInputStream* aInputStream,
                     JS::MutableHandle<JSObject*> aStream, ErrorResult& aRv)
 {
@@ -123,27 +77,30 @@ FetchStream::Create(JSContext* aCx, Fetc
     if (NS_WARN_IF(aRv.Failed())) {
       return;
     }
 
   } else {
     WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
     MOZ_ASSERT(workerPrivate);
 
-    UniquePtr<FetchStreamWorkerHolder> holder(
-      new FetchStreamWorkerHolder(stream));
-    if (NS_WARN_IF(!holder->HoldWorker(workerPrivate, Closing))) {
+    RefPtr<WeakWorkerRef> workerRef =
+      WeakWorkerRef::Create(workerPrivate, [stream]() {
+        stream->Close();
+      });
+
+    if (NS_WARN_IF(!workerRef)) {
       aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR);
       return;
     }
 
     // Note, this will create a ref-cycle between the holder and the stream.
     // The cycle is broken when the stream is closed or the worker begins
     // shutting down.
-    stream->mWorkerHolder = Move(holder);
+    stream->mWorkerRef = workerRef.forget();
   }
 
   if (!JS::HasReadableStreamCallbacks(aCx)) {
     JS::SetReadableStreamCallbacks(aCx,
                                    &FetchStream::RequestDataCallback,
                                    &FetchStream::WriteIntoReadRequestCallback,
                                    &FetchStream::CancelCallback,
                                    &FetchStream::ClosedCallback,
@@ -173,16 +130,19 @@ FetchStream::RequestDataCallback(JSConte
                                  uint8_t aFlags,
                                  size_t aDesiredSize)
 {
   MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
   MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);
   MOZ_DIAGNOSTIC_ASSERT(JS::ReadableStreamIsDisturbed(aStream));
 
   RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource);
+  stream->AssertIsOnOwningThread();
+
+  MutexAutoLock lock(stream->mMutex);
 
   MOZ_DIAGNOSTIC_ASSERT(stream->mState == eInitializing ||
                         stream->mState == eWaiting ||
                         stream->mState == eChecking ||
                         stream->mState == eReading);
 
   if (stream->mState == eReading) {
     // We are already reading data.
@@ -209,32 +169,32 @@ FetchStream::RequestDataCallback(JSConte
     // mOriginalInputStream into an nsIAsyncInputStream.
     MOZ_ASSERT(stream->mOriginalInputStream);
 
     nsCOMPtr<nsIAsyncInputStream> asyncStream;
     nsresult rv =
       NS_MakeAsyncNonBlockingInputStream(stream->mOriginalInputStream.forget(),
                                          getter_AddRefs(asyncStream));
     if (NS_WARN_IF(NS_FAILED(rv))) {
-      stream->ErrorPropagation(aCx, aStream, rv);
+      stream->ErrorPropagation(aCx, lock, aStream, rv);
       return;
     }
 
     stream->mInputStream = asyncStream;
     stream->mOriginalInputStream = nullptr;
   }
 
   MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream);
   MOZ_DIAGNOSTIC_ASSERT(!stream->mOriginalInputStream);
 
   nsresult rv =
     stream->mInputStream->AsyncWait(stream, 0, 0,
                                     stream->mOwningEventTarget);
   if (NS_WARN_IF(NS_FAILED(rv))) {
-    stream->ErrorPropagation(aCx, aStream, rv);
+    stream->ErrorPropagation(aCx, lock, aStream, rv);
     return;
   }
 
   // All good.
 }
 
 /* static */ void
 FetchStream::WriteIntoReadRequestCallback(JSContext* aCx,
@@ -244,40 +204,43 @@ FetchStream::WriteIntoReadRequestCallbac
                                           size_t aLength, size_t* aByteWritten)
 {
   MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
   MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);
   MOZ_DIAGNOSTIC_ASSERT(aBuffer);
   MOZ_DIAGNOSTIC_ASSERT(aByteWritten);
 
   RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource);
+  stream->AssertIsOnOwningThread();
+
+  MutexAutoLock lock(stream->mMutex);
 
   MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream);
   MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWriting);
   stream->mState = eChecking;
 
   uint32_t written;
   nsresult rv =
     stream->mInputStream->Read(static_cast<char*>(aBuffer), aLength, &written);
   if (NS_WARN_IF(NS_FAILED(rv))) {
-    stream->ErrorPropagation(aCx, aStream, rv);
+    stream->ErrorPropagation(aCx, lock, aStream, rv);
     return;
   }
 
   *aByteWritten = written;
 
   if (written == 0) {
-    stream->CloseAndReleaseObjects(aCx, aStream);
+    stream->CloseAndReleaseObjects(aCx, lock, aStream);
     return;
   }
 
   rv = stream->mInputStream->AsyncWait(stream, 0, 0,
                                        stream->mOwningEventTarget);
   if (NS_WARN_IF(NS_FAILED(rv))) {
-    stream->ErrorPropagation(aCx, aStream, rv);
+    stream->ErrorPropagation(aCx, lock, aStream, rv);
     return;
   }
 
   // All good.
 }
 
 /* static */ JS::Value
 FetchStream::CancelCallback(JSContext* aCx, JS::HandleObject aStream,
@@ -287,16 +250,17 @@ FetchStream::CancelCallback(JSContext* a
   MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
   MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);
 
   // This is safe because we created an extra reference in FetchStream::Create()
   // that won't be released until FetchStream::FinalizeCallback() is called.
   // We are guaranteed that won't happen until the js ReadableStream object
   // is finalized.
   FetchStream* stream = static_cast<FetchStream*>(aUnderlyingSource);
+  stream->AssertIsOnOwningThread();
 
   if (stream->mState == eInitializing) {
     // The stream has been used for the first time.
     stream->mStreamHolder->MarkAsRead();
   }
 
   if (stream->mInputStream) {
     stream->mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
@@ -322,16 +286,17 @@ FetchStream::ErroredCallback(JSContext* 
   MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
   MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG);
 
   // This is safe because we created an extra reference in FetchStream::Create()
   // that won't be released until FetchStream::FinalizeCallback() is called.
   // We are guaranteed that won't happen until the js ReadableStream object
   // is finalized.
   FetchStream* stream = static_cast<FetchStream*>(aUnderlyingSource);
+  stream->AssertIsOnOwningThread();
 
   if (stream->mState == eInitializing) {
     // The stream has been used for the first time.
     stream->mStreamHolder->MarkAsRead();
   }
 
   if (stream->mInputStream) {
     stream->mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
@@ -353,61 +318,70 @@ FetchStream::FinalizeCallback(void* aUnd
     dont_AddRef(static_cast<FetchStream*>(aUnderlyingSource));
 
   stream->ReleaseObjects();
 }
 
 FetchStream::FetchStream(nsIGlobalObject* aGlobal,
                          FetchStreamHolder* aStreamHolder,
                          nsIInputStream* aInputStream)
-  : mState(eInitializing)
+  : mMutex("FetchStream::mMutex")
+  , mState(eInitializing)
   , mGlobal(aGlobal)
   , mStreamHolder(aStreamHolder)
   , mOwningEventTarget(aGlobal->EventTargetFor(TaskCategory::Other))
   , mOriginalInputStream(aInputStream)
 {
   MOZ_DIAGNOSTIC_ASSERT(aInputStream);
   MOZ_DIAGNOSTIC_ASSERT(aStreamHolder);
 }
 
 FetchStream::~FetchStream()
 {
 }
 
 void
-FetchStream::ErrorPropagation(JSContext* aCx, JS::HandleObject aStream,
+FetchStream::ErrorPropagation(JSContext* aCx,
+                              const MutexAutoLock& aProofOfLock,
+                              JS::HandleObject aStream,
                               nsresult aError)
 {
+  AssertIsOnOwningThread();
+
   // Nothing to do.
   if (mState == eClosed) {
     return;
   }
 
   // Let's close the stream.
   if (aError == NS_BASE_STREAM_CLOSED) {
-    CloseAndReleaseObjects(aCx, aStream);
+    CloseAndReleaseObjects(aCx, aProofOfLock, aStream);
     return;
   }
 
   // Let's use a generic error.
   RefPtr<DOMException> error = DOMException::Create(NS_ERROR_DOM_TYPE_ERR);
 
   JS::Rooted<JS::Value> errorValue(aCx);
   if (ToJSValue(aCx, error, &errorValue)) {
+    MutexAutoUnlock unlock(mMutex);
     JS::ReadableStreamError(aCx, aStream, errorValue);
   }
 
-  ReleaseObjects();
+  ReleaseObjects(aProofOfLock);
 }
 
 NS_IMETHODIMP
 FetchStream::OnInputStreamReady(nsIAsyncInputStream* aStream)
 {
+  AssertIsOnOwningThread();
   MOZ_DIAGNOSTIC_ASSERT(aStream);
 
+  MutexAutoLock lock(mMutex);
+
   // Already closed. We have nothing else to do here.
   if (mState == eClosed) {
     return NS_OK;
   }
 
   MOZ_DIAGNOSTIC_ASSERT(mInputStream);
   MOZ_DIAGNOSTIC_ASSERT(mState == eReading || mState == eChecking);
 
@@ -426,128 +400,172 @@ FetchStream::OnInputStreamReady(nsIAsync
   if (NS_SUCCEEDED(rv) && size == 0) {
     // In theory this should not happen. If size is 0, the stream should be
     // considered closed.
     rv = NS_BASE_STREAM_CLOSED;
   }
 
   // No warning for stream closed.
   if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) {
-    ErrorPropagation(cx, stream, rv);
+    ErrorPropagation(cx, lock, stream, rv);
     return NS_OK;
   }
 
   // This extra checking is completed. Let's wait for the next read request.
   if (mState == eChecking) {
     mState = eWaiting;
     return NS_OK;
   }
 
   mState = eWriting;
-  JS::ReadableStreamUpdateDataAvailableFromSource(cx, stream, size);
+
+  {
+    MutexAutoUnlock unlock(mMutex);
+    JS::ReadableStreamUpdateDataAvailableFromSource(cx, stream, size);
+  }
 
   // The WriteInto callback changes mState to eChecking.
   MOZ_DIAGNOSTIC_ASSERT(mState == eChecking);
 
   return NS_OK;
 }
 
 /* static */ nsresult
 FetchStream::RetrieveInputStream(void* aUnderlyingReadableStreamSource,
                                  nsIInputStream** aInputStream)
 {
   MOZ_ASSERT(aUnderlyingReadableStreamSource);
   MOZ_ASSERT(aInputStream);
 
   RefPtr<FetchStream> stream =
     static_cast<FetchStream*>(aUnderlyingReadableStreamSource);
+  stream->AssertIsOnOwningThread();
 
   // if mOriginalInputStream is null, the reading already started. We don't want
   // to expose the internal inputStream.
   if (NS_WARN_IF(!stream->mOriginalInputStream)) {
     return NS_ERROR_DOM_INVALID_STATE_ERR;
   }
 
   nsCOMPtr<nsIInputStream> inputStream = stream->mOriginalInputStream;
   inputStream.forget(aInputStream);
   return NS_OK;
 }
 
 void
 FetchStream::Close()
 {
+  AssertIsOnOwningThread();
+
+  MutexAutoLock lock(mMutex);
+
   if (mState == eClosed) {
     return;
   }
 
   AutoJSAPI jsapi;
   if (NS_WARN_IF(!jsapi.Init(mGlobal))) {
-    ReleaseObjects();
+    ReleaseObjects(lock);
     return;
   }
 
   JSContext* cx = jsapi.cx();
   JS::Rooted<JSObject*> stream(cx, mStreamHolder->ReadableStreamBody());
-  CloseAndReleaseObjects(cx, stream);
+  CloseAndReleaseObjects(cx, lock, stream);
 }
 
 void
-FetchStream::CloseAndReleaseObjects(JSContext* aCx, JS::HandleObject aStream)
+FetchStream::CloseAndReleaseObjects(JSContext* aCx,
+                                    const MutexAutoLock& aProofOfLock,
+                                    JS::HandleObject aStream)
 {
+  AssertIsOnOwningThread();
   MOZ_DIAGNOSTIC_ASSERT(mState != eClosed);
 
-  ReleaseObjects();
+  ReleaseObjects(aProofOfLock);
 
+  MutexAutoUnlock unlock(mMutex);
   if (JS::ReadableStreamIsReadable(aStream)) {
     JS::ReadableStreamClose(aCx, aStream);
   }
 }
 
 void
 FetchStream::ReleaseObjects()
 {
+  MutexAutoLock lock(mMutex);
+  ReleaseObjects(lock);
+}
+
+void
+FetchStream::ReleaseObjects(const MutexAutoLock& aProofOfLock)
+{
+  // This method can be called on 2 possible threads: the owning one and a JS
+  // thread used to release resources. If we are on the JS thread, we need to
+  // dispatch a runnable to go back to the owning thread in order to release
+  // resources correctly.
+
   if (mState == eClosed) {
+    // Already gone. Nothing to do.
     return;
   }
 
   mState = eClosed;
 
-  if (mWorkerHolder) {
-    RefPtr<FetchStreamWorkerHolderShutdown> r =
-      new FetchStreamWorkerHolderShutdown(
-        static_cast<FetchStreamWorkerHolder*>(mWorkerHolder.get())->GetWorkerPrivate(),
-        Move(mWorkerHolder), Move(mGlobal), Move(mStreamHolder));
-    r->Dispatch();
-  } else {
+  if (!NS_IsMainThread() && !IsCurrentThreadRunningWorker()) {
+    // Let's dispatch a WorkerControlRunnable if the owning thread is a worker.
+    if (mWorkerRef) {
+      RefPtr<WorkerShutdown> r =
+        new WorkerShutdown(mWorkerRef->GetUnsafePrivate(), this);
+      Unused << NS_WARN_IF(!r->Dispatch());
+      return;
+    }
+
+    // A normal runnable of the owning thread is the main-thread.
     RefPtr<FetchStream> self = this;
-    RefPtr<Runnable> r = NS_NewRunnableFunction(
-      "FetchStream::ReleaseObjects",
-      [self] () {
-        nsCOMPtr<nsIObserverService> obs = mozilla::services::GetObserverService();
-        if (obs) {
-          obs->RemoveObserver(self, DOM_WINDOW_DESTROYED_TOPIC);
-        }
-        self->mGlobal = nullptr;
+    RefPtr<Runnable> r =
+      NS_NewRunnableFunction("FetchStream::ReleaseObjects",
+                             [self] () { self->ReleaseObjects(); });
+    mOwningEventTarget->Dispatch(r.forget());
+    return;
+  }
+
+  AssertIsOnOwningThread();
 
-        self->mStreamHolder->NullifyStream();
-        self->mStreamHolder = nullptr;
-      });
+  if (NS_IsMainThread()) {
+    nsCOMPtr<nsIObserverService> obs = mozilla::services::GetObserverService();
+    if (obs) {
+      obs->RemoveObserver(this, DOM_WINDOW_DESTROYED_TOPIC);
+    }
+  }
+
+  mWorkerRef = nullptr;
+  mGlobal = nullptr;
 
-    mOwningEventTarget->Dispatch(r.forget());
-  }
+  mStreamHolder->NullifyStream();
+  mStreamHolder = nullptr;
 }
 
+#ifdef DEBUG
+void
+FetchStream::AssertIsOnOwningThread()
+{
+  NS_ASSERT_OWNINGTHREAD(FetchStream);
+}
+#endif
+
 // nsIObserver
 // -----------
 
 NS_IMETHODIMP
 FetchStream::Observe(nsISupports* aSubject, const char* aTopic,
                      const char16_t* aData)
 {
   AssertIsOnMainThread();
+  AssertIsOnOwningThread();
 
   MOZ_ASSERT(strcmp(aTopic, DOM_WINDOW_DESTROYED_TOPIC) == 0);
 
   nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(mGlobal);
   if (SameCOMIdentity(aSubject, window)) {
     Close();
   }
 
--- a/dom/fetch/FetchStream.h
+++ b/dom/fetch/FetchStream.h
@@ -17,17 +17,17 @@
 class nsIGlobalObject;
 
 class nsIInputStream;
 
 namespace mozilla {
 namespace dom {
 
 class FetchStreamHolder;
-class WorkerHolder;
+class WeakWorkerRef;
 
 class FetchStream final : public nsIInputStreamCallback
                         , public nsIObserver
                         , public nsSupportsWeakReference
 {
 public:
   NS_DECL_THREADSAFE_ISUPPORTS
   NS_DECL_NSIINPUTSTREAMCALLBACK
@@ -45,16 +45,24 @@ public:
   RetrieveInputStream(void* aUnderlyingReadableStreamSource,
                       nsIInputStream** aInputStream);
 
 private:
   FetchStream(nsIGlobalObject* aGlobal, FetchStreamHolder* aStreamHolder,
               nsIInputStream* aInputStream);
   ~FetchStream();
 
+#ifdef DEBUG
+  void
+  AssertIsOnOwningThread();
+#else
+  void
+  AssertIsOnOwningThread() {}
+#endif
+
   static void
   RequestDataCallback(JSContext* aCx, JS::HandleObject aStream,
                       void* aUnderlyingSource, uint8_t aFlags,
                       size_t aDesiredSize);
 
   static void
   WriteIntoReadRequestCallback(JSContext* aCx, JS::HandleObject aStream,
                                void* aUnderlyingSource, uint8_t aFlags,
@@ -74,20 +82,29 @@ private:
   ErroredCallback(JSContext* aCx, JS::HandleObject aStream,
                   void* aUnderlyingSource, uint8_t aFlags,
                   JS::HandleValue reason);
 
   static void
   FinalizeCallback(void* aUnderlyingSource, uint8_t aFlags);
 
   void
-  ErrorPropagation(JSContext* aCx, JS::HandleObject aStream, nsresult aRv);
+  ErrorPropagation(JSContext* aCx,
+                   const MutexAutoLock& aProofOfLock,
+                   JS::HandleObject aStream, nsresult aRv);
 
   void
-  CloseAndReleaseObjects(JSContext* aCx, JS::HandleObject aSteam);
+  CloseAndReleaseObjects(JSContext* aCx,
+                         const MutexAutoLock& aProofOfLock,
+                         JS::HandleObject aSteam);
+
+  class WorkerShutdown;
+
+  void
+  ReleaseObjects(const MutexAutoLock& aProofOfLock);
 
   void
   ReleaseObjects();
 
   // Common methods
 
   enum State {
     // This is the beginning state before any reading operation.
@@ -107,28 +124,33 @@ private:
     // check, we go back to eWaiting. If a reading request happens in the
     // meantime, we move to eReading state.
     eChecking,
 
     // Operation completed.
     eClosed,
   };
 
-  // Touched only on the target thread.
+  // We need a mutex because JS engine can release FetchStream on a non-owning
+  // thread. We must be sure that the releasing of resources doesn't trigger
+  // race conditions.
+  Mutex mMutex;
+
+  // Protected by mutex.
   State mState;
 
   nsCOMPtr<nsIGlobalObject> mGlobal;
   RefPtr<FetchStreamHolder> mStreamHolder;
   nsCOMPtr<nsIEventTarget> mOwningEventTarget;
 
   // This is the original inputStream received during the CTOR. It will be
   // converted into an nsIAsyncInputStream and stored into mInputStream at the
   // first use.
   nsCOMPtr<nsIInputStream> mOriginalInputStream;
   nsCOMPtr<nsIAsyncInputStream> mInputStream;
 
-  UniquePtr<WorkerHolder> mWorkerHolder;
+  RefPtr<WeakWorkerRef> mWorkerRef;
 };
 
 } // dom namespace
 } // mozilla namespace
 
 #endif // mozilla_dom_FetchStream_h