Bug 1214710 - [1.11] Implement ReaderQueue for simultaneous decoder limit enforcement. r=jya,jwwang
authorEugen Sawin <esawin@mozilla.com>
Wed, 08 Jun 2016 14:28:24 +0200
changeset 301104 8b94fc162494c7d2c874cc9ada5814e2adf3eee4
parent 301103 051765f8237daf5da7ba0d3e97da16668ce9988c
child 301105 42747e600949d28d8d7ceee436a73b78abe33f51
push id78206
push useresawin@mozilla.com
push dateWed, 08 Jun 2016 16:05:54 +0000
treeherdermozilla-inbound@42747e600949 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersjya, jwwang
bugs1214710
milestone50.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1214710 - [1.11] Implement ReaderQueue for simultaneous decoder limit enforcement. r=jya,jwwang
dom/media/MediaDecoderReader.cpp
dom/media/MediaDecoderReader.h
dom/media/MediaFormatReader.cpp
dom/media/MediaFormatReader.h
dom/media/MediaPrefs.h
--- a/dom/media/MediaDecoderReader.cpp
+++ b/dom/media/MediaDecoderReader.cpp
@@ -4,21 +4,24 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "MediaDecoderReader.h"
 #include "AbstractMediaDecoder.h"
 #include "MediaResource.h"
 #include "VideoUtils.h"
 #include "ImageContainer.h"
+#include "MediaPrefs.h"
 
 #include "nsPrintfCString.h"
 #include "mozilla/mozalloc.h"
+#include "mozilla/Mutex.h"
 #include <stdint.h>
 #include <algorithm>
+#include <list>
 
 using namespace mozilla::media;
 
 namespace mozilla {
 
 // Un-comment to enable logging of seek bisections.
 //#define SEEK_LOGGING
 
@@ -57,38 +60,189 @@ public:
     const AudioData* audioData = static_cast<const AudioData*>(aObject);
     mSize += audioData->SizeOfIncludingThis(MallocSizeOf);
     return nullptr;
   }
 
   size_t mSize;
 };
 
+// The ReaderQueue is used to keep track of the numer of active readers to
+// enforce a given limit on the number of simultaneous active decoders.
+// Readers are added/removed during construction/destruction and are
+// suspended and resumed by the queue. The max number of active decoders is
+// controlled by the "media.decoder.limit" pref.
+class ReaderQueue
+{
+public:
+  static ReaderQueue& Instance()
+  {
+    static StaticMutex sMutex;
+    StaticMutexAutoLock lock(sMutex);
+
+    if (!sInstance) {
+      sInstance = new ReaderQueue;
+      sInstance->MaxNumActive(MediaPrefs::MediaDecoderLimit());
+      ClearOnShutdown(&sInstance);
+    }
+    MOZ_ASSERT(sInstance);
+    return *sInstance;
+  }
+
+  void MaxNumActive(int32_t aNumActive)
+  {
+    MutexAutoLock lock(mMutex);
+
+    if (aNumActive < 0) {
+      mNumMaxActive = std::numeric_limits<uint32_t>::max();
+    } else {
+      mNumMaxActive = aNumActive;
+    }
+  }
+
+  void Add(MediaDecoderReader* aReader)
+  {
+    MutexAutoLock lock(mMutex);
+
+    if (mActive.Length() < mNumMaxActive) {
+      // Below active limit, resume the new reader.
+      mActive.AppendElement(aReader);
+      DispatchResume(aReader);
+    } else if (mActive.IsEmpty()) {
+      MOZ_ASSERT(mNumMaxActive == 0);
+      mSuspended.AppendElement(aReader);
+    } else {
+      // We're past the active limit, suspend an old reader and resume the new.
+      mActive.AppendElement(aReader);
+      MediaDecoderReader* suspendReader = mActive.ElementAt(0);
+      mSuspended.AppendElement(suspendReader);
+      mActive.RemoveElementAt(0);
+      DispatchSuspendResume(suspendReader, aReader);
+    }
+  }
+
+  void Remove(MediaDecoderReader* aReader)
+  {
+    MutexAutoLock lock(mMutex);
+
+    if (aReader->IsSuspended()) {
+      // Removing suspended readers has no immediate side-effects.
+      DebugOnly<bool> result = mSuspended.RemoveElement(aReader);
+      MOZ_ASSERT(result, "Suspended reader must be in mSuspended");
+    } else {
+      // For each removed active reader, we resume a suspended one.
+      DebugOnly<bool> result = mActive.RemoveElement(aReader);
+      MOZ_ASSERT(result, "Non-suspended reader must be in mActive");
+      if (mSuspended.IsEmpty()) {
+        return;
+      }
+      MediaDecoderReader* resumeReader = mSuspended.LastElement();
+      mActive.AppendElement(resumeReader);
+      mSuspended.RemoveElementAt(mSuspended.Length() - 1);
+      DispatchResume(resumeReader);
+    }
+  }
+
+private:
+  ReaderQueue()
+    : mNumMaxActive(std::numeric_limits<uint32_t>::max())
+    , mMutex("ReaderQueue:mMutex")
+  {
+  }
+
+  static void Resume(MediaDecoderReader* aReader)
+  {
+    if (!aReader->IsSuspended()) {
+      return;
+    }
+    aReader->SetIsSuspended(false);
+  }
+
+  static void Suspend(MediaDecoderReader* aReader)
+  {
+    if (aReader->IsSuspended()) {
+      return;
+    }
+    aReader->SetIsSuspended(true);
+
+    aReader->ReleaseMediaResources();
+  }
+
+  static void DispatchResume(MediaDecoderReader* aReader)
+  {
+    RefPtr<MediaDecoderReader> reader = aReader;
+
+    nsCOMPtr<nsIRunnable> task = NS_NewRunnableFunction(
+      [reader]() {
+        Resume(reader);
+    });
+    reader->OwnerThread()->Dispatch(task.forget());
+  }
+
+  static void DispatchSuspend(MediaDecoderReader* aReader)
+  {
+    RefPtr<MediaDecoderReader> reader = aReader;
+
+    nsCOMPtr<nsIRunnable> task = NS_NewRunnableFunction(
+      [reader]() {
+        Suspend(reader);
+    });
+    reader->OwnerThread()->Dispatch(task.forget());
+  }
+
+  static void DispatchSuspendResume(MediaDecoderReader* aSuspend,
+                                    MediaDecoderReader* aResume)
+  {
+    RefPtr<MediaDecoderReader> suspend = aSuspend;
+    RefPtr<MediaDecoderReader> resume = aResume;
+
+    nsCOMPtr<nsIRunnable> task = NS_NewRunnableFunction(
+      [suspend, resume] () {
+        Suspend(suspend);
+        DispatchResume(resume);
+    });
+    suspend->OwnerThread()->Dispatch(task.forget());
+  }
+
+  static StaticAutoPtr<ReaderQueue> sInstance;
+
+  nsTArray<RefPtr<MediaDecoderReader>> mActive;
+  nsTArray<RefPtr<MediaDecoderReader>> mSuspended;
+  uint32_t mNumMaxActive;
+
+  mutable Mutex mMutex;
+};
+
+StaticAutoPtr<ReaderQueue> ReaderQueue::sInstance;
+
 MediaDecoderReader::MediaDecoderReader(AbstractMediaDecoder* aDecoder)
   : mAudioCompactor(mAudioQueue)
   , mDecoder(aDecoder)
   , mTaskQueue(new TaskQueue(GetMediaThreadPool(MediaThreadType::PLAYBACK),
                              /* aSupportsTailDispatch = */ true))
   , mWatchManager(this, mTaskQueue)
   , mBuffered(mTaskQueue, TimeIntervals(), "MediaDecoderReader::mBuffered (Canonical)")
   , mDuration(mTaskQueue, NullableTimeUnit(), "MediaDecoderReader::mDuration (Mirror)")
   , mIgnoreAudioOutputFormat(false)
   , mHitAudioDecodeError(false)
   , mShutdown(false)
   , mAudioDiscontinuity(false)
   , mVideoDiscontinuity(false)
+  , mIsSuspended(true)
 {
   MOZ_COUNT_CTOR(MediaDecoderReader);
   MOZ_ASSERT(NS_IsMainThread());
 
   if (mDecoder && mDecoder->DataArrivedEvent()) {
     mDataArrivedListener = mDecoder->DataArrivedEvent()->Connect(
       mTaskQueue, this, &MediaDecoderReader::NotifyDataArrived);
   }
 
+  ReaderQueue::Instance().Add(this);
+
   // Dispatch initialization that needs to happen on that task queue.
   mTaskQueue->Dispatch(NewRunnableMethod(this, &MediaDecoderReader::InitializationTask));
 }
 
 void
 MediaDecoderReader::InitializationTask()
 {
   if (!mDecoder) {
@@ -371,16 +525,18 @@ MediaDecoderReader::Shutdown()
 
   // Shut down the watch manager before shutting down our task queue.
   mWatchManager.Shutdown();
 
   RefPtr<ShutdownPromise> p;
 
   mDecoder = nullptr;
 
+  ReaderQueue::Instance().Remove(this);
+
   return mTaskQueue->BeginShutdown();
 }
 
 } // namespace mozilla
 
 #undef DECODER_LOG
 #undef DECODER_WARN
 #undef DECODER_WARN_HELPER
--- a/dom/media/MediaDecoderReader.h
+++ b/dom/media/MediaDecoderReader.h
@@ -285,16 +285,28 @@ public:
   TimedMetadataEventSource& TimedMetadataEvent()
   {
     return mTimedMetadataEvent;
   }
 
   // Notified by the OggReader during playback when chained ogg is detected.
   MediaEventSource<void>& OnMediaNotSeekable() { return mOnMediaNotSeekable; }
 
+  bool IsSuspended() const
+  {
+    MOZ_ASSERT(OnTaskQueue());
+    return mIsSuspended;
+  }
+
+  void SetIsSuspended(bool aState)
+  {
+    MOZ_ASSERT(OnTaskQueue());
+    mIsSuspended = aState;
+  }
+
 protected:
   virtual ~MediaDecoderReader();
 
   // Populates aBuffered with the time ranges which are buffered. This may only
   // be called on the decode task queue, and should only be used internally by
   // UpdateBuffered - mBuffered (or mirrors of it) should be used for everything
   // else.
   //
@@ -430,15 +442,16 @@ private:
   // of Request{Audio,Video}Data.
   MozPromiseHolder<MediaDataPromise> mBaseAudioPromise;
   MozPromiseHolder<MediaDataPromise> mBaseVideoPromise;
 
   // Flags whether a the next audio/video sample comes after a "gap" or
   // "discontinuity" in the stream. For example after a seek.
   bool mAudioDiscontinuity;
   bool mVideoDiscontinuity;
+  bool mIsSuspended;
 
   MediaEventListener mDataArrivedListener;
 };
 
 } // namespace mozilla
 
 #endif
--- a/dom/media/MediaFormatReader.cpp
+++ b/dom/media/MediaFormatReader.cpp
@@ -130,17 +130,16 @@ MediaFormatReader::Shutdown()
   if (mVideo.mTaskQueue) {
     mVideo.mTaskQueue->BeginShutdown();
     mVideo.mTaskQueue->AwaitShutdownAndIdle();
     mVideo.mTaskQueue = nullptr;
   }
   MOZ_ASSERT(!mVideo.HasPromise());
 
   mDemuxer = nullptr;
-
   mPlatform = nullptr;
 
   return MediaDecoderReader::Shutdown();
 }
 
 void
 MediaFormatReader::InitLayersBackendType()
 {
@@ -454,16 +453,21 @@ MediaFormatReader::EnsureDecoderInitiali
     return true;
   }
   RefPtr<MediaFormatReader> self = this;
   decoder.mInitPromise.Begin(decoder.mDecoder->Init()
        ->Then(OwnerThread(), __func__,
               [self] (TrackType aTrack) {
                 auto& decoder = self->GetDecoderData(aTrack);
                 decoder.mInitPromise.Complete();
+
+                if (self->IsSuspended()) {
+                  return;
+                }
+
                 decoder.mDecoderInitialized = true;
                 MonitorAutoLock mon(decoder.mMonitor);
                 decoder.mDescription = decoder.mDecoder->GetDescriptionName();
                 self->SetVideoDecodeThreshold();
                 self->ScheduleUpdate(aTrack);
               },
               [self, aTrack] (MediaDataDecoder::DecoderFailureReason aResult) {
                 auto& decoder = self->GetDecoderData(aTrack);
@@ -528,16 +532,20 @@ MediaFormatReader::RequestVideoData(bool
     return MediaDataPromise::CreateAndReject(CANCELED, __func__);
   }
 
   if (mShutdown) {
     NS_WARNING("RequestVideoData on shutdown MediaFormatReader!");
     return MediaDataPromise::CreateAndReject(CANCELED, __func__);
   }
 
+  if (IsSuspended()) {
+    return MediaDataPromise::CreateAndReject(CANCELED, __func__);
+  }
+
   media::TimeUnit timeThreshold{media::TimeUnit::FromMicroseconds(aTimeThreshold)};
   // Ensure we have no pending seek going as ShouldSkip could return out of date
   // information.
   if (!mVideo.HasInternalSeekPending() &&
       ShouldSkip(aSkipToNextKeyframe, timeThreshold)) {
     RefPtr<MediaDataPromise> p = mVideo.EnsurePromise(__func__);
     SkipVideoDemuxToNextKeyFrame(timeThreshold);
     return p;
@@ -619,16 +627,20 @@ MediaFormatReader::RequestAudioData()
   MOZ_DIAGNOSTIC_ASSERT(IsVideoSeeking() || !IsSeeking(), "called mid-seek");
   LOGV("");
 
   if (!HasAudio()) {
     LOG("called with no audio track");
     return MediaDataPromise::CreateAndReject(DECODE_ERROR, __func__);
   }
 
+  if (IsSuspended()) {
+    return MediaDataPromise::CreateAndReject(CANCELED, __func__);
+  }
+
   if (IsSeeking()) {
     LOG("called mid-seek. Rejecting.");
     return MediaDataPromise::CreateAndReject(CANCELED, __func__);
   }
 
   if (mShutdown) {
     NS_WARNING("RequestAudioData on shutdown MediaFormatReader!");
     return MediaDataPromise::CreateAndReject(CANCELED, __func__);
@@ -913,16 +925,17 @@ MediaFormatReader::DecodeDemuxedSamples(
   return true;
 }
 
 void
 MediaFormatReader::HandleDemuxedSamples(TrackType aTrack,
                                         AbstractMediaDecoder::AutoNotifyDecoded& aA)
 {
   MOZ_ASSERT(OnTaskQueue());
+
   auto& decoder = GetDecoderData(aTrack);
 
   if (decoder.mQueuedSamples.IsEmpty()) {
     return;
   }
 
   if (!EnsureDecoderCreated(aTrack)) {
     NS_WARNING("Error constructing decoders");
@@ -1879,16 +1892,19 @@ void MediaFormatReader::ReleaseMediaReso
 {
   // Before freeing a video codec, all video buffers needed to be released
   // even from graphics pipeline.
   if (mVideoFrameContainer) {
     mVideoFrameContainer->ClearCurrentFrame();
   }
   mVideo.mInitPromise.DisconnectIfExists();
   mVideo.ShutdownDecoder();
+
+  mAudio.mInitPromise.DisconnectIfExists();
+  mAudio.ShutdownDecoder();
 }
 
 bool
 MediaFormatReader::VideoIsHardwareAccelerated() const
 {
   return mVideo.mIsHardwareAccelerated;
 }
 
--- a/dom/media/MediaFormatReader.h
+++ b/dom/media/MediaFormatReader.h
@@ -96,16 +96,17 @@ public:
   void SetCDMProxy(CDMProxy* aProxy) override;
 #endif
 
   // Returns a string describing the state of the decoder data.
   // Used for debugging purposes.
   void GetMozDebugReaderData(nsAString& aString);
 
 private:
+
   bool HasVideo() { return mVideo.mTrackDemuxer; }
   bool HasAudio() { return mAudio.mTrackDemuxer; }
 
   bool IsWaitingOnCDMResource();
 
   bool InitDemuxer();
   // Notify the demuxer that new data has been received.
   // The next queued task calling GetBuffered() is guaranteed to have up to date
--- a/dom/media/MediaPrefs.h
+++ b/dom/media/MediaPrefs.h
@@ -115,16 +115,17 @@ private:
   DECL_MEDIA_PREF("media.webspeech.synth.force_global_queue", WebSpeechForceGlobal, bool, false);
   DECL_MEDIA_PREF("media.webspeech.test.enable",              WebSpeechTestEnabled, bool, false);
   DECL_MEDIA_PREF("media.webspeech.test.fake_fsm_events",     WebSpeechFakeFSMEvents, bool, false);
   DECL_MEDIA_PREF(TEST_PREFERENCE_FAKE_RECOGNITION_SERVICE,   WebSpeechFakeRecognitionService, bool, false);
   DECL_MEDIA_PREF("media.webspeech.recognition.enable",       WebSpeechRecognitionEnabled, bool, false);
   DECL_MEDIA_PREF("media.webspeech.recognition.force_enable", WebSpeechRecognitionForceEnabled, bool, false);
 
   DECL_MEDIA_PREF("media.num-decode-threads",                 MediaThreadPoolDefaultCount, uint32_t, 4);
+  DECL_MEDIA_PREF("media.decoder.limit",                      MediaDecoderLimit, uint32_t, -1);
 
 public:
   // Manage the singleton:
   static MediaPrefs& GetSingleton();
   static bool SingletonExists();
 
 private:
   template<class T> friend class StaticAutoPtr;