Bug 1097823 - Implement MediaTaskQueue::ForceDispatch. r=cpearce, a=sledru
This is necessary to have strong guarantees that promises will be resolved.
While we're flushing the task queue, normal dispatch starts to fail,
meaning that we can't dispatch promise resolution. We have 3 options to handle
this:
(A) Never respond to the promise.
(B) Invoke the Resolve/Reject callback synchronously if dispatch fails.
(C) Prevent dispatch from failing.
(C) seems like the option least likely to violate invariants if we can get away
with it. Promise resolution is unlikely to be a heavyweight task in the way that
a decode task might be, so this should hopefully be ok.
Note that this still doesn't help for bonafide task queue shutdown. It's up to
consumers to tear down their MediaPromiseHolders before the task queues are shut
down.
--- a/dom/media/MediaTaskQueue.cpp
+++ b/dom/media/MediaTaskQueue.cpp
@@ -30,27 +30,34 @@ MediaTaskQueue::~MediaTaskQueue()
nsresult
MediaTaskQueue::Dispatch(TemporaryRef<nsIRunnable> aRunnable)
{
MonitorAutoLock mon(mQueueMonitor);
return DispatchLocked(aRunnable, AbortIfFlushing);
}
nsresult
+MediaTaskQueue::ForceDispatch(TemporaryRef<nsIRunnable> aRunnable)
+{
+ MonitorAutoLock mon(mQueueMonitor);
+ return DispatchLocked(aRunnable, Forced);
+}
+
+nsresult
MediaTaskQueue::DispatchLocked(TemporaryRef<nsIRunnable> aRunnable,
DispatchMode aMode)
{
mQueueMonitor.AssertCurrentThreadOwns();
if (mIsFlushing && aMode == AbortIfFlushing) {
return NS_ERROR_ABORT;
}
if (mIsShutdown) {
return NS_ERROR_FAILURE;
}
- mTasks.push(aRunnable);
+ mTasks.push(TaskQueueEntry(aRunnable, aMode == Forced));
if (mIsRunning) {
return NS_OK;
}
RefPtr<nsIRunnable> runner(new Runner(this));
nsresult rv = mPool->Dispatch(runner, NS_DISPATCH_NORMAL);
if (NS_FAILED(rv)) {
NS_WARNING("Failed to dispatch runnable to run MediaTaskQueue");
return rv;
@@ -135,34 +142,47 @@ MediaTaskQueue::BeginShutdown()
mon.NotifyAll();
}
nsresult
MediaTaskQueue::FlushAndDispatch(TemporaryRef<nsIRunnable> aRunnable)
{
MonitorAutoLock mon(mQueueMonitor);
AutoSetFlushing autoFlush(this);
- while (!mTasks.empty()) {
- mTasks.pop();
- }
+ FlushLocked();
nsresult rv = DispatchLocked(aRunnable, IgnoreFlushing);
NS_ENSURE_SUCCESS(rv, rv);
AwaitIdleLocked();
return NS_OK;
}
void
MediaTaskQueue::Flush()
{
MonitorAutoLock mon(mQueueMonitor);
AutoSetFlushing autoFlush(this);
- while (!mTasks.empty()) {
+ FlushLocked();
+ AwaitIdleLocked();
+}
+
+void
+MediaTaskQueue::FlushLocked()
+{
+ mQueueMonitor.AssertCurrentThreadOwns();
+ MOZ_ASSERT(mIsFlushing);
+
+ // Clear the tasks, but preserve those with mForceDispatch by re-appending
+ // them to the queue.
+ size_t numTasks = mTasks.size();
+ for (size_t i = 0; i < numTasks; ++i) {
+ if (mTasks.front().mForceDispatch) {
+ mTasks.push(mTasks.front());
+ }
mTasks.pop();
}
- AwaitIdleLocked();
}
bool
MediaTaskQueue::IsEmpty()
{
MonitorAutoLock mon(mQueueMonitor);
return mTasks.empty();
}
@@ -186,17 +206,17 @@ MediaTaskQueue::Runner::Run()
MonitorAutoLock mon(mQueue->mQueueMonitor);
MOZ_ASSERT(mQueue->mIsRunning);
mQueue->mRunningThread = NS_GetCurrentThread();
if (mQueue->mTasks.size() == 0) {
mQueue->mIsRunning = false;
mon.NotifyAll();
return NS_OK;
}
- event = mQueue->mTasks.front();
+ event = mQueue->mTasks.front().mRunnable;
mQueue->mTasks.pop();
}
MOZ_ASSERT(event);
// Note that dropping the queue monitor before running the task, and
// taking the monitor again after the task has run ensures we have memory
// fences enforced. This means that if the object we're calling wasn't
// designed to be threadsafe, it will be, provided we're only calling it
--- a/dom/media/MediaTaskQueue.h
+++ b/dom/media/MediaTaskQueue.h
@@ -29,16 +29,20 @@ class MediaTaskQueue MOZ_FINAL {
public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(MediaTaskQueue)
explicit MediaTaskQueue(TemporaryRef<SharedThreadPool> aPool);
nsresult Dispatch(TemporaryRef<nsIRunnable> aRunnable);
+ // This should only be used for things that absolutely can't afford to be
+ // flushed. Normal operations should use Dispatch.
+ nsresult ForceDispatch(TemporaryRef<nsIRunnable> aRunnable);
+
nsresult SyncDispatch(TemporaryRef<nsIRunnable> aRunnable);
nsresult FlushAndDispatch(TemporaryRef<nsIRunnable> aRunnable);
// Removes all pending tasks from the task queue, and blocks until
// the currently running task (if any) finishes.
void Flush();
@@ -63,28 +67,37 @@ public:
private:
// Blocks until all task finish executing. Called internally by methods
// that need to wait until the task queue is idle.
// mQueueMonitor must be held.
void AwaitIdleLocked();
- enum DispatchMode { AbortIfFlushing, IgnoreFlushing };
+ enum DispatchMode { AbortIfFlushing, IgnoreFlushing, Forced };
nsresult DispatchLocked(TemporaryRef<nsIRunnable> aRunnable,
DispatchMode aMode);
+ void FlushLocked();
RefPtr<SharedThreadPool> mPool;
// Monitor that protects the queue and mIsRunning;
Monitor mQueueMonitor;
+ struct TaskQueueEntry {
+ RefPtr<nsIRunnable> mRunnable;
+ bool mForceDispatch;
+
+ TaskQueueEntry(TemporaryRef<nsIRunnable> aRunnable, bool aForceDispatch = false)
+ : mRunnable(aRunnable), mForceDispatch(aForceDispatch) {}
+ };
+
// Queue of tasks to run.
- std::queue<RefPtr<nsIRunnable>> mTasks;
+ std::queue<TaskQueueEntry> mTasks;
// The thread currently running the task queue. We store a reference
// to this so that IsCurrentThreadIn() can tell if the current thread
// is the thread currently running in the task queue.
RefPtr<nsIThread> mRunningThread;
// True if we've dispatched an event to the pool to execute events from
// the queue.