Bug 1637592 - Part 1: Integrate off main thread task execution into TaskController. r=smaug
author Bas Schouten <bschouten@mozilla.com>
Tue, 27 Oct 2020 15:29:11 +0000
changeset 554706 ee7e7a720e7d78cb053bf94cf8ccdde4d162e0e9
parent 554705 8eeb925b28d5539f9550d73af18e05d910b3bd4b
child 554707 42bf476bba9a12cfad1a25f9269d2b3113a2d8b2
push id 37898
push user abutkovits@mozilla.com
push date Wed, 28 Oct 2020 09:24:21 +0000
treeherder mozilla-central@83bf4fd3b1fb [default view] [failures only]
perfherder [talos] [build metrics] [platform microbench] (compared to previous push)
reviewers smaug
bugs 1637592
milestone 84.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1637592 - Part 1: Integrate off main thread task execution into TaskController. r=smaug Differential Revision: https://phabricator.services.mozilla.com/D75105
xpcom/threads/TaskController.cpp
xpcom/threads/TaskController.h
--- a/xpcom/threads/TaskController.cpp
+++ b/xpcom/threads/TaskController.cpp
@@ -8,29 +8,50 @@
 #include "nsIIdleRunnable.h"
 #include "nsIRunnable.h"
 #include "nsThreadUtils.h"
 #include <algorithm>
 #include <initializer_list>
 #include "mozilla/EventQueue.h"
 #include "mozilla/BackgroundHangMonitor.h"
 #include "mozilla/InputTaskManager.h"
+#include "mozilla/IOInterposer.h"
 #include "mozilla/StaticMutex.h"
 #include "mozilla/SchedulerGroup.h"
 #include "mozilla/ScopeExit.h"
 #include "mozilla/Unused.h"
 #include "nsIThreadInternal.h"
 #include "nsQueryObject.h"
 #include "nsThread.h"
+#include "prsystem.h"
 
 namespace mozilla {
 
 std::unique_ptr<TaskController> TaskController::sSingleton;
+thread_local size_t mThreadPoolIndex = -1;
 std::atomic<uint64_t> Task::sCurrentTaskSeqNo = 0;
 
+const int32_t kMaximumPoolThreadCount = 8;
+
+static int32_t GetPoolThreadCount() {
+  if (PR_GetEnv("MOZ_TASKCONTROLLER_THREADCOUNT")) {
+    return strtol(PR_GetEnv("MOZ_TASKCONTROLLER_THREADCOUNT"), nullptr, 0);
+  }
+
+  int32_t numCores = std::max<int32_t>(1, PR_GetNumberOfProcessors());
+
+  if (numCores == 1) {
+    return 1;
+  }
+  if (numCores == 2) {
+    return 2;
+  }
+  return std::min<int32_t>(kMaximumPoolThreadCount, numCores - 1);
+}
+
 bool TaskManager::
     UpdateCachesForCurrentIterationAndReportPriorityModifierChanged(
         const MutexAutoLock& aProofOfLock, IterationType aIterationType) {
   mCurrentSuspended = IsSuspended(aProofOfLock);
 
   if (aIterationType == IterationType::EVENT_LOOP_TURN) {
     int32_t oldModifier = mCurrentPriorityModifier;
     mCurrentPriorityModifier =
@@ -73,48 +94,192 @@ TaskController* TaskController::Get() {
 }
 
 bool TaskController::Initialize() {
   MOZ_ASSERT(!sSingleton);
   sSingleton = std::make_unique<TaskController>();
   return sSingleton->InitializeInternal();
 }
 
+void ThreadFuncPoolThread(TaskController* aController, size_t aIndex) {
+  mThreadPoolIndex = aIndex;
+  aController->RunPoolThread();
+}
+
 bool TaskController::InitializeInternal() {
   InputTaskManager::Init();
   mMTProcessingRunnable = NS_NewRunnableFunction(
       "TaskController::ExecutePendingMTTasks()",
       []() { TaskController::Get()->ProcessPendingMTTask(); });
   mMTBlockingProcessingRunnable = NS_NewRunnableFunction(
       "TaskController::ExecutePendingMTTasks()",
       []() { TaskController::Get()->ProcessPendingMTTask(true); });
 
   return true;
 }
 
+void TaskController::InitializeThreadPool() {
+  mPoolInitializationMutex.AssertCurrentThreadOwns();
+  MOZ_ASSERT(!mThreadPoolInitialized);
+  mThreadPoolInitialized = true;
+
+  int32_t poolSize = GetPoolThreadCount();
+  for (int32_t i = 0; i < poolSize; i++) {
+    mPoolThreads.push_back(
+        {std::make_unique<std::thread>(ThreadFuncPoolThread, this, i),
+         nullptr});
+  }
+}
+
 void TaskController::SetPerformanceCounterState(
     PerformanceCounterState* aPerformanceCounterState) {
   mPerformanceCounterState = aPerformanceCounterState;
 }
 
 /* static */
 void TaskController::Shutdown() {
   InputTaskManager::Cleanup();
   if (sSingleton) {
+    sSingleton->ShutdownThreadPoolInternal();
     sSingleton->ShutdownInternal();
   }
   MOZ_ASSERT(!sSingleton);
 }
 
+void TaskController::ShutdownThreadPoolInternal() {
+  {
+    // Prevent race condition on mShuttingDown and wait.
+    MutexAutoLock lock(mGraphMutex);
+
+    mShuttingDown = true;
+    mThreadPoolCV.NotifyAll();
+  }
+  for (PoolThread& thread : mPoolThreads) {
+    thread.mThread->join();
+  }
+}
+
 void TaskController::ShutdownInternal() { sSingleton = nullptr; }
 
+void TaskController::RunPoolThread() {
+  IOInterposer::RegisterCurrentThread();
+
+  // This is used to hold on to a task to make sure it is released outside the
+  // lock. This is required since it's perfectly feasible for task destructors
+  // to post events themselves.
+  RefPtr<Task> lastTask;
+
+  MutexAutoLock lock(mGraphMutex);
+  while (true) {
+    if (mShuttingDown) {
+      IOInterposer::UnregisterCurrentThread();
+      return;
+    }
+    bool ranTask = false;
+
+    if (!mThreadableTasks.empty()) {
+      for (auto iter = mThreadableTasks.begin(); iter != mThreadableTasks.end();
+           ++iter) {
+        // Search for the highest priority dependency of the highest priority
+        // task.
+
+        // We work with rawptrs to avoid needless refcounting. All our tasks
+        // are always kept alive by the graph. If one is removed from the graph
+        // it is kept alive by mPoolThreads[mThreadPoolIndex].mCurrentTask.
+        Task* task = iter->get();
+
+        MOZ_ASSERT(!task->mTaskManager);
+
+        mPoolThreads[mThreadPoolIndex].mEffectiveTaskPriority =
+            task->GetPriority();
+
+        Task* nextTask;
+        while ((nextTask = task->GetHighestPriorityDependency())) {
+          task = nextTask;
+        }
+
+        if (task->IsMainThreadOnly() || task->mInProgress) {
+          continue;
+        }
+
+        mPoolThreads[mThreadPoolIndex].mCurrentTask = task;
+        mThreadableTasks.erase(task->mIterator);
+        task->mIterator = mThreadableTasks.end();
+        task->mInProgress = true;
+
+        bool taskCompleted = false;
+        {
+          MutexAutoUnlock unlock(mGraphMutex);
+          lastTask = nullptr;
+          taskCompleted = task->Run();
+          ranTask = true;
+        }
+
+        task->mInProgress = false;
+
+        if (!taskCompleted) {
+          // Presumably this task was interrupted, leave its dependencies
+          // unresolved and reinsert into the queue.
+          auto insertion = mThreadableTasks.insert(
+              mPoolThreads[mThreadPoolIndex].mCurrentTask);
+          MOZ_ASSERT(insertion.second);
+          task->mIterator = insertion.first;
+        } else {
+          task->mCompleted = true;
+#ifdef DEBUG
+          task->mIsInGraph = false;
+#endif
+          task->mDependencies.clear();
+          // This may have unblocked a main thread task. We could do this only
+          // if there was a main thread task before this one in the dependency
+          // chain.
+          mMayHaveMainThreadTask = true;
+          // Since this could have multiple dependencies that are restricted
+          // to the main thread, let's make sure that's awake.
+          EnsureMainThreadTasksScheduled();
+
+          MaybeInterruptTask(GetHighestPriorityMTTask());
+        }
+
+        // Store last task for release next time we release the lock or enter
+        // wait state.
+        lastTask = mPoolThreads[mThreadPoolIndex].mCurrentTask.forget();
+        break;
+      }
+    }
+
+    // Ensure the last task is released before we enter the wait state.
+    if (lastTask) {
+      MutexAutoUnlock unlock(mGraphMutex);
+      lastTask = nullptr;
+
+      // Run another loop iteration, while we were unlocked there was an
+      // opportunity for another task to be posted or shutdown to be initiated.
+      continue;
+    }
+
+    if (!ranTask) {
+      AUTO_PROFILER_LABEL("TaskController::RunPoolThread", IDLE);
+      mThreadPoolCV.Wait();
+    }
+  }
+}
+
 void TaskController::AddTask(already_AddRefed<Task>&& aTask) {
-  MutexAutoLock lock(mGraphMutex);
+  RefPtr<Task> task(aTask);
 
-  RefPtr<Task> task(aTask);
+  if (!task->IsMainThreadOnly()) {
+    MutexAutoLock lock(mPoolInitializationMutex);
+    if (!mThreadPoolInitialized) {
+      InitializeThreadPool();
+      mThreadPoolInitialized = true;
+    }
+  }
+
+  MutexAutoLock lock(mGraphMutex);
 
   if (TaskManager* manager = task->GetManager()) {
     if (manager->mTaskCount == 0) {
       mTaskManagers.insert(manager);
     }
     manager->DidQueueTask();
 
     // Set this here since if this manager's priority modifier doesn't change
@@ -132,19 +297,25 @@ void TaskController::AddTask(already_Add
   for (const RefPtr<Task>& otherTask : task->mDependencies) {
     MOZ_ASSERT(!otherTask->mTaskManager ||
                otherTask->mTaskManager == task->mTaskManager);
   }
 #endif
 
   LogTask::LogDispatch(task);
 
-  auto insertion = mMainThreadTasks.insert(std::move(task));
+  std::pair<std::set<RefPtr<Task>, Task::PriorityCompare>::iterator, bool>
+      insertion;
+  if (task->IsMainThreadOnly()) {
+    insertion = mMainThreadTasks.insert(std::move(task));
+  } else {
+    insertion = mThreadableTasks.insert(std::move(task));
+  }
+  (*insertion.first)->mIterator = insertion.first;
   MOZ_ASSERT(insertion.second);
-  (*insertion.first)->mIterator = insertion.first;
 
   MaybeInterruptTask(*insertion.first);
 }
 
 void TaskController::WaitForTaskOrMessage() {
   MutexAutoLock lock(mGraphMutex);
   while (!mMayHaveMainThreadTask) {
     AUTO_PROFILER_LABEL("TaskController::WaitForTaskOrMessage", IDLE);
@@ -190,16 +361,19 @@ void TaskController::ProcessPendingMTTas
   if (mMayHaveMainThreadTask) {
     EnsureMainThreadTasksScheduled();
   }
 }
 
 void TaskController::ReprioritizeTask(Task* aTask, uint32_t aPriority) {
   MutexAutoLock lock(mGraphMutex);
   std::set<RefPtr<Task>, Task::PriorityCompare>* queue = &mMainThreadTasks;
+  if (!aTask->IsMainThreadOnly()) {
+    queue = &mThreadableTasks;
+  }
 
   MOZ_ASSERT(aTask->mIterator != queue->end());
   queue->erase(aTask->mIterator);
 
   aTask->mPriority = aPriority;
 
   auto insertion = queue->insert(aTask);
   MOZ_ASSERT(insertion.second);
@@ -540,16 +714,25 @@ bool TaskController::DoExecuteNextTaskOn
         manager->WillRunTask();
       } else {
         task->mCompleted = true;
 #ifdef DEBUG
         task->mIsInGraph = false;
 #endif
         // Clear dependencies to release references.
         task->mDependencies.clear();
+
+        if (!mThreadableTasks.empty()) {
+          // Since this could have multiple dependencies that are not
+          // restricted to the main thread, let's wake up our thread pool.
+          // There is a cost to this, it's possible we will want to wake up
+          // only as many threads as we have unblocked tasks, but we currently
+          // have no way to determine that easily.
+          mThreadPoolCV.NotifyAll();
+        }
       }
 
       mCurrentTasksMT.pop();
       return true;
     }
   }
 
   mMayHaveMainThreadTask = false;
@@ -597,30 +780,60 @@ void TaskController::MaybeInterruptTask(
 
   if (finalDependency->mInProgress) {
     // No need to wake anything, we can't schedule this task right now anyway.
     return;
   }
 
   EnsureMainThreadTasksScheduled();
 
-  mMayHaveMainThreadTask = true;
+  if (aTask->IsMainThreadOnly()) {
+    mMayHaveMainThreadTask = true;
 
-  if (mCurrentTasksMT.empty()) {
-    return;
-  }
+    if (mCurrentTasksMT.empty()) {
+      return;
+    }
+
+    // We could go through the steps above here and interrupt an off main
+    // thread task in case it has a lower priority.
+    if (!finalDependency->IsMainThreadOnly()) {
+      return;
+    }
 
-  // We could go through the steps above here and interrupt an off main
-  // thread task in case it has a lower priority.
-  if (!finalDependency->IsMainThreadOnly()) {
-    return;
-  }
+    if (mCurrentTasksMT.top()->GetPriority() < aTask->GetPriority()) {
+      mCurrentTasksMT.top()->RequestInterrupt(aTask->GetPriority());
+    }
+  } else {
+    Task* lowestPriorityTask = nullptr;
+    for (PoolThread& thread : mPoolThreads) {
+      if (!thread.mCurrentTask) {
+        // There's a free thread, no need to interrupt anything.
+        return;
+      }
+
+      if (!lowestPriorityTask) {
+        lowestPriorityTask = thread.mCurrentTask.get();
+        continue;
+      }
 
-  if (mCurrentTasksMT.top()->GetPriority() < aTask->GetPriority()) {
-    mCurrentTasksMT.top()->RequestInterrupt(aTask->GetPriority());
+      // This should possibly select the lowest priority task which was started
+      // the latest. But for now we ignore that optimization.
+      // This also doesn't guarantee a task is interruptible, so that's an
+      // avenue for improvements as well.
+      if (lowestPriorityTask->GetPriority() > thread.mEffectiveTaskPriority) {
+        lowestPriorityTask = thread.mCurrentTask.get();
+      }
+    }
+
+    if (lowestPriorityTask->GetPriority() < aTask->GetPriority()) {
+      lowestPriorityTask->RequestInterrupt(aTask->GetPriority());
+    }
+
+    // We choose not to interrupt main thread tasks for tasks which may be
+    // executed off the main thread.
   }
 }
 
 Task* TaskController::GetHighestPriorityMTTask() {
   mGraphMutex.AssertCurrentThreadOwns();
 
   if (!mMainThreadTasks.empty()) {
     return mMainThreadTasks.begin()->get();
--- a/xpcom/threads/TaskController.h
+++ b/xpcom/threads/TaskController.h
@@ -220,16 +220,24 @@ class Task {
   int32_t mPriorityModifier = 0;
 #ifdef MOZ_GECKO_PROFILER
   // Time this task was inserted into the task graph, this is used by the
   // profiler.
   mozilla::TimeStamp mInsertionTime;
 #endif
 };
 
+struct PoolThread {
+  std::unique_ptr<std::thread> mThread;
+  RefPtr<Task> mCurrentTask;
+  // This may be higher than mCurrentTask's priority due to priority
+  // propagation. This is -only- valid when mCurrentTask != nullptr.
+  uint32_t mEffectiveTaskPriority;
+};
+
 // A task manager implementation for priority levels that should only
 // run during idle periods.
 class IdleTaskManager : public TaskManager {
  public:
   explicit IdleTaskManager(already_AddRefed<nsIIdlePeriod>&& aIdlePeriod)
       : mIdlePeriodState(std::move(aIdlePeriod)) {}
 
   IdlePeriodState& State() { return mIdlePeriodState; }
@@ -247,16 +255,17 @@ class IdleTaskManager : public TaskManag
 // The TaskController is the core class of the scheduler. It is used to
 // schedule tasks to be executed, as well as to reprioritize tasks that have
 // already been scheduled. The core functions to do this are AddTask and
 // ReprioritizeTask.
 class TaskController {
  public:
   TaskController()
       : mGraphMutex("TaskController::mGraphMutex"),
+        mThreadPoolCV(mGraphMutex, "TaskController::mThreadPoolCV"),
         mMainThreadCV(mGraphMutex, "TaskController::mMainThreadCV") {}
 
   static TaskController* Get();
 
   static bool Initialize();
 
   void SetThreadObserver(nsIThreadObserver* aObserver) {
     mObserver = aObserver;
@@ -266,17 +275,16 @@ class TaskController {
   }
 
   void SetIdleTaskManager(IdleTaskManager* aIdleTaskManager) {
     mIdleTaskManager = aIdleTaskManager;
   }
   IdleTaskManager* GetIdleTaskManager() { return mIdleTaskManager.get(); }
 
   // Initialization and shutdown code.
-  bool InitializeInternal();
   void SetPerformanceCounterState(
       PerformanceCounterState* aPerformanceCounterState);
 
   static void Shutdown();
 
   // This adds a task to the TaskController graph.
   // This may be called on any thread.
   void AddTask(already_AddRefed<Task>&& aTask);
@@ -302,16 +310,22 @@ class TaskController {
   nsIRunnable* GetRunnableForMTTask(bool aReallyWait);
 
   bool HasMainThreadPendingTasks();
 
   // Let users know whether the last main thread task runnable did work.
   bool MTTaskRunnableProcessedTask() { return mMTTaskRunnableProcessedTask; }
 
  private:
+  friend void ThreadFuncPoolThread(TaskController* aController, size_t aIndex);
+
+  bool InitializeInternal();
+
+  void InitializeThreadPool();
+
   // This gets the next (highest priority) task that is only allowed to execute
   // on the main thread, if any, and executes it.
   // Returns true if it succeeded.
   bool ExecuteNextTaskOnlyMainThreadInternal(const MutexAutoLock& aProofOfLock);
 
   // The guts of ExecuteNextTaskOnlyMainThreadInternal, which get idle handling
   // wrapped around them.  Returns whether a task actually ran.
   bool DoExecuteNextTaskOnlyMainThreadInternal(
@@ -320,43 +334,61 @@ class TaskController {
   Task* GetFinalDependency(Task* aTask);
   void MaybeInterruptTask(Task* aTask);
   Task* GetHighestPriorityMTTask();
 
   void EnsureMainThreadTasksScheduled();
 
   void ProcessUpdatedPriorityModifier(TaskManager* aManager);
 
+  void ShutdownThreadPoolInternal();
   void ShutdownInternal();
 
+  void RunPoolThread();
+
   static std::unique_ptr<TaskController> sSingleton;
   static StaticMutex sSingletonMutex;
 
   // This protects access to the task graph.
   Mutex mGraphMutex;
 
+  // This protects thread pool initialization. We cannot do this from within
+  // the GraphMutex, since thread creation on Windows can generate events on
+  // the main thread that need to be handled.
+  Mutex mPoolInitializationMutex =
+      Mutex("TaskController::mPoolInitializationMutex");
+
+  CondVar mThreadPoolCV;
   CondVar mMainThreadCV;
 
   // Variables below are protected by mGraphMutex.
 
+  std::vector<PoolThread> mPoolThreads;
   std::stack<RefPtr<Task>> mCurrentTasksMT;
 
   // A list of all tasks ordered by priority.
+  std::set<RefPtr<Task>, Task::PriorityCompare> mThreadableTasks;
   std::set<RefPtr<Task>, Task::PriorityCompare> mMainThreadTasks;
 
   // TaskManagers currently active.
   // We can use a raw pointer since tasks always hold on to their TaskManager.
   std::set<TaskManager*> mTaskManagers;
 
   // This ensures we keep running the main thread if we processed a task there.
   bool mMayHaveMainThreadTask = true;
+  bool mShuttingDown = false;
 
   // This stores whether the last main thread task runnable did work.
   bool mMTTaskRunnableProcessedTask = false;
 
+  // Whether our thread pool is initialized. We use this currently to avoid
+  // starting the threads in processes where it's never used. This is protected
+  // by mPoolInitializationMutex.
+  bool mThreadPoolInitialized = false;
+
   // Whether we have scheduled a runnable on the main thread event loop.
   // This is used for nsIRunnable compatibility.
   RefPtr<nsIRunnable> mMTProcessingRunnable;
   RefPtr<nsIRunnable> mMTBlockingProcessingRunnable;
 
   // XXX - Thread observer to notify when a new event has been dispatched
   nsIThreadObserver* mObserver = nullptr;
   // XXX - External condvar to notify when we have received an event