Bug 1214710 - [1.11] Implement ReaderQueue for simultaneous decoder limit enforcement. r=jya,jwwang
authorEugen Sawin <esawin@mozilla.com>
Wed, 08 Jun 2016 14:28:24 +0200
changeset 301170 8b94fc162494c7d2c874cc9ada5814e2adf3eee4
parent 301158 051765f8237daf5da7ba0d3e97da16668ce9988c
child 301171 42747e600949d28d8d7ceee436a73b78abe33f51
push id19621
push usercbook@mozilla.com
push dateThu, 09 Jun 2016 10:14:20 +0000
treeherderfx-team@ac4dd317658c [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersjya, jwwang
bugs1214710
milestone50.0a1
Bug 1214710 - [1.11] Implement ReaderQueue for simultaneous decoder limit enforcement. r=jya,jwwang
dom/media/MediaDecoderReader.cpp
dom/media/MediaDecoderReader.h
dom/media/MediaFormatReader.cpp
dom/media/MediaFormatReader.h
dom/media/MediaPrefs.h
--- a/dom/media/MediaDecoderReader.cpp
+++ b/dom/media/MediaDecoderReader.cpp
@@ -4,21 +4,24 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "MediaDecoderReader.h"
 #include "AbstractMediaDecoder.h"
 #include "MediaResource.h"
 #include "VideoUtils.h"
 #include "ImageContainer.h"
+#include "MediaPrefs.h"
 
 #include "nsPrintfCString.h"
 #include "mozilla/mozalloc.h"
+#include "mozilla/Mutex.h"
 #include <stdint.h>
 #include <algorithm>
+#include <list>
 
 using namespace mozilla::media;
 
 namespace mozilla {
 
 // Un-comment to enable logging of seek bisections.
 //#define SEEK_LOGGING
 
@@ -57,38 +60,189 @@ public:
     const AudioData* audioData = static_cast<const AudioData*>(aObject);
     mSize += audioData->SizeOfIncludingThis(MallocSizeOf);
     return nullptr;
   }
 
   size_t mSize;
 };
 
+// The ReaderQueue is used to keep track of the numer of active readers to
+// enforce a given limit on the number of simultaneous active decoders.
+// Readers are added/removed during construction/destruction and are
+// suspended and resumed by the queue. The max number of active decoders is
+// controlled by the "media.decoder.limit" pref.
+class ReaderQueue
+{
+public:
+  static ReaderQueue& Instance()
+  {
+    static StaticMutex sMutex;
+    StaticMutexAutoLock lock(sMutex);
+
+    if (!sInstance) {
+      sInstance = new ReaderQueue;
+      sInstance->MaxNumActive(MediaPrefs::MediaDecoderLimit());
+      ClearOnShutdown(&sInstance);
+    }
+    MOZ_ASSERT(sInstance);
+    return *sInstance;
+  }
+
+  void MaxNumActive(int32_t aNumActive)
+  {
+    MutexAutoLock lock(mMutex);
+
+    if (aNumActive < 0) {
+      mNumMaxActive = std::numeric_limits<uint32_t>::max();
+    } else {
+      mNumMaxActive = aNumActive;
+    }
+  }
+
+  void Add(MediaDecoderReader* aReader)
+  {
+    MutexAutoLock lock(mMutex);
+
+    if (mActive.Length() < mNumMaxActive) {
+      // Below active limit, resume the new reader.
+      mActive.AppendElement(aReader);
+      DispatchResume(aReader);
+    } else if (mActive.IsEmpty()) {
+      MOZ_ASSERT(mNumMaxActive == 0);
+      mSuspended.AppendElement(aReader);
+    } else {
+      // We're past the active limit, suspend an old reader and resume the new.
+      mActive.AppendElement(aReader);
+      MediaDecoderReader* suspendReader = mActive.ElementAt(0);
+      mSuspended.AppendElement(suspendReader);
+      mActive.RemoveElementAt(0);
+      DispatchSuspendResume(suspendReader, aReader);
+    }
+  }
+
+  void Remove(MediaDecoderReader* aReader)
+  {
+    MutexAutoLock lock(mMutex);
+
+    if (aReader->IsSuspended()) {
+      // Removing suspended readers has no immediate side-effects.
+      DebugOnly<bool> result = mSuspended.RemoveElement(aReader);
+      MOZ_ASSERT(result, "Suspended reader must be in mSuspended");
+    } else {
+      // For each removed active reader, we resume a suspended one.
+      DebugOnly<bool> result = mActive.RemoveElement(aReader);
+      MOZ_ASSERT(result, "Non-suspended reader must be in mActive");
+      if (mSuspended.IsEmpty()) {
+        return;
+      }
+      MediaDecoderReader* resumeReader = mSuspended.LastElement();
+      mActive.AppendElement(resumeReader);
+      mSuspended.RemoveElementAt(mSuspended.Length() - 1);
+      DispatchResume(resumeReader);
+    }
+  }
+
+private:
+  ReaderQueue()
+    : mNumMaxActive(std::numeric_limits<uint32_t>::max())
+    , mMutex("ReaderQueue:mMutex")
+  {
+  }
+
+  static void Resume(MediaDecoderReader* aReader)
+  {
+    if (!aReader->IsSuspended()) {
+      return;
+    }
+    aReader->SetIsSuspended(false);
+  }
+
+  static void Suspend(MediaDecoderReader* aReader)
+  {
+    if (aReader->IsSuspended()) {
+      return;
+    }
+    aReader->SetIsSuspended(true);
+
+    aReader->ReleaseMediaResources();
+  }
+
+  static void DispatchResume(MediaDecoderReader* aReader)
+  {
+    RefPtr<MediaDecoderReader> reader = aReader;
+
+    nsCOMPtr<nsIRunnable> task = NS_NewRunnableFunction(
+      [reader]() {
+        Resume(reader);
+    });
+    reader->OwnerThread()->Dispatch(task.forget());
+  }
+
+  static void DispatchSuspend(MediaDecoderReader* aReader)
+  {
+    RefPtr<MediaDecoderReader> reader = aReader;
+
+    nsCOMPtr<nsIRunnable> task = NS_NewRunnableFunction(
+      [reader]() {
+        Suspend(reader);
+    });
+    reader->OwnerThread()->Dispatch(task.forget());
+  }
+
+  static void DispatchSuspendResume(MediaDecoderReader* aSuspend,
+                                    MediaDecoderReader* aResume)
+  {
+    RefPtr<MediaDecoderReader> suspend = aSuspend;
+    RefPtr<MediaDecoderReader> resume = aResume;
+
+    nsCOMPtr<nsIRunnable> task = NS_NewRunnableFunction(
+      [suspend, resume] () {
+        Suspend(suspend);
+        DispatchResume(resume);
+    });
+    suspend->OwnerThread()->Dispatch(task.forget());
+  }
+
+  static StaticAutoPtr<ReaderQueue> sInstance;
+
+  nsTArray<RefPtr<MediaDecoderReader>> mActive;
+  nsTArray<RefPtr<MediaDecoderReader>> mSuspended;
+  uint32_t mNumMaxActive;
+
+  mutable Mutex mMutex;
+};
+
+StaticAutoPtr<ReaderQueue> ReaderQueue::sInstance;
+
 MediaDecoderReader::MediaDecoderReader(AbstractMediaDecoder* aDecoder)
   : mAudioCompactor(mAudioQueue)
   , mDecoder(aDecoder)
   , mTaskQueue(new TaskQueue(GetMediaThreadPool(MediaThreadType::PLAYBACK),
                              /* aSupportsTailDispatch = */ true))
   , mWatchManager(this, mTaskQueue)
   , mBuffered(mTaskQueue, TimeIntervals(), "MediaDecoderReader::mBuffered (Canonical)")
   , mDuration(mTaskQueue, NullableTimeUnit(), "MediaDecoderReader::mDuration (Mirror)")
   , mIgnoreAudioOutputFormat(false)
   , mHitAudioDecodeError(false)
   , mShutdown(false)
   , mAudioDiscontinuity(false)
   , mVideoDiscontinuity(false)
+  , mIsSuspended(true)
 {
   MOZ_COUNT_CTOR(MediaDecoderReader);
   MOZ_ASSERT(NS_IsMainThread());
 
   if (mDecoder && mDecoder->DataArrivedEvent()) {
     mDataArrivedListener = mDecoder->DataArrivedEvent()->Connect(
       mTaskQueue, this, &MediaDecoderReader::NotifyDataArrived);
   }
 
+  ReaderQueue::Instance().Add(this);
+
   // Dispatch initialization that needs to happen on that task queue.
   mTaskQueue->Dispatch(NewRunnableMethod(this, &MediaDecoderReader::InitializationTask));
 }
 
 void
 MediaDecoderReader::InitializationTask()
 {
   if (!mDecoder) {
@@ -371,16 +525,18 @@ MediaDecoderReader::Shutdown()
 
   // Shut down the watch manager before shutting down our task queue.
   mWatchManager.Shutdown();
 
   RefPtr<ShutdownPromise> p;
 
   mDecoder = nullptr;
 
+  ReaderQueue::Instance().Remove(this);
+
   return mTaskQueue->BeginShutdown();
 }
 
 } // namespace mozilla
 
 #undef DECODER_LOG
 #undef DECODER_WARN
 #undef DECODER_WARN_HELPER
--- a/dom/media/MediaDecoderReader.h
+++ b/dom/media/MediaDecoderReader.h
@@ -285,16 +285,28 @@ public:
   TimedMetadataEventSource& TimedMetadataEvent()
   {
     return mTimedMetadataEvent;
   }
 
   // Notified by the OggReader during playback when chained ogg is detected.
   MediaEventSource<void>& OnMediaNotSeekable() { return mOnMediaNotSeekable; }
 
+  bool IsSuspended() const
+  {
+    MOZ_ASSERT(OnTaskQueue());
+    return mIsSuspended;
+  }
+
+  void SetIsSuspended(bool aState)
+  {
+    MOZ_ASSERT(OnTaskQueue());
+    mIsSuspended = aState;
+  }
+
 protected:
   virtual ~MediaDecoderReader();
 
   // Populates aBuffered with the time ranges which are buffered. This may only
   // be called on the decode task queue, and should only be used internally by
   // UpdateBuffered - mBuffered (or mirrors of it) should be used for everything
   // else.
   //
@@ -430,15 +442,16 @@ private:
   // of Request{Audio,Video}Data.
   MozPromiseHolder<MediaDataPromise> mBaseAudioPromise;
   MozPromiseHolder<MediaDataPromise> mBaseVideoPromise;
 
   // Flags whether a the next audio/video sample comes after a "gap" or
   // "discontinuity" in the stream. For example after a seek.
   bool mAudioDiscontinuity;
   bool mVideoDiscontinuity;
+  bool mIsSuspended;
 
   MediaEventListener mDataArrivedListener;
 };
 
 } // namespace mozilla
 
 #endif
--- a/dom/media/MediaFormatReader.cpp
+++ b/dom/media/MediaFormatReader.cpp
@@ -130,17 +130,16 @@ MediaFormatReader::Shutdown()
   if (mVideo.mTaskQueue) {
     mVideo.mTaskQueue->BeginShutdown();
     mVideo.mTaskQueue->AwaitShutdownAndIdle();
     mVideo.mTaskQueue = nullptr;
   }
   MOZ_ASSERT(!mVideo.HasPromise());
 
   mDemuxer = nullptr;
-
   mPlatform = nullptr;
 
   return MediaDecoderReader::Shutdown();
 }
 
 void
 MediaFormatReader::InitLayersBackendType()
 {
@@ -454,16 +453,21 @@ MediaFormatReader::EnsureDecoderInitiali
     return true;
   }
   RefPtr<MediaFormatReader> self = this;
   decoder.mInitPromise.Begin(decoder.mDecoder->Init()
        ->Then(OwnerThread(), __func__,
               [self] (TrackType aTrack) {
                 auto& decoder = self->GetDecoderData(aTrack);
                 decoder.mInitPromise.Complete();
+
+                if (self->IsSuspended()) {
+                  return;
+                }
+
                 decoder.mDecoderInitialized = true;
                 MonitorAutoLock mon(decoder.mMonitor);
                 decoder.mDescription = decoder.mDecoder->GetDescriptionName();
                 self->SetVideoDecodeThreshold();
                 self->ScheduleUpdate(aTrack);
               },
               [self, aTrack] (MediaDataDecoder::DecoderFailureReason aResult) {
                 auto& decoder = self->GetDecoderData(aTrack);
@@ -528,16 +532,20 @@ MediaFormatReader::RequestVideoData(bool
     return MediaDataPromise::CreateAndReject(CANCELED, __func__);
   }
 
   if (mShutdown) {
     NS_WARNING("RequestVideoData on shutdown MediaFormatReader!");
     return MediaDataPromise::CreateAndReject(CANCELED, __func__);
   }
 
+  if (IsSuspended()) {
+    return MediaDataPromise::CreateAndReject(CANCELED, __func__);
+  }
+
   media::TimeUnit timeThreshold{media::TimeUnit::FromMicroseconds(aTimeThreshold)};
   // Ensure we have no pending seek going as ShouldSkip could return out of date
   // information.
   if (!mVideo.HasInternalSeekPending() &&
       ShouldSkip(aSkipToNextKeyframe, timeThreshold)) {
     RefPtr<MediaDataPromise> p = mVideo.EnsurePromise(__func__);
     SkipVideoDemuxToNextKeyFrame(timeThreshold);
     return p;
@@ -619,16 +627,20 @@ MediaFormatReader::RequestAudioData()
   MOZ_DIAGNOSTIC_ASSERT(IsVideoSeeking() || !IsSeeking(), "called mid-seek");
   LOGV("");
 
   if (!HasAudio()) {
     LOG("called with no audio track");
     return MediaDataPromise::CreateAndReject(DECODE_ERROR, __func__);
   }
 
+  if (IsSuspended()) {
+    return MediaDataPromise::CreateAndReject(CANCELED, __func__);
+  }
+
   if (IsSeeking()) {
     LOG("called mid-seek. Rejecting.");
     return MediaDataPromise::CreateAndReject(CANCELED, __func__);
   }
 
   if (mShutdown) {
     NS_WARNING("RequestAudioData on shutdown MediaFormatReader!");
     return MediaDataPromise::CreateAndReject(CANCELED, __func__);
@@ -913,16 +925,17 @@ MediaFormatReader::DecodeDemuxedSamples(
   return true;
 }
 
 void
 MediaFormatReader::HandleDemuxedSamples(TrackType aTrack,
                                         AbstractMediaDecoder::AutoNotifyDecoded& aA)
 {
   MOZ_ASSERT(OnTaskQueue());
+
   auto& decoder = GetDecoderData(aTrack);
 
   if (decoder.mQueuedSamples.IsEmpty()) {
     return;
   }
 
   if (!EnsureDecoderCreated(aTrack)) {
     NS_WARNING("Error constructing decoders");
@@ -1879,16 +1892,19 @@ void MediaFormatReader::ReleaseMediaReso
 {
   // Before freeing a video codec, all video buffers needed to be released
   // even from graphics pipeline.
   if (mVideoFrameContainer) {
     mVideoFrameContainer->ClearCurrentFrame();
   }
   mVideo.mInitPromise.DisconnectIfExists();
   mVideo.ShutdownDecoder();
+
+  mAudio.mInitPromise.DisconnectIfExists();
+  mAudio.ShutdownDecoder();
 }
 
 bool
 MediaFormatReader::VideoIsHardwareAccelerated() const
 {
   return mVideo.mIsHardwareAccelerated;
 }
 
--- a/dom/media/MediaFormatReader.h
+++ b/dom/media/MediaFormatReader.h
@@ -96,16 +96,17 @@ public:
   void SetCDMProxy(CDMProxy* aProxy) override;
 #endif
 
   // Returns a string describing the state of the decoder data.
   // Used for debugging purposes.
   void GetMozDebugReaderData(nsAString& aString);
 
 private:
+
   bool HasVideo() { return mVideo.mTrackDemuxer; }
   bool HasAudio() { return mAudio.mTrackDemuxer; }
 
   bool IsWaitingOnCDMResource();
 
   bool InitDemuxer();
   // Notify the demuxer that new data has been received.
   // The next queued task calling GetBuffered() is guaranteed to have up to date
--- a/dom/media/MediaPrefs.h
+++ b/dom/media/MediaPrefs.h
@@ -115,16 +115,17 @@ private:
   DECL_MEDIA_PREF("media.webspeech.synth.force_global_queue", WebSpeechForceGlobal, bool, false);
   DECL_MEDIA_PREF("media.webspeech.test.enable",              WebSpeechTestEnabled, bool, false);
   DECL_MEDIA_PREF("media.webspeech.test.fake_fsm_events",     WebSpeechFakeFSMEvents, bool, false);
   DECL_MEDIA_PREF(TEST_PREFERENCE_FAKE_RECOGNITION_SERVICE,   WebSpeechFakeRecognitionService, bool, false);
   DECL_MEDIA_PREF("media.webspeech.recognition.enable",       WebSpeechRecognitionEnabled, bool, false);
   DECL_MEDIA_PREF("media.webspeech.recognition.force_enable", WebSpeechRecognitionForceEnabled, bool, false);
 
   DECL_MEDIA_PREF("media.num-decode-threads",                 MediaThreadPoolDefaultCount, uint32_t, 4);
+  DECL_MEDIA_PREF("media.decoder.limit",                      MediaDecoderLimit, uint32_t, -1);
 
 public:
   // Manage the singleton:
   static MediaPrefs& GetSingleton();
   static bool SingletonExists();
 
 private:
   template<class T> friend class StaticAutoPtr;