Bug 1097823 - Implement MediaTaskQueue::ForceDispatch. r=cpearce, a=sledru
authorBobby Holley <bobbyholley@gmail.com>
Mon, 08 Dec 2014 14:45:37 -0800
changeset 242449 7e9f02cd14540ff45692b5a67a9477b368cf7644
parent 242448 034a07dd81f67adbcd8b188d27e9455c7223fc3e
child 242450 316c9301028ef8b73e5ce90c991ad070eb80e590
push id4311
push userraliiev@mozilla.com
push dateMon, 12 Jan 2015 19:37:41 +0000
treeherdermozilla-beta@150c9fed433b [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerscpearce, sledru
bugs1097823
milestone36.0a2
Bug 1097823 - Implement MediaTaskQueue::ForceDispatch. r=cpearce, a=sledru This is necessary to have strong guarantees that promises will be resolved. While we're flushing the task queue, normal dispatch starts to fail, meaning that we can't dispatch promise resolution. We have 3 options to handle this: (A) Never respond to the promise. (B) Invoke the Resolve/Reject callback synchronously if dispatch fails. (C) Prevent dispatch from failing. (C) seems like the option least likely to violate invariants if we can get away with it. Promise resolution is unlikely to be a heavyweight task in the way that a decode task might be, so this should hopefully be ok. Note that this still doesn't help for bonafide task queue shutdown. It's up to consumers to tear down their MediaPromiseHolders before the task queues are shut down.
dom/media/MediaTaskQueue.cpp
dom/media/MediaTaskQueue.h
--- a/dom/media/MediaTaskQueue.cpp
+++ b/dom/media/MediaTaskQueue.cpp
@@ -30,27 +30,34 @@ MediaTaskQueue::~MediaTaskQueue()
 nsresult
 MediaTaskQueue::Dispatch(TemporaryRef<nsIRunnable> aRunnable)
 {
   MonitorAutoLock mon(mQueueMonitor);
   return DispatchLocked(aRunnable, AbortIfFlushing);
 }
 
 nsresult
+MediaTaskQueue::ForceDispatch(TemporaryRef<nsIRunnable> aRunnable)
+{
+  MonitorAutoLock mon(mQueueMonitor);
+  return DispatchLocked(aRunnable, Forced);
+}
+
+nsresult
 MediaTaskQueue::DispatchLocked(TemporaryRef<nsIRunnable> aRunnable,
                                DispatchMode aMode)
 {
   mQueueMonitor.AssertCurrentThreadOwns();
   if (mIsFlushing && aMode == AbortIfFlushing) {
     return NS_ERROR_ABORT;
   }
   if (mIsShutdown) {
     return NS_ERROR_FAILURE;
   }
-  mTasks.push(aRunnable);
+  mTasks.push(TaskQueueEntry(aRunnable, aMode == Forced));
   if (mIsRunning) {
     return NS_OK;
   }
   RefPtr<nsIRunnable> runner(new Runner(this));
   nsresult rv = mPool->Dispatch(runner, NS_DISPATCH_NORMAL);
   if (NS_FAILED(rv)) {
     NS_WARNING("Failed to dispatch runnable to run MediaTaskQueue");
     return rv;
@@ -135,34 +142,47 @@ MediaTaskQueue::BeginShutdown()
   mon.NotifyAll();
 }
 
 nsresult
 MediaTaskQueue::FlushAndDispatch(TemporaryRef<nsIRunnable> aRunnable)
 {
   MonitorAutoLock mon(mQueueMonitor);
   AutoSetFlushing autoFlush(this);
-  while (!mTasks.empty()) {
-    mTasks.pop();
-  }
+  FlushLocked();
   nsresult rv = DispatchLocked(aRunnable, IgnoreFlushing);
   NS_ENSURE_SUCCESS(rv, rv);
   AwaitIdleLocked();
   return NS_OK;
 }
 
 void
 MediaTaskQueue::Flush()
 {
   MonitorAutoLock mon(mQueueMonitor);
   AutoSetFlushing autoFlush(this);
-  while (!mTasks.empty()) {
+  FlushLocked();
+  AwaitIdleLocked();
+}
+
+void
+MediaTaskQueue::FlushLocked()
+{
+  mQueueMonitor.AssertCurrentThreadOwns();
+  MOZ_ASSERT(mIsFlushing);
+
+  // Clear the tasks, but preserve those with mForceDispatch by re-appending
+  // them to the queue.
+  size_t numTasks = mTasks.size();
+  for (size_t i = 0; i < numTasks; ++i) {
+    if (mTasks.front().mForceDispatch) {
+      mTasks.push(mTasks.front());
+    }
     mTasks.pop();
   }
-  AwaitIdleLocked();
 }
 
 bool
 MediaTaskQueue::IsEmpty()
 {
   MonitorAutoLock mon(mQueueMonitor);
   return mTasks.empty();
 }
@@ -186,17 +206,17 @@ MediaTaskQueue::Runner::Run()
     MonitorAutoLock mon(mQueue->mQueueMonitor);
     MOZ_ASSERT(mQueue->mIsRunning);
     mQueue->mRunningThread = NS_GetCurrentThread();
     if (mQueue->mTasks.size() == 0) {
       mQueue->mIsRunning = false;
       mon.NotifyAll();
       return NS_OK;
     }
-    event = mQueue->mTasks.front();
+    event = mQueue->mTasks.front().mRunnable;
     mQueue->mTasks.pop();
   }
   MOZ_ASSERT(event);
 
   // Note that dropping the queue monitor before running the task, and
   // taking the monitor again after the task has run ensures we have memory
   // fences enforced. This means that if the object we're calling wasn't
   // designed to be threadsafe, it will be, provided we're only calling it
--- a/dom/media/MediaTaskQueue.h
+++ b/dom/media/MediaTaskQueue.h
@@ -29,16 +29,20 @@ class MediaTaskQueue MOZ_FINAL {
 
 public:
   NS_INLINE_DECL_THREADSAFE_REFCOUNTING(MediaTaskQueue)
 
   explicit MediaTaskQueue(TemporaryRef<SharedThreadPool> aPool);
 
   nsresult Dispatch(TemporaryRef<nsIRunnable> aRunnable);
 
+  // This should only be used for things that absolutely can't afford to be
+  // flushed. Normal operations should use Dispatch.
+  nsresult ForceDispatch(TemporaryRef<nsIRunnable> aRunnable);
+
   nsresult SyncDispatch(TemporaryRef<nsIRunnable> aRunnable);
 
   nsresult FlushAndDispatch(TemporaryRef<nsIRunnable> aRunnable);
 
   // Removes all pending tasks from the task queue, and blocks until
   // the currently running task (if any) finishes.
   void Flush();
 
@@ -63,28 +67,37 @@ public:
 
 private:
 
   // Blocks until all task finish executing. Called internally by methods
   // that need to wait until the task queue is idle.
   // mQueueMonitor must be held.
   void AwaitIdleLocked();
 
-  enum DispatchMode { AbortIfFlushing, IgnoreFlushing };
+  enum DispatchMode { AbortIfFlushing, IgnoreFlushing, Forced };
 
   nsresult DispatchLocked(TemporaryRef<nsIRunnable> aRunnable,
                           DispatchMode aMode);
+  void FlushLocked();
 
   RefPtr<SharedThreadPool> mPool;
 
   // Monitor that protects the queue and mIsRunning;
   Monitor mQueueMonitor;
 
+  struct TaskQueueEntry {
+    RefPtr<nsIRunnable> mRunnable;
+    bool mForceDispatch;
+
+    TaskQueueEntry(TemporaryRef<nsIRunnable> aRunnable, bool aForceDispatch = false)
+      : mRunnable(aRunnable), mForceDispatch(aForceDispatch) {}
+  };
+
   // Queue of tasks to run.
-  std::queue<RefPtr<nsIRunnable>> mTasks;
+  std::queue<TaskQueueEntry> mTasks;
 
   // The thread currently running the task queue. We store a reference
   // to this so that IsCurrentThreadIn() can tell if the current thread
   // is the thread currently running in the task queue.
   RefPtr<nsIThread> mRunningThread;
 
   // True if we've dispatched an event to the pool to execute events from
   // the queue.