Bug 1151656 - Make MediaPromises operate with TaskDispatchers. r=mattwoodrow
authorBobby Holley <bobbyholley@gmail.com>
Thu, 02 Apr 2015 16:47:35 -0700
changeset 268148 8b257d9772718afe34a7fd8896e48cd28b298182
parent 268147 9b43eca6d969d21e1bb72a9716fa8096f79093f8
child 268149 8c22476cabcae21cb93d8680f3663009f65843a6
push id4830
push userjlund@mozilla.com
push dateMon, 29 Jun 2015 20:18:48 +0000
treeherdermozilla-beta@4c2175bb0420 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersmattwoodrow
bugs1151656
milestone40.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1151656 - Make MediaPromises operate with TaskDispatchers. r=mattwoodrow
dom/media/MediaPromise.h
dom/media/MediaTaskQueue.cpp
dom/media/MediaTaskQueue.h
dom/media/TaskDispatcher.h
--- a/dom/media/MediaPromise.h
+++ b/dom/media/MediaPromise.h
@@ -5,16 +5,17 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #if !defined(MediaPromise_h_)
 #define MediaPromise_h_
 
 #include "prlog.h"
 
 #include "AbstractThread.h"
+#include "TaskDispatcher.h"
 
 #include "nsTArray.h"
 #include "nsThreadUtils.h"
 
 #include "mozilla/DebugOnly.h"
 #include "mozilla/Maybe.h"
 #include "mozilla/Mutex.h"
 #include "mozilla/Monitor.h"
@@ -69,28 +70,30 @@ public:
   // interface (upon which the creator of the promise may invoke Resolve() or
   // Reject()). APIs should create and store a MediaPromise::Private (usually
   // via a MediaPromiseHolder), and return a MediaPromise to consumers.
   //
   // NB: We can include the definition of this class inline once B2G ICS is gone.
   class Private;
 
   static nsRefPtr<MediaPromise>
-  CreateAndResolve(ResolveValueType aResolveValue, const char* aResolveSite)
+  CreateAndResolve(ResolveValueType aResolveValue, const char* aResolveSite,
+                   TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
   {
     nsRefPtr<typename MediaPromise::Private> p = new MediaPromise::Private(aResolveSite);
-    p->Resolve(aResolveValue, aResolveSite);
+    p->Resolve(aResolveValue, aResolveSite, aDispatcher);
     return Move(p);
   }
 
   static nsRefPtr<MediaPromise>
-  CreateAndReject(RejectValueType aRejectValue, const char* aRejectSite)
+  CreateAndReject(RejectValueType aRejectValue, const char* aRejectSite,
+                  TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
   {
     nsRefPtr<typename MediaPromise::Private> p = new MediaPromise::Private(aRejectSite);
-    p->Reject(aRejectValue, aRejectSite);
+    p->Reject(aRejectValue, aRejectSite, aDispatcher);
     return Move(p);
   }
 
   class Consumer
   {
   public:
     NS_INLINE_DECL_THREADSAFE_REFCOUNTING(Consumer)
 
@@ -166,17 +169,18 @@ protected:
 
     private:
       nsRefPtr<ThenValueBase> mThenValue;
       RejectValueType mRejectValue;
     };
 
     explicit ThenValueBase(const char* aCallSite) : mCallSite(aCallSite) {}
 
-    virtual void Dispatch(MediaPromise *aPromise) = 0;
+    virtual void Dispatch(MediaPromise *aPromise,
+                          TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>()) = 0;
 
   protected:
     virtual void DoResolve(ResolveValueType aResolveValue) = 0;
     virtual void DoReject(RejectValueType aRejectValue) = 0;
 
     const char* mCallSite;
   };
 
@@ -214,37 +218,33 @@ protected:
               ResolveMethodType aResolveMethod, RejectMethodType aRejectMethod,
               const char* aCallSite)
       : ThenValueBase(aCallSite)
       , mResponseTarget(aResponseTarget)
       , mThisVal(aThisVal)
       , mResolveMethod(aResolveMethod)
       , mRejectMethod(aRejectMethod) {}
 
-    void Dispatch(MediaPromise *aPromise) override
+    void Dispatch(MediaPromise *aPromise, TaskDispatcher& aDispatcher) override
     {
       aPromise->mMutex.AssertCurrentThreadOwns();
       MOZ_ASSERT(!aPromise->IsPending());
       bool resolved = aPromise->mResolveValue.isSome();
       nsRefPtr<nsRunnable> runnable =
         resolved ? static_cast<nsRunnable*>(new (typename ThenValueBase::ResolveRunnable)(this, aPromise->mResolveValue.ref()))
                  : static_cast<nsRunnable*>(new (typename ThenValueBase::RejectRunnable)(this, aPromise->mRejectValue.ref()));
       PROMISE_LOG("%s Then() call made from %s [Runnable=%p, Promise=%p, ThenValue=%p]",
                   resolved ? "Resolving" : "Rejecting", ThenValueBase::mCallSite,
                   runnable.get(), aPromise, this);
-      nsresult rv = mResponseTarget->Dispatch(runnable.forget());
 
-      // NB: mDisconnected is only supposed to be accessed on the dispatch
-      // thread. However, we require the consumer to have disconnected any
-      // oustanding promise requests _before_ initiating shutdown on the
-      // thread or task queue. So the only non-buggy scenario for dispatch
-      // failing involves the target thread being unable to manipulate the
-      // ThenValue (since it's been disconnected), so it's safe to read here.
-      MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv) || Consumer::mDisconnected);
-      unused << rv;
+      // Promise consumers are allowed to disconnect the Consumer object and
+      // then shut down the thread or task queue that the promise result would
+      // be dispatched on. So we unfortunately can't assert that promise
+      // dispatch succeeds. :-(
+      aDispatcher.AddTask(mResponseTarget, runnable.forget(), /* aAssertDispatchSuccess = */ false);
     }
 
 #ifdef DEBUG
   void AssertOnDispatchThread()
   {
     MOZ_ASSERT(mResponseTarget->IsCurrentThreadIn());
   }
 #else
@@ -303,81 +303,94 @@ protected:
     nsRefPtr<ThisType> mThisVal; // Only accessed and refcounted on dispatch thread.
     ResolveMethodType mResolveMethod;
     RejectMethodType mRejectMethod;
   };
 public:
 
   template<typename ThisType, typename ResolveMethodType, typename RejectMethodType>
   already_AddRefed<Consumer> RefableThen(AbstractThread* aResponseThread, const char* aCallSite, ThisType* aThisVal,
-                                         ResolveMethodType aResolveMethod, RejectMethodType aRejectMethod)
+                                         ResolveMethodType aResolveMethod, RejectMethodType aRejectMethod,
+                                         TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
   {
     MutexAutoLock lock(mMutex);
+
+    // {Refable,}Then() rarely dispatch directly - they do so only in the case
+    // where the promise has already been resolved by the time {Refable,}Then()
+    // is invoked. This case is rare, but it _can_ happen, which makes it a ripe
+    // target for race bugs. So we do an extra assertion here to make sure our
+    // caller is using tail dispatch correctly no matter what, rather than
+    // relying on the assertion in Dispatch(), which may be called extremely
+    // infrequently.
+    aDispatcher.AssertIsTailDispatcherIfRequired();
+
     MOZ_DIAGNOSTIC_ASSERT(!IsExclusive || !mHaveConsumer);
     mHaveConsumer = true;
     nsRefPtr<ThenValueBase> thenValue = new ThenValue<ThisType, ResolveMethodType, RejectMethodType>(
                                               aResponseThread, aThisVal, aResolveMethod, aRejectMethod, aCallSite);
     PROMISE_LOG("%s invoking Then() [this=%p, thenValue=%p, aThisVal=%p, isPending=%d]",
                 aCallSite, this, thenValue.get(), aThisVal, (int) IsPending());
     if (!IsPending()) {
-      thenValue->Dispatch(this);
+      thenValue->Dispatch(this, aDispatcher);
     } else {
       mThenValues.AppendElement(thenValue);
     }
 
     return thenValue.forget();
   }
 
   template<typename ThisType, typename ResolveMethodType, typename RejectMethodType>
   void Then(AbstractThread* aResponseThread, const char* aCallSite, ThisType* aThisVal,
-            ResolveMethodType aResolveMethod, RejectMethodType aRejectMethod)
+            ResolveMethodType aResolveMethod, RejectMethodType aRejectMethod,
+            TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
   {
     nsRefPtr<Consumer> c =
-      RefableThen(aResponseThread, aCallSite, aThisVal, aResolveMethod, aRejectMethod);
+      RefableThen(aResponseThread, aCallSite, aThisVal, aResolveMethod, aRejectMethod, aDispatcher);
     return;
   }
 
-  void ChainTo(already_AddRefed<Private> aChainedPromise, const char* aCallSite)
+  void ChainTo(already_AddRefed<Private> aChainedPromise, const char* aCallSite,
+               TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
   {
     MutexAutoLock lock(mMutex);
     MOZ_DIAGNOSTIC_ASSERT(!IsExclusive || !mHaveConsumer);
     mHaveConsumer = true;
     nsRefPtr<Private> chainedPromise = aChainedPromise;
     PROMISE_LOG("%s invoking Chain() [this=%p, chainedPromise=%p, isPending=%d]",
                 aCallSite, this, chainedPromise.get(), (int) IsPending());
     if (!IsPending()) {
-      ForwardTo(chainedPromise);
+      ForwardTo(chainedPromise, aDispatcher);
     } else {
       mChainedPromises.AppendElement(chainedPromise);
     }
   }
 
 protected:
   bool IsPending() { return mResolveValue.isNothing() && mRejectValue.isNothing(); }
-  void DispatchAll()
+  void DispatchAll(TaskDispatcher& aDispatcher)
   {
     mMutex.AssertCurrentThreadOwns();
     for (size_t i = 0; i < mThenValues.Length(); ++i) {
-      mThenValues[i]->Dispatch(this);
+      mThenValues[i]->Dispatch(this, aDispatcher);
     }
     mThenValues.Clear();
 
     for (size_t i = 0; i < mChainedPromises.Length(); ++i) {
-      ForwardTo(mChainedPromises[i]);
+      ForwardTo(mChainedPromises[i], aDispatcher);
     }
     mChainedPromises.Clear();
   }
 
-  void ForwardTo(Private* aOther)
+  void ForwardTo(Private* aOther, TaskDispatcher& aDispatcher)
   {
     MOZ_ASSERT(!IsPending());
     if (mResolveValue.isSome()) {
-      aOther->Resolve(mResolveValue.ref(), "<chained promise>");
+      aOther->Resolve(mResolveValue.ref(), "<chained promise>", aDispatcher);
     } else {
-      aOther->Reject(mRejectValue.ref(), "<chained promise>");
+      aOther->Reject(mRejectValue.ref(), "<chained promise>", aDispatcher);
     }
   }
 
   ~MediaPromise()
   {
     PROMISE_LOG("MediaPromise::~MediaPromise [this=%p]", this);
     MOZ_ASSERT(!IsPending());
     MOZ_ASSERT(mThenValues.IsEmpty());
@@ -395,32 +408,34 @@ protected:
 
 template<typename ResolveValueT, typename RejectValueT, bool IsExclusive>
 class MediaPromise<ResolveValueT, RejectValueT, IsExclusive>::Private
   : public MediaPromise<ResolveValueT, RejectValueT, IsExclusive>
 {
 public:
   explicit Private(const char* aCreationSite) : MediaPromise(aCreationSite) {}
 
-  void Resolve(ResolveValueT aResolveValue, const char* aResolveSite)
+  void Resolve(ResolveValueT aResolveValue, const char* aResolveSite,
+               TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
   {
     MutexAutoLock lock(mMutex);
     MOZ_ASSERT(IsPending());
     PROMISE_LOG("%s resolving MediaPromise (%p created at %s)", aResolveSite, this, mCreationSite);
     mResolveValue.emplace(aResolveValue);
-    DispatchAll();
+    DispatchAll(aDispatcher);
   }
 
-  void Reject(RejectValueT aRejectValue, const char* aRejectSite)
+  void Reject(RejectValueT aRejectValue, const char* aRejectSite,
+              TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
   {
     MutexAutoLock lock(mMutex);
     MOZ_ASSERT(IsPending());
     PROMISE_LOG("%s rejecting MediaPromise (%p created at %s)", aRejectSite, this, mCreationSite);
     mRejectValue.emplace(aRejectValue);
-    DispatchAll();
+    DispatchAll(aDispatcher);
   }
 };
 
 /*
  * Class to encapsulate a promise for a particular role. Use this as the member
  * variable for a class whose method returns a promise.
  */
 template<typename PromiseType>
@@ -471,50 +486,56 @@ public:
     }
 
     nsRefPtr<typename PromiseType::Private> p = mPromise;
     mPromise = nullptr;
     return p.forget();
   }
 
   void Resolve(typename PromiseType::ResolveValueType aResolveValue,
-               const char* aMethodName)
+               const char* aMethodName,
+               TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
   {
     if (mMonitor) {
       mMonitor->AssertCurrentThreadOwns();
     }
     MOZ_ASSERT(mPromise);
-    mPromise->Resolve(aResolveValue, aMethodName);
+    mPromise->Resolve(aResolveValue, aMethodName, aDispatcher);
     mPromise = nullptr;
   }
 
+
   void ResolveIfExists(typename PromiseType::ResolveValueType aResolveValue,
-                       const char* aMethodName)
+                       const char* aMethodName,
+                       TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
   {
     if (!IsEmpty()) {
-      Resolve(aResolveValue, aMethodName);
+      Resolve(aResolveValue, aMethodName, aDispatcher);
     }
   }
 
   void Reject(typename PromiseType::RejectValueType aRejectValue,
-              const char* aMethodName)
+              const char* aMethodName,
+              TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
   {
     if (mMonitor) {
       mMonitor->AssertCurrentThreadOwns();
     }
     MOZ_ASSERT(mPromise);
-    mPromise->Reject(aRejectValue, aMethodName);
+    mPromise->Reject(aRejectValue, aMethodName, aDispatcher);
     mPromise = nullptr;
   }
 
+
   void RejectIfExists(typename PromiseType::RejectValueType aRejectValue,
-                      const char* aMethodName)
+                      const char* aMethodName,
+                      TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
   {
     if (!IsEmpty()) {
-      Reject(aRejectValue, aMethodName);
+      Reject(aRejectValue, aMethodName, aDispatcher);
     }
   }
 
 private:
   Monitor* mMonitor;
   nsRefPtr<typename PromiseType::Private> mPromise;
 };
 
@@ -640,55 +661,57 @@ public:
 
 private:
   nsRefPtr<typename PromiseType::Private> mProxyPromise;
   nsAutoPtr<MethodCallBase<PromiseType>> mMethodCall;
 };
 
 template<typename PromiseType>
 static nsRefPtr<PromiseType>
-ProxyInternal(AbstractThread* aTarget, MethodCallBase<PromiseType>* aMethodCall, const char* aCallerName)
+ProxyInternal(AbstractThread* aTarget, MethodCallBase<PromiseType>* aMethodCall, const char* aCallerName,
+              TaskDispatcher& aDispatcher)
 {
   nsRefPtr<typename PromiseType::Private> p = new (typename PromiseType::Private)(aCallerName);
   nsRefPtr<ProxyRunnable<PromiseType>> r = new ProxyRunnable<PromiseType>(p, aMethodCall);
-  nsresult rv = aTarget->Dispatch(r.forget());
-  MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv));
-  unused << rv;
+  aDispatcher.AddTask(aTarget, r.forget());
   return Move(p);
 }
 
 } // namespace detail
 
 template<typename PromiseType, typename ThisType>
 static nsRefPtr<PromiseType>
 ProxyMediaCall(AbstractThread* aTarget, ThisType* aThisVal, const char* aCallerName,
-               nsRefPtr<PromiseType>(ThisType::*aMethod)())
+               nsRefPtr<PromiseType>(ThisType::*aMethod)(),
+               TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
 {
   typedef detail::MethodCallWithNoArgs<PromiseType, ThisType> MethodCallType;
   MethodCallType* methodCall = new MethodCallType(aThisVal, aMethod);
-  return detail::ProxyInternal(aTarget, methodCall, aCallerName);
+  return detail::ProxyInternal(aTarget, methodCall, aCallerName, aDispatcher);
 }
 
 template<typename PromiseType, typename ThisType, typename Arg1Type>
 static nsRefPtr<PromiseType>
 ProxyMediaCall(AbstractThread* aTarget, ThisType* aThisVal, const char* aCallerName,
-               nsRefPtr<PromiseType>(ThisType::*aMethod)(Arg1Type), Arg1Type aArg1)
+               nsRefPtr<PromiseType>(ThisType::*aMethod)(Arg1Type), Arg1Type aArg1,
+               TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
 {
   typedef detail::MethodCallWithOneArg<PromiseType, ThisType, Arg1Type> MethodCallType;
   MethodCallType* methodCall = new MethodCallType(aThisVal, aMethod, aArg1);
-  return detail::ProxyInternal(aTarget, methodCall, aCallerName);
+  return detail::ProxyInternal(aTarget, methodCall, aCallerName, aDispatcher);
 }
 
 template<typename PromiseType, typename ThisType, typename Arg1Type, typename Arg2Type>
 static nsRefPtr<PromiseType>
 ProxyMediaCall(AbstractThread* aTarget, ThisType* aThisVal, const char* aCallerName,
-               nsRefPtr<PromiseType>(ThisType::*aMethod)(Arg1Type, Arg2Type), Arg1Type aArg1, Arg2Type aArg2)
+               nsRefPtr<PromiseType>(ThisType::*aMethod)(Arg1Type, Arg2Type), Arg1Type aArg1, Arg2Type aArg2,
+               TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
 {
   typedef detail::MethodCallWithTwoArgs<PromiseType, ThisType, Arg1Type, Arg2Type> MethodCallType;
   MethodCallType* methodCall = new MethodCallType(aThisVal, aMethod, aArg1, aArg2);
-  return detail::ProxyInternal(aTarget, methodCall, aCallerName);
+  return detail::ProxyInternal(aTarget, methodCall, aCallerName, aDispatcher);
 }
 
 #undef PROMISE_LOG
 
 } // namespace mozilla
 
 #endif
--- a/dom/media/MediaTaskQueue.cpp
+++ b/dom/media/MediaTaskQueue.cpp
@@ -292,9 +292,24 @@ MediaTaskQueue::Runner::Run()
       mon.NotifyAll();
     }
     mQueue->mRunningThread = nullptr;
   }
 
   return NS_OK;
 }
 
+#ifdef DEBUG
+void
+TaskDispatcher::AssertIsTailDispatcherIfRequired()
+{
+  MediaTaskQueue* currentQueue = MediaTaskQueue::GetCurrentQueue();
+
+  // NB: Make sure not to use the TailDispatcher() accessor, since that
+  // asserts IsCurrentThreadIn(), which acquires the queue monitor, which
+  // triggers a deadlock during shutdown between the queue monitor and the
+  // MediaPromise monitor.
+  MOZ_ASSERT_IF(currentQueue && currentQueue->RequiresTailDispatch(),
+                this == currentQueue->mTailDispatcher);
+}
+#endif
+
 } // namespace mozilla
--- a/dom/media/MediaTaskQueue.h
+++ b/dom/media/MediaTaskQueue.h
@@ -164,16 +164,17 @@ protected:
       sCurrentQueueTLS.set(nullptr);
       mQueue->mTailDispatcher = nullptr;
     }
 
   private:
   MediaTaskQueue* mQueue;
   };
 
+  friend class TaskDispatcher;
   TaskDispatcher* mTailDispatcher;
 
   // True if we've dispatched an event to the pool to execute events from
   // the queue.
   bool mIsRunning;
 
   // True if we've started our shutdown process.
   bool mIsShutdown;
--- a/dom/media/TaskDispatcher.h
+++ b/dom/media/TaskDispatcher.h
@@ -39,16 +39,22 @@ public:
   TaskDispatcher() {}
   virtual ~TaskDispatcher() {}
 
   virtual void AddStateChangeTask(AbstractThread* aThread,
                                   already_AddRefed<nsIRunnable> aRunnable) = 0;
   virtual void AddTask(AbstractThread* aThread,
                        already_AddRefed<nsIRunnable> aRunnable,
                        bool aAssertDispatchSuccess = true) = 0;
+
+#ifdef DEBUG
+  void AssertIsTailDispatcherIfRequired();
+#else
+  void AssertIsTailDispatcherIfRequired() {}
+#endif
 };
 
 /*
  * AutoTaskDispatcher is a stack-scoped TaskDispatcher implementation that fires
  * its queued tasks when it is popped off the stack.
  */
 class MOZ_STACK_CLASS AutoTaskDispatcher : public TaskDispatcher
 {
@@ -137,11 +143,23 @@ private:
     mTaskGroups.AppendElement(new PerThreadTaskGroup(aThread));
     return *mTaskGroups.LastElement();
   }
 
   // Task groups, organized by thread.
   nsTArray<UniquePtr<PerThreadTaskGroup>> mTaskGroups;
 };
 
+// Little utility class to allow declaring AutoTaskDispatcher as a default
+// parameter for methods that take a TaskDispatcher&.
+template<typename T>
+class PassByRef
+{
+public:
+  PassByRef() {}
+  operator T&() { return mVal; }
+private:
+  T mVal;
+};
+
 } // namespace mozilla
 
 #endif