Bug 1300659 P1 Add the ThrottledEventQueue class. r=froydnj
authorBen Kelly <ben@wanderview.com>
Mon, 07 Nov 2016 12:30:17 -0800
changeset 366673 537dea427441823141e1c83fc6b4dc215b94f063
parent 366555 d19cffeae1febc6d669a024340de25228e544475
child 366674 a7b4c0d350ef04a8e33fcb98b680a8b103d453b8
push id1369
push userjlorenzo@mozilla.com
push dateMon, 27 Feb 2017 14:59:41 +0000
treeherdermozilla-release@d75a1dba431f [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersfroydnj
bugs1300659
milestone52.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1300659 P1 Add the ThrottledEventQueue class. r=froydnj
xpcom/threads/ThrottledEventQueue.cpp
xpcom/threads/ThrottledEventQueue.h
xpcom/threads/moz.build
xpcom/threads/nsEventQueue.cpp
xpcom/threads/nsEventQueue.h
new file mode 100644
--- /dev/null
+++ b/xpcom/threads/ThrottledEventQueue.cpp
@@ -0,0 +1,443 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "ThrottledEventQueue.h"
+
+#include "mozilla/Atomics.h"
+#include "mozilla/ClearOnShutdown.h"
+#include "mozilla/Mutex.h"
+#include "mozilla/Unused.h"
+#include "nsEventQueue.h"
+
+namespace mozilla {
+
+using mozilla::services::GetObserverService;
+
+namespace {
+
+static const char kShutdownTopic[] = "xpcom-shutdown";
+
+} // anonymous namespace
+
+// The ThrottledEventQueue is designed with inner and outer objects:
+//
+//       XPCOM code    nsObserverService
+//            |               |
+//            |               |
+//            v               |
+//        +-------+           |
+//        | Outer |           |
+//        +-------+           |
+//            |               |
+//            |   +-------+   |
+//            +-->| Inner |<--+
+//                +-------+
+//
+// Client code references the outer nsIEventTarget which in turn references
+// an inner object.  The inner object is also held alive by the observer
+// service.
+//
+// If the outer object is dereferenced and destroyed, it will trigger a
+// shutdown operation on the inner object.  Similarly if the observer
+// service notifies that the browser is shutting down, then the inner
+// object also starts shutting down.
+//
+// Once the queue has drained we unregister from the observer service.  If
+// the outer object is already gone, then the inner object is free'd at this
+// point.  If the outer object still exists then calls fall back to the
+// ThrottledEventQueue's base target.  We just don't queue things
+// any more.  The inner is then released once the outer object is released.
+//
+// Note, we must keep the inner object alive and attached to the observer
+// service until the TaskQueue is fully shutdown and idle.  We must delay
+// xpcom shutdown if the TaskQueue is in the middle of draining.
+class ThrottledEventQueue::Inner final : public nsIObserver
+{
+  // The runnable which is dispatched to the underlying base target.  Since
+  // we only execute one event at a time we just re-use a single instance
+  // of this class while there are events left in the queue.
+  class Executor final : public Runnable
+  {
+    RefPtr<Inner> mInner;
+
+  public:
+    explicit Executor(Inner* aInner)
+      : mInner(aInner)
+    { }
+
+    NS_IMETHODIMP
+    Run()
+    {
+      mInner->ExecuteRunnable();
+      return NS_OK;
+    }
+  };
+
+  mutable Mutex mMutex;
+  mutable CondVar mIdleCondVar;
+
+  // any thread, protected by mutex
+  nsEventQueue mEventQueue;
+
+  // written on main thread, read on any thread
+  nsCOMPtr<nsIEventTarget> mBaseTarget;
+
+  // any thread, protected by mutex
+  nsCOMPtr<nsIRunnable> mExecutor;
+
+  // any thread, atomic
+  Atomic<uint32_t> mExecutionDepth;
+
+  // any thread, protected by mutex
+  bool mShutdownStarted;
+
+  explicit Inner(nsIEventTarget* aBaseTarget)
+    : mMutex("ThrottledEventQueue")
+    , mIdleCondVar(mMutex, "ThrottledEventQueue:Idle")
+    , mEventQueue(mMutex)
+    , mBaseTarget(aBaseTarget)
+    , mExecutionDepth(0)
+    , mShutdownStarted(false)
+  {
+  }
+
+  ~Inner()
+  {
+    MOZ_ASSERT(!mExecutor);
+    MOZ_ASSERT(mShutdownStarted);
+  }
+
+  void
+  ExecuteRunnable()
+  {
+    // Any thread
+    nsCOMPtr<nsIRunnable> event;
+    bool shouldShutdown = false;
+
+#ifdef DEBUG
+    bool currentThread = false;
+    mBaseTarget->IsOnCurrentThread(&currentThread);
+    MOZ_ASSERT(currentThread);
+#endif
+
+    {
+      MutexAutoLock lock(mMutex);
+
+      // We only dispatch an executor runnable when we know there is something
+      // in the queue, so this should never fail.
+      MOZ_ALWAYS_TRUE(mEventQueue.GetPendingEvent(getter_AddRefs(event), lock));
+
+      // If there are more events in the queue, then dispatch the next
+      // executor.  We do this now, before running the event, because
+      // the event might spin the event loop and we don't want to stall
+      // the queue.
+      if (mEventQueue.HasPendingEvent(lock)) {
+        // Dispatch the next base target runnable to attempt to execute
+        // the next throttled event.  We must do this before executing
+        // the event in case the event spins the event loop.
+        MOZ_ALWAYS_SUCCEEDS(
+          mBaseTarget->Dispatch(mExecutor, NS_DISPATCH_NORMAL));
+      }
+
+      // Otherwise the queue is empty and we can stop dispatching the
+      // executor.  We might also need to shutdown after running the
+      // last event.
+      else {
+        shouldShutdown = mShutdownStarted;
+        // Note, this breaks a ref cycle.
+        mExecutor = nullptr;
+        mIdleCondVar.NotifyAll();
+      }
+    }
+
+    // Execute the event now that we have unlocked.
+    ++mExecutionDepth;
+    Unused << event->Run();
+    --mExecutionDepth;
+
+    // If shutdown was started and the queue is now empty we can now
+    // finalize the shutdown.  This is performed separately at the end
+    // of the method in order to wait for the event to finish running.
+    if (shouldShutdown) {
+      MOZ_ASSERT(IsEmpty());
+      NS_DispatchToMainThread(NewRunnableMethod(this, &Inner::ShutdownComplete));
+    }
+  }
+
+  void
+  ShutdownComplete()
+  {
+    MOZ_ASSERT(NS_IsMainThread());
+    MOZ_ASSERT(IsEmpty());
+    nsCOMPtr<nsIObserverService> obs = GetObserverService();
+    obs->RemoveObserver(this, kShutdownTopic);
+  }
+
+public:
+  static already_AddRefed<Inner>
+  Create(nsIEventTarget* aBaseTarget)
+  {
+    MOZ_ASSERT(NS_IsMainThread());
+
+    if (ClearOnShutdown_Internal::sCurrentShutdownPhase != ShutdownPhase::NotInShutdown) {
+      return nullptr;
+    }
+
+    nsCOMPtr<nsIObserverService> obs = GetObserverService();
+    if (NS_WARN_IF(!obs)) {
+      return nullptr;
+    }
+
+    RefPtr<Inner> ref = new Inner(aBaseTarget);
+
+    nsresult rv = obs->AddObserver(ref, kShutdownTopic,
+                                   false /* means OS will hold a strong ref */);
+    if (NS_WARN_IF(NS_FAILED(rv))) {
+      ref->MaybeStartShutdown();
+      MOZ_ASSERT(ref->IsEmpty());
+      return nullptr;
+    }
+
+    return ref.forget();
+  }
+
+  NS_IMETHOD
+  Observe(nsISupports*, const char* aTopic, const char16_t*) override
+  {
+    MOZ_ASSERT(NS_IsMainThread());
+    MOZ_ASSERT(!strcmp(aTopic, kShutdownTopic));
+
+    MaybeStartShutdown();
+
+    // Once shutdown begins we set the Atomic<bool> mShutdownStarted flag.
+    // This prevents any new runnables from being dispatched into the
+    // TaskQueue.  Therefore this loop should be finite.
+    while (!IsEmpty()) {
+      MOZ_ALWAYS_TRUE(NS_ProcessNextEvent());
+    }
+
+    return NS_OK;
+  }
+
+  void
+  MaybeStartShutdown()
+  {
+    // Any thread
+    MutexAutoLock lock(mMutex);
+
+    if (mShutdownStarted) {
+      return;
+    }
+    mShutdownStarted = true;
+
+    // We are marked for shutdown now, but we are still processing runnables.
+    // Return for now.  The shutdown will be completed once the queue is
+    // drained.
+    if (mExecutor) {
+      return;
+    }
+
+    // The queue is empty, so we can complete immediately.
+    NS_DispatchToMainThread(NewRunnableMethod(this, &Inner::ShutdownComplete));
+  }
+
+  bool
+  IsEmpty() const
+  {
+    // Any thread
+    return Length() == 0;
+  }
+
+  uint32_t
+  Length() const
+  {
+    // Any thread
+    MutexAutoLock lock(mMutex);
+    return mEventQueue.Count(lock);
+  }
+
+  void
+  AwaitIdle() const
+  {
+    // Any thread, except the main thread or our base target.  Blocking the
+    // main thread is forbidden.  Blocking the base target is guaranteed to
+    // produce a deadlock.
+    MOZ_ASSERT(!NS_IsMainThread());
+#ifdef DEBUG
+    bool onBaseTarget = false;
+    Unused << mBaseTarget->IsOnCurrentThread(&onBaseTarget);
+    MOZ_ASSERT(!onBaseTarget);
+#endif
+
+    MutexAutoLock lock(mMutex);
+    while (mExecutor) {
+      mIdleCondVar.Wait();
+    }
+  }
+
+  nsresult
+  DispatchFromScript(nsIRunnable* aEvent, uint32_t aFlags)
+  {
+    // Any thread
+    nsCOMPtr<nsIRunnable> r = aEvent;
+    return Dispatch(r.forget(), aFlags);
+  }
+
+  nsresult
+  Dispatch(already_AddRefed<nsIRunnable> aEvent, uint32_t aFlags)
+  {
+    MOZ_ASSERT(aFlags == NS_DISPATCH_NORMAL ||
+               aFlags == NS_DISPATCH_AT_END);
+
+    // Any thread
+    MutexAutoLock lock(mMutex);
+
+    // If we are shutting down, just fall back to our base target
+    // directly.
+    if (mShutdownStarted) {
+      return mBaseTarget->Dispatch(Move(aEvent), aFlags);
+    }
+
+    // We are not currently processing events, so we must start
+    // operating on our base target.  This is fallible, so do
+    // it first.  Our lock will prevent the executor from accessing
+    // the event queue before we add the event below.
+    if (!mExecutor) {
+      // Note, this creates a ref cycle keeping the inner alive
+      // until the queue is drained.
+      mExecutor = new Executor(this);
+      nsresult rv = mBaseTarget->Dispatch(mExecutor, NS_DISPATCH_NORMAL);
+      if (NS_WARN_IF(NS_FAILED(rv))) {
+        mExecutor = nullptr;
+        return rv;
+      }
+    }
+
+    // Only add the event to the underlying queue if are able to
+    // dispatch to our base target.
+    mEventQueue.PutEvent(Move(aEvent), lock);
+    return NS_OK;
+  }
+
+  nsresult
+  DelayedDispatch(already_AddRefed<nsIRunnable> aEvent, uint32_t aDelay)
+  {
+    // The base target may implement this, but we don't.  Always fail
+    // to provide consistent behavior.
+    return NS_ERROR_NOT_IMPLEMENTED;
+  }
+
+  nsresult
+  IsOnCurrentThread(bool* aResult)
+  {
+    // Any thread
+
+    bool shutdownAndIdle = false;
+    {
+      MutexAutoLock lock(mMutex);
+      shutdownAndIdle = mShutdownStarted && mEventQueue.Count(lock) == 0;
+    }
+
+    bool onBaseTarget = false;
+    nsresult rv = mBaseTarget->IsOnCurrentThread(&onBaseTarget);
+    if (NS_FAILED(rv)) {
+      return rv;
+    }
+
+    // We consider the current stack on this event target if are on
+    // the base target and one of the following is true
+    //  1) We are currently running an event OR
+    //  2) We are both shutting down and the queue is idle
+    *aResult = onBaseTarget && (mExecutionDepth || shutdownAndIdle);
+
+    return NS_OK;
+  }
+
+  NS_DECL_THREADSAFE_ISUPPORTS
+};
+
+NS_IMPL_ISUPPORTS(ThrottledEventQueue::Inner, nsIObserver);
+
+NS_IMPL_ISUPPORTS(ThrottledEventQueue, nsIEventTarget);
+
+ThrottledEventQueue::ThrottledEventQueue(already_AddRefed<Inner> aInner)
+  : mInner(aInner)
+{
+  MOZ_ASSERT(mInner);
+}
+
+ThrottledEventQueue::~ThrottledEventQueue()
+{
+  mInner->MaybeStartShutdown();
+}
+
+void
+ThrottledEventQueue::MaybeStartShutdown()
+{
+  return mInner->MaybeStartShutdown();
+}
+
+already_AddRefed<ThrottledEventQueue>
+ThrottledEventQueue::Create(nsIEventTarget* aBaseTarget)
+{
+  MOZ_ASSERT(NS_IsMainThread());
+  MOZ_ASSERT(aBaseTarget);
+
+  RefPtr<Inner> inner = Inner::Create(aBaseTarget);
+  if (NS_WARN_IF(!inner)) {
+    return nullptr;
+  }
+
+  RefPtr<ThrottledEventQueue> ref =
+    new ThrottledEventQueue(inner.forget());
+  return ref.forget();
+}
+
+bool
+ThrottledEventQueue::IsEmpty() const
+{
+  return mInner->IsEmpty();
+}
+
+uint32_t
+ThrottledEventQueue::Length() const
+{
+  return mInner->Length();
+}
+
+void
+ThrottledEventQueue::AwaitIdle() const
+{
+  return mInner->AwaitIdle();
+}
+
+NS_IMETHODIMP
+ThrottledEventQueue::DispatchFromScript(nsIRunnable* aEvent, uint32_t aFlags)
+{
+  return mInner->DispatchFromScript(aEvent, aFlags);
+}
+
+NS_IMETHODIMP
+ThrottledEventQueue::Dispatch(already_AddRefed<nsIRunnable> aEvent,
+                                     uint32_t aFlags)
+{
+  return mInner->Dispatch(Move(aEvent), aFlags);
+}
+
+NS_IMETHODIMP
+ThrottledEventQueue::DelayedDispatch(already_AddRefed<nsIRunnable> aEvent,
+                                            uint32_t aFlags)
+{
+  return mInner->DelayedDispatch(Move(aEvent), aFlags);
+}
+
+NS_IMETHODIMP
+ThrottledEventQueue::IsOnCurrentThread(bool* aResult)
+{
+  return mInner->IsOnCurrentThread(aResult);
+}
+
+} // namespace mozilla
new file mode 100644
--- /dev/null
+++ b/xpcom/threads/ThrottledEventQueue.h
@@ -0,0 +1,94 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// nsIEventTarget wrapper for throttling event dispatch.
+
+#ifndef mozilla_ThrottledEventQueue_h
+#define mozilla_ThrottledEventQueue_h
+
+#include "nsIEventTarget.h"
+
+namespace mozilla {
+
+// A ThrottledEventQueue is an event target that can be used to throttle
+// events being dispatched to another base target.  It maintains its
+// own queue of events and only dispatches one at a time to the wrapped
+// target.  This can be used to avoid flooding the base target.
+//
+// Flooding is avoided via a very simply principal.  Runnables dispatched
+// to the ThrottledEventQueue are only dispatched to the base target
+// one at a time.  Only once that runnable has executed will we dispatch
+// the next runnable to the base target.  This in effect makes all
+// runnables passing through the ThrottledEventQueue yield to other work
+// on the base target.
+//
+// ThrottledEventQueue keeps runnables waiting to be dispatched to the
+// base in its own internal queue.  Code can query the length of this
+// queue using IsEmpty() and Length().  Further, code implement back
+// pressure by checking the depth of the queue and deciding to stop
+// issuing runnables if they see the ThrottledEventQueue is backed up.
+// Code running on other threads could even use AwaitIdle() to block
+// all operation until the ThrottledEventQueue drains.
+//
+// Note, this class is similar to TaskQueue, but also differs in a few
+// ways.  First, it is a very simple nsIEventTarget implementation.  It
+// does not use the AbstractThread API.
+//
+// In addition, ThrottledEventQueue currently dispatches its next
+// runnable to the base target *before* running the current event.  This
+// allows the event code to spin the event loop without stalling the
+// ThrottledEventQueue.  In contrast, TaskQueue only dispatches its next
+// runnable after running the current event.  That approach is necessary
+// for TaskQueue in order to work with thread pool targets.
+//
+// So, if you are targeting a thread pool you probably want a TaskQueue.
+// If you are targeting a single thread or other non-concurrent event
+// target, you probably want a ThrottledEventQueue.
+//
+// ThrottledEventQueue also implements an automatic shutdown mechanism.
+// De-referencing the queue or browser shutdown will automatically begin
+// shutdown.
+//
+// Once shutdown begins all events will bypass the queue and be dispatched
+// straight to the underlying base target.
+class ThrottledEventQueue final : public nsIEventTarget
+{
+  class Inner;
+  RefPtr<Inner> mInner;
+
+  explicit ThrottledEventQueue(already_AddRefed<Inner> aInner);
+  ~ThrottledEventQueue();
+
+  // Begin shutdown of the event queue.  This has no effect if shutdown
+  // is already in process.  After this is called nsIEventTarget methods
+  // will bypass the queue and operate directly on the base target.
+  // Note, this could be made public if code needs to explicitly shutdown
+  // for some reason.
+  void MaybeStartShutdown();
+
+public:
+  // Attempt to create a ThrottledEventQueue for the given target.  This
+  // may return nullptr if the browser is already shutting down.
+  static already_AddRefed<ThrottledEventQueue>
+  Create(nsIEventTarget* aBaseTarget);
+
+  // Determine if there are any events pending in the queue.
+  bool IsEmpty() const;
+
+  // Determine how many events are pending in the queue.
+  uint32_t Length() const;
+
+  // Block the current thread until the queue is empty.  This may not
+  // be called on the main thread or the base target.
+  void AwaitIdle() const;
+
+  NS_DECL_THREADSAFE_ISUPPORTS
+  NS_DECL_NSIEVENTTARGET
+};
+
+} // namespace mozilla
+
+#endif // mozilla_ThrottledEventQueue_h
--- a/xpcom/threads/moz.build
+++ b/xpcom/threads/moz.build
@@ -38,16 +38,17 @@ EXPORTS.mozilla += [
     'MainThreadIdlePeriod.h',
     'MozPromise.h',
     'SharedThreadPool.h',
     'StateMirroring.h',
     'StateWatching.h',
     'SyncRunnable.h',
     'TaskDispatcher.h',
     'TaskQueue.h',
+    'ThrottledEventQueue.h',
 ]
 
 UNIFIED_SOURCES += [
     'AbstractThread.cpp',
     'BackgroundHangMonitor.cpp',
     'HangAnnotations.cpp',
     'HangMonitor.cpp',
     'LazyIdleThread.cpp',
@@ -58,16 +59,17 @@ UNIFIED_SOURCES += [
     'nsProcessCommon.cpp',
     'nsThread.cpp',
     'nsThreadManager.cpp',
     'nsThreadPool.cpp',
     'nsTimerImpl.cpp',
     'SharedThreadPool.cpp',
     'TaskQueue.cpp',
     'ThreadStackHelper.cpp',
+    'ThrottledEventQueue.cpp',
     'TimerThread.cpp',
 ]
 
 LOCAL_INCLUDES += [
     '../build',
     '/caps',
     '/tools/profiler',
 ]
--- a/xpcom/threads/nsEventQueue.cpp
+++ b/xpcom/threads/nsEventQueue.cpp
@@ -107,17 +107,17 @@ nsEventQueue::PutEvent(already_AddRefed<
 void
 nsEventQueue::PutEvent(nsIRunnable* aRunnable, MutexAutoLock& aProofOfLock)
 {
   nsCOMPtr<nsIRunnable> event(aRunnable);
   PutEvent(event.forget(), aProofOfLock);
 }
 
 size_t
-nsEventQueue::Count(MutexAutoLock& aProofOfLock)
+nsEventQueue::Count(MutexAutoLock& aProofOfLock) const
 {
   // It is obvious count is 0 when the queue is empty.
   if (!mHead) {
     return 0;
   }
 
   /* How we count the number of events in the queue:
    * 1. Let pageCount(x, y) denote the number of pages excluding the tail page
--- a/xpcom/threads/nsEventQueue.h
+++ b/xpcom/threads/nsEventQueue.h
@@ -48,17 +48,17 @@ public:
   }
 
   // This method returns the next pending event or null.
   bool GetPendingEvent(nsIRunnable** aRunnable, MutexAutoLock& aProofOfLock)
   {
     return GetEvent(false, aRunnable, aProofOfLock);
   }
 
-  size_t Count(MutexAutoLock&);
+  size_t Count(MutexAutoLock&) const;
 
 private:
   bool IsEmpty()
   {
     return !mHead || (mHead == mTail && mOffsetHead == mOffsetTail);
   }
 
   enum