Bug 1499534: Add Pause, Resume, and IsPaused methods to ThrottledEventQueue. r=froydnj
authorJim Blandy <jimb@mozilla.com>
Tue, 23 Oct 2018 06:21:10 +0000
changeset 490849 dfa1eb1d036fc0b11a449627c0a69702b3f6309f
parent 490848 8ad2b19be6b9f41fde82dc95a9ae71bced32e64c
child 490850 6e96c7ec0d1187c1b488dd4ba645df9cfd68ec16
child 490871 88c9e1c85918410a8d6bd10c25b4e1b17b5b58fc
push id247
push userfmarier@mozilla.com
push dateSat, 27 Oct 2018 01:06:44 +0000
reviewersfroydnj
bugs1499534
milestone65.0a1
Bug 1499534: Add Pause, Resume, and IsPaused methods to ThrottledEventQueue. r=froydnj Depends on D8913 Differential Revision: https://phabricator.services.mozilla.com/D8914
xpcom/tests/gtest/TestThrottledEventQueue.cpp
xpcom/threads/ThrottledEventQueue.cpp
xpcom/threads/ThrottledEventQueue.h
--- a/xpcom/tests/gtest/TestThrottledEventQueue.cpp
+++ b/xpcom/tests/gtest/TestThrottledEventQueue.cpp
@@ -402,8 +402,208 @@ TEST(ThrottledEventQueue, AwaitIdleMixed
     ASSERT_TRUE(log == "(ab" || log == "(a)b" || log == "(ab)");
     while (!threadFinished)
       cond.Wait();
     ASSERT_TRUE(log == "(a)b" || log == "(ab)");
   }
 
   ASSERT_TRUE(NS_SUCCEEDED(thread->Shutdown()));
 }
+
+TEST(ThrottledEventQueue, SimplePauseResume)
+{
+  string log;
+
+  auto base = MakeRefPtr<RunnableQueue>();
+  RefPtr<ThrottledEventQueue> throttled = ThrottledEventQueue::Create(base);
+
+  ASSERT_FALSE(throttled->IsPaused());
+
+  Enqueue(throttled, [&]() { log += 'a'; });
+
+  ASSERT_EQ(log, "");
+  ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
+  ASSERT_EQ(log, "a");
+
+  ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true)));
+  ASSERT_TRUE(throttled->IsPaused());
+
+  Enqueue(throttled, [&]() { log += 'b'; });
+
+  ASSERT_EQ(log, "a");
+  ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
+  ASSERT_EQ(log, "a");
+
+  ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false)));
+  ASSERT_FALSE(throttled->IsPaused());
+
+  ASSERT_EQ(log, "a");
+  ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
+  ASSERT_EQ(log, "ab");
+
+  ASSERT_TRUE(base->IsEmpty());
+  ASSERT_TRUE(throttled->IsEmpty());
+}
+
+TEST(ThrottledEventQueue, MixedPauseResume)
+{
+  string log;
+
+  auto base = MakeRefPtr<RunnableQueue>();
+  RefPtr<ThrottledEventQueue> throttled = ThrottledEventQueue::Create(base);
+
+  ASSERT_FALSE(throttled->IsPaused());
+
+  Enqueue(base, [&]() { log += 'A'; });
+  Enqueue(throttled, [&]() {
+      log += 'b';
+      MOZ_ALWAYS_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true)));
+    });
+  Enqueue(throttled, [&]() { log += 'c'; });
+  Enqueue(base, [&]() { log += 'D'; });
+
+  ASSERT_EQ(log, "");
+  ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
+  // Since the 'b' event paused the throttled queue, 'c' should not have run.
+  // but 'D' was enqueued directly on the base, and should have run.
+  ASSERT_EQ(log, "AbD");
+  ASSERT_TRUE(base->IsEmpty());
+  ASSERT_FALSE(throttled->IsEmpty());
+  ASSERT_TRUE(throttled->IsPaused());
+
+  Enqueue(base, [&]() { log += 'E'; });
+  ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false)));
+  Enqueue(base, [&]() { log += 'F'; });
+  ASSERT_FALSE(throttled->IsPaused());
+
+  ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
+  // Since we've unpaused, 'c' should be able to run now. The executor should have
+  // been enqueued between 'E' and 'F'.
+  ASSERT_EQ(log, "AbDEcF");
+
+  ASSERT_TRUE(base->IsEmpty());
+  ASSERT_TRUE(throttled->IsEmpty());
+}
+
+TEST(ThrottledEventQueue, AwaitIdlePaused)
+{
+  Mutex mutex("AwaitIdlePaused");
+  CondVar cond(mutex, "AwaitIdlePaused");
+
+  string dequeue_await;          // mutex
+  bool threadFinished = false;   // mutex & cond
+  bool runnableFinished = false; // main thread only
+
+  auto base = MakeRefPtr<RunnableQueue>();
+  RefPtr<ThrottledEventQueue> throttled = ThrottledEventQueue::Create(base);
+
+  ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true)));
+
+  // Put an event in the queue so the AwaitIdle might block. Since throttled is
+  // paused, this should not enqueue an executor in the base target.
+  Enqueue(throttled, [&]() { runnableFinished = true; });
+  ASSERT_TRUE(base->IsEmpty());
+
+  // Create a separate thread that waits for the queue to become idle, and
+  // then takes observable action.
+  nsCOMPtr<nsIRunnable> await =
+    NS_NewRunnableFunction(
+      "AwaitIdlePaused",
+      [&]() {
+        throttled->AwaitIdle();
+        MutexAutoLock lock(mutex);
+        dequeue_await += " await";
+        threadFinished = true;
+        cond.Notify();
+      });
+
+  nsCOMPtr<nsIThread> thread;
+  nsresult rv = NS_NewNamedThread("AwaitIdlePaused", getter_AddRefs(thread),
+                                  await);
+  ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+  // We can't guarantee that the thread has reached the AwaitIdle call, but we
+  // can get pretty close. Either way, it shouldn't affect the behavior of the
+  // test.
+  PR_Sleep(PR_MillisecondsToInterval(100));
+
+  // The AwaitIdle call should be blocked, even though there is no executor,
+  // because throttled is paused.
+  {
+    MutexAutoLock lock(mutex);
+    ASSERT_EQ(dequeue_await, "");
+    dequeue_await += "dequeue";
+    ASSERT_FALSE(threadFinished);
+  }
+
+  // A paused TEQ contributes no events to its base target. (This is covered by
+  // other tests...)
+  ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
+  ASSERT_TRUE(base->IsEmpty());
+  ASSERT_FALSE(throttled->IsEmpty());
+
+  // Resume and drain the queue.
+  ASSERT_FALSE(runnableFinished);
+  ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false)));
+  ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
+  ASSERT_TRUE(base->IsEmpty());
+  ASSERT_TRUE(throttled->IsEmpty());
+  ASSERT_TRUE(runnableFinished);
+
+  // Wait for the thread to finish.
+  {
+    MutexAutoLock lock(mutex);
+    while (!threadFinished)
+      cond.Wait();
+    ASSERT_EQ(dequeue_await, "dequeue await");
+  }
+
+  ASSERT_TRUE(NS_SUCCEEDED(thread->Shutdown()));
+}
+
+TEST(ThrottledEventQueue, ExecutorTransitions)
+{
+  string log;
+
+  auto base = MakeRefPtr<RunnableQueue>();
+  RefPtr<ThrottledEventQueue> throttled = ThrottledEventQueue::Create(base);
+
+  ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true)));
+
+  // Since we're paused, queueing an event on throttled shouldn't queue the
+  // executor on the base target.
+  Enqueue(throttled, [&]() { log += 'a'; });
+  ASSERT_EQ(throttled->Length(), 1U);
+  ASSERT_EQ(base->Length(), 0U);
+
+  // Resuming throttled should create the executor, since throttled is not empty.
+  ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false)));
+  ASSERT_EQ(throttled->Length(), 1U);
+  ASSERT_EQ(base->Length(), 1U);
+
+  // Pausing can't remove the executor from the base target since we've already
+  // queued it there, but it can ensure that it doesn't do anything.
+  ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true)));
+
+  ASSERT_EQ(log, "");
+  ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
+  ASSERT_EQ(log, "");
+  ASSERT_EQ(throttled->Length(), 1U);
+  ASSERT_EQ(base->Length(), 0U);
+
+  // As before, resuming must create the executor, since throttled is not empty.
+  ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false)));
+  ASSERT_EQ(throttled->Length(), 1U);
+  ASSERT_EQ(base->Length(), 1U);
+
+  ASSERT_EQ(log, "");
+  ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
+  ASSERT_EQ(log, "a");
+  ASSERT_EQ(throttled->Length(), 0U);
+  ASSERT_EQ(base->Length(), 0U);
+
+  // Since throttled is empty, pausing and resuming now should not enqueue an
+  // executor.
+  ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true)));
+  ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false)));
+  ASSERT_EQ(throttled->Length(), 0U);
+  ASSERT_EQ(base->Length(), 0U);
+}
--- a/xpcom/threads/ThrottledEventQueue.cpp
+++ b/xpcom/threads/ThrottledEventQueue.cpp
@@ -41,16 +41,28 @@ namespace {
 // it draws the next event from Inner's queue and runs it. If that queue has
 // more events, the executor is dispatched to the base again.
 //
 // The executor holds a strong reference to the Inner object. This means that if
 // the outer object is dereferenced and destroyed, the Inner object will remain
 // live for as long as the executor exists - that is, until the Inner's queue is
 // empty.
 //
+// A Paused ThrottledEventQueue does not enqueue an executor when new events are
+// added. Any executor previously queued on the base event target draws no
+// events from a Paused ThrottledEventQueue, and returns without re-enqueueing
+// itself. Since there is no executor keeping the Inner object alive until its
+// queue is empty, dropping a Paused ThrottledEventQueue may drop the Inner
+// while it still owns events. This is the correct behavior: if there are no
+// references to it, it will never be Resumed, and thus it will never dispatch
+// events again.
+//
+// Resuming a ThrottledEventQueue must dispatch an executor, so calls to Resume
+// are fallible for the same reasons as calls to Dispatch.
+//
 // The xpcom shutdown process drains the main thread's event queue several
 // times, so if a ThrottledEventQueue is being driven by the main thread, it
 // should get emptied out by the time we reach the "eventq shutdown" phase.
 class ThrottledEventQueue::Inner final : public nsISupports
 {
   // The runnable which is dispatched to the underlying base target.  Since
   // we only execute one event at a time we just re-use a single instance
   // of this class while there are events left in the queue.
@@ -81,44 +93,82 @@ class ThrottledEventQueue::Inner final :
     }
 #endif
   };
 
   mutable Mutex mMutex;
   mutable CondVar mIdleCondVar;
 
   // As-of-yet unexecuted runnables queued on this ThrottledEventQueue.
-  // (Used from any thread, protected by mMutex.)
+  //
+  // Used from any thread; protected by mMutex. Signals mIdleCondVar when
+  // emptied.
   EventQueue mEventQueue;
 
   // The event target we dispatch our events (actually, just our Executor) to.
-  // (Written during construction on main thread; read by any thread.)
+  //
+  // Written only during construction. Readable by any thread without locking.
   nsCOMPtr<nsISerialEventTarget> mBaseTarget;
 
   // The Executor that we dispatch to mBaseTarget to draw runnables from our
   // queue. mExecutor->mInner points to this Inner, forming a reference loop.
-  // (Used from any thread, protected by mMutex.)
+  //
+  // Used from any thread; protected by mMutex.
   nsCOMPtr<nsIRunnable> mExecutor;
 
+  // True if this queue is currently paused.
+  // Used from any thread; protected by mMutex.
+  bool mIsPaused;
+
   explicit Inner(nsISerialEventTarget* aBaseTarget)
     : mMutex("ThrottledEventQueue")
     , mIdleCondVar(mMutex, "ThrottledEventQueue:Idle")
     , mBaseTarget(aBaseTarget)
+    , mIsPaused(false)
   {
   }
 
   ~Inner()
   {
 #ifdef DEBUG
     MutexAutoLock lock(mMutex);
+
+    // As long as an executor exists, it had better keep us alive, since it's
+    // going to call ExecuteRunnable on us.
     MOZ_ASSERT(!mExecutor);
-    MOZ_ASSERT(mEventQueue.IsEmpty(lock));
+
+    // If we have any events in our queue, there should be an executor queued
+    // for them, and that should have kept us alive. The exception is that, if
+    // we're paused, we don't enqueue an executor.
+    MOZ_ASSERT(mEventQueue.IsEmpty(lock) || IsPaused(lock));
+
+    // Some runnables are only safe to drop on the main thread, so if our queue
+    // isn't empty, we'd better be on the main thread.
+    MOZ_ASSERT_IF(!mEventQueue.IsEmpty(lock), NS_IsMainThread());
 #endif
   }
 
+  // Make sure an executor has been queued on our base target. If we already
+  // have one, do nothing; otherwise, create and dispatch it.
+  nsresult EnsureExecutor(MutexAutoLock& lock) {
+    if (mExecutor)
+      return NS_OK;
+
+    // Note, this creates a ref cycle keeping the inner alive
+    // until the queue is drained.
+    mExecutor = new Executor(this);
+    nsresult rv = mBaseTarget->Dispatch(mExecutor, NS_DISPATCH_NORMAL);
+    if (NS_WARN_IF(NS_FAILED(rv))) {
+      mExecutor = nullptr;
+      return rv;
+    }
+
+    return NS_OK;
+  }
+
   nsresult
   CurrentName(nsACString& aName)
   {
     nsCOMPtr<nsIRunnable> event;
 
 #ifdef DEBUG
     bool currentThread = false;
     mBaseTarget->IsOnCurrentThread(&currentThread);
@@ -153,16 +203,27 @@ class ThrottledEventQueue::Inner final :
     bool currentThread = false;
     mBaseTarget->IsOnCurrentThread(&currentThread);
     MOZ_ASSERT(currentThread);
 #endif
 
     {
       MutexAutoLock lock(mMutex);
 
+      // Normally, a paused queue doesn't dispatch any executor, but we might
+      // have been paused after the executor was already in flight. There's no
+      // way to yank the executor out of the base event target, so we just check
+      // for a paused queue here and return without running anything. We'll
+      // create a new executor when we're resumed.
+      if (IsPaused(lock)) {
+        // Note, this breaks a ref cycle.
+        mExecutor = nullptr;
+        return;
+      }
+
       // We only dispatch an executor runnable when we know there is something
       // in the queue, so this should never fail.
       event = mEventQueue.GetEvent(nullptr, lock);
       MOZ_ASSERT(event);
 
       // If there are more events in the queue, then dispatch the next
       // executor.  We do this now, before running the event, because
       // the event might spin the event loop and we don't want to stall
@@ -223,21 +284,53 @@ public:
     MOZ_ASSERT(!NS_IsMainThread());
 #ifdef DEBUG
     bool onBaseTarget = false;
     Unused << mBaseTarget->IsOnCurrentThread(&onBaseTarget);
     MOZ_ASSERT(!onBaseTarget);
 #endif
 
     MutexAutoLock lock(mMutex);
-    while (mExecutor) {
+    while (mExecutor || IsPaused(lock)) {
       mIdleCondVar.Wait();
     }
   }
 
+  bool
+  IsPaused() const
+  {
+    MutexAutoLock lock(mMutex);
+    return IsPaused(lock);
+  }
+
+  bool
+  IsPaused(const MutexAutoLock& aProofOfLock) const
+  {
+    return mIsPaused;
+  }
+
+  nsresult
+  SetIsPaused(bool aIsPaused)
+  {
+    MutexAutoLock lock(mMutex);
+
+    // If we will be unpaused, and we have events in our queue, make sure we
+    // have an executor queued on the base event target to run them. Do this
+    // before we actually change mIsPaused, since this is fallible.
+    if (!aIsPaused && !mEventQueue.IsEmpty(lock)) {
+      nsresult rv = EnsureExecutor(lock);
+      if (NS_FAILED(rv)) {
+        return rv;
+      }
+    }
+
+    mIsPaused = aIsPaused;
+    return NS_OK;
+  }
+
   nsresult
   DispatchFromScript(nsIRunnable* aEvent, uint32_t aFlags)
   {
     // Any thread
     nsCOMPtr<nsIRunnable> r = aEvent;
     return Dispatch(r.forget(), aFlags);
   }
 
@@ -245,29 +338,23 @@ public:
   Dispatch(already_AddRefed<nsIRunnable> aEvent, uint32_t aFlags)
   {
     MOZ_ASSERT(aFlags == NS_DISPATCH_NORMAL ||
                aFlags == NS_DISPATCH_AT_END);
 
     // Any thread
     MutexAutoLock lock(mMutex);
 
-    // We are not currently processing events, so we must start
-    // operating on our base target.  This is fallible, so do
-    // it first.  Our lock will prevent the executor from accessing
-    // the event queue before we add the event below.
-    if (!mExecutor) {
-      // Note, this creates a ref cycle keeping the inner alive
-      // until the queue is drained.
-      mExecutor = new Executor(this);
-      nsresult rv = mBaseTarget->Dispatch(mExecutor, NS_DISPATCH_NORMAL);
-      if (NS_WARN_IF(NS_FAILED(rv))) {
-        mExecutor = nullptr;
+    if (!IsPaused(lock)) {
+      // Make sure we have an executor in flight to process events. This is
+      // fallible, so do it first. Our lock will prevent the executor from
+      // accessing the event queue before we add the event below.
+      nsresult rv = EnsureExecutor(lock);
+      if (NS_FAILED(rv))
         return rv;
-      }
     }
 
     // Only add the event to the underlying queue if are able to
     // dispatch to our base target.
     mEventQueue.PutEvent(std::move(aEvent), EventPriority::Normal, lock);
     return NS_OK;
   }
 
@@ -327,16 +414,28 @@ ThrottledEventQueue::Length() const
 }
 
 void
 ThrottledEventQueue::AwaitIdle() const
 {
   return mInner->AwaitIdle();
 }
 
+nsresult
+ThrottledEventQueue::SetIsPaused(bool aIsPaused)
+{
+  return mInner->SetIsPaused(aIsPaused);
+}
+
+bool
+ThrottledEventQueue::IsPaused() const
+{
+  return mInner->IsPaused();
+}
+
 NS_IMETHODIMP
 ThrottledEventQueue::DispatchFromScript(nsIRunnable* aEvent, uint32_t aFlags)
 {
   return mInner->DispatchFromScript(aEvent, aFlags);
 }
 
 NS_IMETHODIMP
 ThrottledEventQueue::Dispatch(already_AddRefed<nsIRunnable> aEvent,
--- a/xpcom/threads/ThrottledEventQueue.h
+++ b/xpcom/threads/ThrottledEventQueue.h
@@ -17,17 +17,17 @@
 
 namespace mozilla {
 
 // A ThrottledEventQueue is an event target that can be used to throttle
 // events being dispatched to another base target.  It maintains its
 // own queue of events and only dispatches one at a time to the wrapped
 // target.  This can be used to avoid flooding the base target.
 //
-// Flooding is avoided via a very simply principal.  Runnables dispatched
+// Flooding is avoided via a very simple principle.  Runnables dispatched
 // to the ThrottledEventQueue are only dispatched to the base target
 // one at a time.  Only once that runnable has executed will we dispatch
 // the next runnable to the base target.  This in effect makes all
 // runnables passing through the ThrottledEventQueue yield to other work
 // on the base target.
 //
 // ThrottledEventQueue keeps runnables waiting to be dispatched to the
 // base in its own internal queue.  Code can query the length of this
@@ -69,20 +69,42 @@ public:
   Create(nsISerialEventTarget* aBaseTarget);
 
   // Determine if there are any events pending in the queue.
   bool IsEmpty() const;
 
   // Determine how many events are pending in the queue.
   uint32_t Length() const;
 
-  // Block the current thread until the queue is empty.  This may not
-  // be called on the main thread or the base target.
+  // Block the current thread until the queue is empty. This may not be called
+  // on the main thread or the base target. The ThrottledEventQueue must not be
+  // paused.
   void AwaitIdle() const;
 
+  // If |aIsPaused| is true, pause execution of events from this queue. No
+  // events from this queue will be run until this is called with |aIsPaused|
+  // false.
+  //
+  // To un-pause a ThrottledEventQueue, we need to dispatch a runnable to the
+  // underlying event target. That operation may fail, so this method is
+  // fallible as well.
+  //
+  // Note that, although ThrottledEventQueue's behavior is descibed as queueing
+  // events on the base target, an event queued on a TEQ is never actually moved
+  // to any other queue. What is actually dispatched to the base is an
+  // "executor" event which, when run, removes an event from the TEQ and runs it
+  // immediately. This means that you can pause a TEQ even after the executor
+  // has been queued on the base target, and even so, no events from the TEQ
+  // will run. When the base target gets around to running the executor, the
+  // executor will see that the TEQ is paused, and do nothing.
+  MOZ_MUST_USE nsresult SetIsPaused(bool aIsPaused);
+
+  // Return true if this ThrottledEventQueue is paused.
+  bool IsPaused() const;
+
   NS_DECL_THREADSAFE_ISUPPORTS
   NS_DECL_NSIEVENTTARGET_FULL
 
   NS_DECLARE_STATIC_IID_ACCESSOR(NS_THROTTLEDEVENTQUEUE_IID);
 };
 
 NS_DEFINE_STATIC_IID_ACCESSOR(ThrottledEventQueue, NS_THROTTLEDEVENTQUEUE_IID);