Bug 1404997 - P26. Give Await the threadpool to use. r=gerald
authorJean-Yves Avenard <jyavenard@mozilla.com>
Fri, 15 Dec 2017 20:06:38 +0100
changeset 448427 4ccc18cc509dc79d07d86a441799ba864b8e60e7
parent 448426 5b0cffc026a4c17283e64c93d885df411b62ae4d
child 448428 2f1892facca9cf9dd9b5f4180a5b0a71c47d88fb
push id8527
push userCallek@gmail.com
push dateThu, 11 Jan 2018 21:05:50 +0000
treeherdermozilla-beta@95342d212a7a [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersgerald
bugs1404997
milestone59.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1404997 - P26. Give Await the threadpool to use. r=gerald This allows to re-use the SharedThreadPool across calls, preventing the need to create a new thread on each call. MozReview-Commit-ID: CbP6OTYKhHL
dom/media/MediaStreamGraph.cpp
dom/media/MediaStreamGraphImpl.h
dom/media/systemservices/MediaUtils.h
media/webrtc/signaling/src/media-conduit/WebrtcMediaDataDecoderCodec.cpp
media/webrtc/signaling/src/media-conduit/WebrtcMediaDataDecoderCodec.h
--- a/dom/media/MediaStreamGraph.cpp
+++ b/dom/media/MediaStreamGraph.cpp
@@ -29,16 +29,17 @@
 #include "GeckoProfiler.h"
 #include "VideoFrameContainer.h"
 #include "mozilla/AbstractThread.h"
 #include "mozilla/Unused.h"
 #ifdef MOZ_WEBRTC
 #include "AudioOutputObserver.h"
 #endif
 #include "mtransport/runnable_utils.h"
+#include "VideoUtils.h"
 
 #include "webaudio/blink/DenormalDisabler.h"
 #include "webaudio/blink/HRTFDatabaseLoader.h"
 
 using namespace mozilla::layers;
 using namespace mozilla::dom;
 using namespace mozilla::gfx;
 using namespace mozilla::media;
@@ -1156,17 +1157,17 @@ MediaStreamGraphImpl::UpdateGraph(GraphT
     if (SourceMediaStream* is = stream->AsSourceStream()) {
       promises.AppendElements(
         is->PullNewData(aEndBlockingDecisions, &ensureNextIteration));
     }
   }
 
   // Wait until all PullEnabled stream's listeners have completed.
   if (!promises.IsEmpty()) {
-    AwaitAll(promises);
+    AwaitAll(do_AddRef(mThreadPool), promises);
   }
 
   for (MediaStream* stream : mStreams) {
     if (SourceMediaStream* is = stream->AsSourceStream()) {
       is->ExtractPendingInput();
     }
     if (stream->mFinished) {
       // The stream's not suspended, and since it's finished, underruns won't
@@ -1447,16 +1448,20 @@ public:
     // objects owning streams, or for expiration of mGraph->mShutdownTimer,
     // which won't otherwise release its reference on the graph until
     // nsTimerImpl::Shutdown(), which runs after xpcom-shutdown-threads.
     {
       MonitorAutoLock mon(mGraph->mMonitor);
       mGraph->SetCurrentDriver(nullptr);
     }
 
+    // Do not hold on our threadpool, global shutdown will hang otherwise as
+    // it waits for all thread pools to shutdown.
+    mGraph->mThreadPool = nullptr;
+
     // Safe to access these without the monitor since the graph isn't running.
     // We may be one of several graphs. Drop ticket to eventually unblock shutdown.
     if (mGraph->mShutdownTimer && !mGraph->mForceShutdownTicket) {
       MOZ_ASSERT(false,
         "AudioCallbackDriver took too long to shut down and we let shutdown"
         " continue - freezing and leaking");
 
       // The timer fired, so we may be deeper in shutdown now.  Block any further
@@ -3555,16 +3560,17 @@ MediaStreamGraphImpl::MediaStreamGraphIm
   , mPostedRunInStableStateEvent(false)
   , mDetectedNotRunning(false)
   , mPostedRunInStableState(false)
   , mRealtime(aDriverRequested != OFFLINE_THREAD_DRIVER)
   , mNonRealtimeProcessing(false)
   , mStreamOrderDirty(false)
   , mLatencyLog(AsyncLatencyLogger::Get())
   , mAbstractMainThread(aMainThread)
+  , mThreadPool(GetMediaThreadPool(MediaThreadType::MSG_CONTROL))
 #ifdef MOZ_WEBRTC
   , mFarendObserverRef(nullptr)
 #endif
   , mSelfRef(this)
   , mOutputChannels(std::min<uint32_t>(8, CubebUtils::MaxNumberOfChannels()))
 #ifdef DEBUG
   , mCanRunMessagesSynchronously(false)
 #endif
--- a/dom/media/MediaStreamGraphImpl.h
+++ b/dom/media/MediaStreamGraphImpl.h
@@ -812,16 +812,17 @@ public:
    */
   bool mStreamOrderDirty;
   /**
    * Hold a ref to the Latency logger
    */
   RefPtr<AsyncLatencyLogger> mLatencyLog;
   AudioMixer mMixer;
   const RefPtr<AbstractThread> mAbstractMainThread;
+  RefPtr<SharedThreadPool> mThreadPool;
 #ifdef MOZ_WEBRTC
   RefPtr<AudioOutputObserver> mFarendObserverRef;
 #endif
 
   // used to limit graph shutdown time
   // Only accessed on the main thread.
   nsCOMPtr<nsITimer> mShutdownTimer;
 
--- a/dom/media/systemservices/MediaUtils.h
+++ b/dom/media/systemservices/MediaUtils.h
@@ -14,16 +14,18 @@
 #include "mozilla/RefPtr.h"
 #include "mozilla/SharedThreadPool.h"
 #include "mozilla/UniquePtr.h"
 #include "nsCOMPtr.h"
 #include "nsIAsyncShutdown.h"
 #include "nsISupportsImpl.h"
 #include "nsThreadUtils.h"
 
+class nsIEventTarget;
+
 namespace mozilla {
 namespace media {
 
 /*
  * media::Pledge - A promise-like pattern for c++ that takes lambda functions.
  *
  * Asynchronous APIs that proxy to another thread or to the chrome process and
  * back may find it useful to return a pledge to callers who then use
@@ -424,23 +426,23 @@ private:
  * Resolve/Reject function.
  */
 template<typename ResolveValueType,
          typename RejectValueType,
          typename ResolveFunction,
          typename RejectFunction>
 void
 Await(
+  already_AddRefed<nsIEventTarget> aPool,
   RefPtr<MozPromise<ResolveValueType, RejectValueType, true>> aPromise,
   ResolveFunction&& aResolveFunction,
   RejectFunction&& aRejectFunction)
 {
   Monitor mon(__func__);
-  RefPtr<AutoTaskQueue> taskQueue = new AutoTaskQueue(
-    SharedThreadPool::Get(NS_LITERAL_CSTRING("AwaitMozPromise")));
+  RefPtr<AutoTaskQueue> taskQueue = new AutoTaskQueue(Move(aPool));
   bool done = false;
 
   aPromise->Then(taskQueue,
                  __func__,
                  [&](ResolveValueType&& aResolveValue) {
                    MonitorAutoLock lock(mon);
                    aResolveFunction(Forward<ResolveValueType>(aResolveValue));
                    done = true;
@@ -457,21 +459,21 @@ Await(
   while (!done) {
     mon.Wait();
   }
 }
 
 template<typename ResolveValueType, typename RejectValueType, bool Excl>
 typename MozPromise<ResolveValueType, RejectValueType, Excl>::
   ResolveOrRejectValue
-Await(RefPtr<MozPromise<ResolveValueType, RejectValueType, Excl>> aPromise)
+Await(already_AddRefed<nsIEventTarget> aPool,
+      RefPtr<MozPromise<ResolveValueType, RejectValueType, Excl>> aPromise)
 {
   Monitor mon(__func__);
-  RefPtr<AutoTaskQueue> taskQueue = new AutoTaskQueue(
-    SharedThreadPool::Get(NS_LITERAL_CSTRING("AwaitMozPromise")));
+  RefPtr<AutoTaskQueue> taskQueue = new AutoTaskQueue(Move(aPool));
   bool done = false;
 
   typename MozPromise<ResolveValueType, RejectValueType, Excl>::ResolveOrRejectValue val;
   aPromise->Then(taskQueue,
                  __func__,
                  [&](ResolveValueType aResolveValue) {
                    val.SetResolve(Move(aResolveValue));
                    MonitorAutoLock lock(mon);
@@ -497,41 +499,43 @@ Await(RefPtr<MozPromise<ResolveValueType
  * Similar to Await, takes an array of promises of the same type.
  * MozPromise::All is used to handle the resolution/rejection of the promises.
  */
 template<typename ResolveValueType,
          typename RejectValueType,
          typename ResolveFunction,
          typename RejectFunction>
 void
-AwaitAll(nsTArray<RefPtr<MozPromise<ResolveValueType, RejectValueType, true>>>&
+AwaitAll(already_AddRefed<nsIEventTarget> aPool,
+         nsTArray<RefPtr<MozPromise<ResolveValueType, RejectValueType, true>>>&
            aPromises,
          ResolveFunction&& aResolveFunction,
          RejectFunction&& aRejectFunction)
 {
   typedef MozPromise<ResolveValueType, RejectValueType, true> Promise;
-  RefPtr<AutoTaskQueue> taskQueue = new AutoTaskQueue(
-    SharedThreadPool::Get(NS_LITERAL_CSTRING("AwaitMozPromise")));
+  RefPtr<nsIEventTarget> pool = aPool;
+  RefPtr<AutoTaskQueue> taskQueue = new AutoTaskQueue(do_AddRef(pool));
   RefPtr<typename Promise::AllPromiseType> p = Promise::All(taskQueue, aPromises);
-  Await(p, Move(aResolveFunction), Move(aRejectFunction));
+  Await(pool.forget(), p, Move(aResolveFunction), Move(aRejectFunction));
 }
 
 // Note: only works with exclusive MozPromise, as Promise::All would attempt
 // to perform copy of nsTArrays which are disallowed.
 template<typename ResolveValueType, typename RejectValueType>
 typename MozPromise<ResolveValueType,
                     RejectValueType,
                     true>::AllPromiseType::ResolveOrRejectValue
-AwaitAll(nsTArray<RefPtr<MozPromise<ResolveValueType, RejectValueType, true>>>&
+AwaitAll(already_AddRefed<nsIEventTarget> aPool,
+         nsTArray<RefPtr<MozPromise<ResolveValueType, RejectValueType, true>>>&
            aPromises)
 {
   typedef MozPromise<ResolveValueType, RejectValueType, true> Promise;
-  RefPtr<AutoTaskQueue> taskQueue = new AutoTaskQueue(
-    SharedThreadPool::Get(NS_LITERAL_CSTRING("AwaitMozPromise")));
+  RefPtr<nsIEventTarget> pool = aPool;
+  RefPtr<AutoTaskQueue> taskQueue = new AutoTaskQueue(do_AddRef(pool));
   RefPtr<typename Promise::AllPromiseType> p =
     Promise::All(taskQueue, aPromises);
-  return Await(p);
+  return Await(pool.forget(), p);
 }
 
 } // namespace media
 } // namespace mozilla
 
 #endif // mozilla_MediaUtils_h
--- a/media/webrtc/signaling/src/media-conduit/WebrtcMediaDataDecoderCodec.cpp
+++ b/media/webrtc/signaling/src/media-conduit/WebrtcMediaDataDecoderCodec.cpp
@@ -9,19 +9,19 @@
 #include "VideoUtils.h"
 #include "mozilla/media/MediaUtils.h"
 #include "mozilla/layers/ImageBridgeChild.h"
 #include "webrtc/base/keep_ref_until_done.h"
 
 namespace mozilla {
 
 WebrtcMediaDataDecoder::WebrtcMediaDataDecoder()
-  : mTaskQueue(
-      new TaskQueue(GetMediaThreadPool(MediaThreadType::PLATFORM_DECODER),
-                    "WebrtcMediaDataDecoder::mTaskQueue"))
+  : mThreadPool(GetMediaThreadPool(MediaThreadType::PLATFORM_DECODER))
+  , mTaskQueue(new TaskQueue(do_AddRef(mThreadPool),
+                             "WebrtcMediaDataDecoder::mTaskQueue"))
   , mImageContainer(layers::LayerManager::CreateImageContainer(
       layers::ImageContainer::ASYNCHRONOUS))
   , mFactory(new PDMFactory())
 {
 }
 
 WebrtcMediaDataDecoder::~WebrtcMediaDataDecoder()
 {
@@ -63,17 +63,18 @@ WebrtcMediaDataDecoder::InitDecode(const
       mTrackType,
       mImageContainer,
       knowsCompositor });
 
   if (!mDecoder) {
     return WEBRTC_VIDEO_CODEC_ERROR;
   }
 
-  media::Await(mDecoder->Init(),
+  media::Await(do_AddRef(mThreadPool),
+               mDecoder->Init(),
                [](TrackInfo::TrackType) {},
                [&](const MediaResult& aError) { mError = aError; });
 
   return NS_SUCCEEDED(mError) ? WEBRTC_VIDEO_CODEC_OK : WEBRTC_VIDEO_CODEC_ERROR;
 }
 
 int32_t
 WebrtcMediaDataDecoder::Decode(
@@ -111,17 +112,18 @@ WebrtcMediaDataDecoder::Decode(
 
   compressedFrame->mTime =
     media::TimeUnit::FromMicroseconds(aInputImage._timeStamp);
   compressedFrame->mTimecode =
     media::TimeUnit::FromMicroseconds(aRenderTimeMs * 1000);
   compressedFrame->mKeyframe =
     aInputImage._frameType == webrtc::FrameType::kVideoFrameKey;
   {
-    media::Await(mDecoder->Decode(compressedFrame),
+    media::Await(do_AddRef(mThreadPool),
+                 mDecoder->Decode(compressedFrame),
                  [&](const MediaDataDecoder::DecodedData& aResults) {
                    mResults = aResults;
                  },
                  [&](const MediaResult& aError) { mError = aError; });
 
     for (auto& frame : mResults) {
       MOZ_ASSERT(frame->mType == MediaData::VIDEO_DATA);
       RefPtr<VideoData> video = frame->As<VideoData>();
@@ -156,17 +158,17 @@ WebrtcMediaDataDecoder::RegisterDecodeCo
 int32_t
 WebrtcMediaDataDecoder::Release()
 {
   RefPtr<ShutdownPromise> p =
     mDecoder->Flush()->Then(mTaskQueue,
                             __func__,
                             [this]() { return mDecoder->Shutdown(); },
                             [this]() { return mDecoder->Shutdown(); });
-  media::Await(p);
+  media::Await(do_AddRef(mThreadPool), p);
 
   mDecoder = nullptr;
   mNeedKeyframe = true;
 
   return WEBRTC_VIDEO_CODEC_OK;
 }
 
 bool
--- a/media/webrtc/signaling/src/media-conduit/WebrtcMediaDataDecoderCodec.h
+++ b/media/webrtc/signaling/src/media-conduit/WebrtcMediaDataDecoderCodec.h
@@ -17,16 +17,17 @@ namespace webrtc {
 }
 namespace mozilla {
 namespace layers {
   class Image;
   class ImageContainer;
 }
 
 class PDMFactory;
+class SharedThreadPool;
 class TaskQueue;
 
 class ImageBuffer : public webrtc::NativeHandleBuffer
 {
 public:
   explicit ImageBuffer(RefPtr<layers::Image>&& aImage);
   rtc::scoped_refptr<VideoFrameBuffer> NativeToI420Buffer() override;
 
@@ -57,16 +58,17 @@ public:
   int32_t Release() override;
 
 private:
   ~WebrtcMediaDataDecoder();
   void QueueFrame(MediaRawData* aFrame);
   AbstractThread* OwnerThread() const { return mTaskQueue; }
   bool OnTaskQueue() const;
 
+  const RefPtr<SharedThreadPool> mThreadPool;
   const RefPtr<TaskQueue> mTaskQueue;
   const RefPtr<layers::ImageContainer> mImageContainer;
   const RefPtr<PDMFactory> mFactory;
   RefPtr<MediaDataDecoder> mDecoder;
   webrtc::DecodedImageCallback* mCallback = nullptr;
   VideoInfo mInfo;
   TrackInfo::TrackType mTrackType;
   bool mNeedKeyframe = true;