Bug 1399760. P3 - keep ID of the loading channel so we check whether the data callback is from an old channel. r=gerald
authorJW Wang <jwwang@mozilla.com>
Wed, 20 Sep 2017 11:41:08 +0800
changeset 433802 58c5b9d3eab336c3508544ca41f443e8c5e71084
parent 433801 723726a9d8c3d26d5b3cdf7484183f9669fca6ca
child 433803 84c535b614d5f75fa4c805c0ef39b8f39bd25f48
push id1567
push userjlorenzo@mozilla.com
push dateThu, 02 Nov 2017 12:36:05 +0000
treeherdermozilla-release@e512c14a0406 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersgerald
bugs1399760
milestone57.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1399760. P3 - keep ID of the loading channel so we check whether the data callback is from an old channel. r=gerald The load ID works as follows: 1. A load ID is passed to MediaCacheStream::NotifyDataStarted() when loading a new channel. 2. Each MediaCacheStream::NotifyDataReceived() call is also associated with a load ID from which the data is received. 3. If |mLoadID != aLoadID| tests to be true in NotifyDataReceived(), it means the data is from an old channel and should be discarded. 4. MediaCache::Update() reset mLoadID for the stream before calling CacheClientSeek() to prevent data from the old channel from being stored to the wrong position. MozReview-Commit-ID: 9kBoublLlln
dom/media/ChannelMediaResource.cpp
dom/media/ChannelMediaResource.h
dom/media/MediaCache.cpp
dom/media/MediaCache.h
--- a/dom/media/ChannelMediaResource.cpp
+++ b/dom/media/ChannelMediaResource.cpp
@@ -87,17 +87,17 @@ nsresult
 ChannelMediaResource::Listener::OnDataAvailable(nsIRequest* aRequest,
                                                 nsISupports* aContext,
                                                 nsIInputStream* aStream,
                                                 uint64_t aOffset,
                                                 uint32_t aCount)
 {
   // This might happen off the main thread.
   MOZ_DIAGNOSTIC_ASSERT(mResource);
-  return mResource->OnDataAvailable(aRequest, aStream, aCount);
+  return mResource->OnDataAvailable(mLoadID, aStream, aCount);
 }
 
 nsresult
 ChannelMediaResource::Listener::AsyncOnChannelRedirect(
   nsIChannel* aOld,
   nsIChannel* aNew,
   uint32_t aFlags,
   nsIAsyncVerifyRedirectCallback* cb)
@@ -259,17 +259,17 @@ ChannelMediaResource::OnStartRequest(nsI
     // and the server isn't sending Accept-Ranges:bytes then we don't
     // support seeking. We also can't seek in compressed streams.
     seekable = !isCompressed && acceptsRanges;
   } else {
     // Not an HTTP channel. Assume data will be sent from position zero.
     startOffset = 0;
   }
 
-  mCacheStream.NotifyDataStarted(startOffset);
+  mCacheStream.NotifyDataStarted(mLoadID, startOffset);
   mCacheStream.SetTransportSeekable(seekable);
   mChannelStatistics.Start();
   mReopenOnError = false;
 
   mSuspendAgent.UpdateSuspendedStatusIfNeeded();
 
   // Fires an initial progress event.
   owner->DownloadProgressed();
@@ -386,52 +386,56 @@ ChannelMediaResource::OnChannelRedirect(
 {
   mChannel = aNew;
   mSuspendAgent.NotifyChannelOpened(mChannel);
   return SetupChannelHeaders(aOffset);
 }
 
 nsresult
 ChannelMediaResource::CopySegmentToCache(nsIInputStream* aInStream,
-                                         void* aResource,
+                                         void* aClosure,
                                          const char* aFromSegment,
                                          uint32_t aToOffset,
                                          uint32_t aCount,
                                          uint32_t* aWriteCount)
 {
-  ChannelMediaResource* res = static_cast<ChannelMediaResource*>(aResource);
-  res->mCacheStream.NotifyDataReceived(aCount, aFromSegment);
+  Closure* closure = static_cast<Closure*>(aClosure);
+  closure->mResource->mCacheStream.NotifyDataReceived(
+    closure->mLoadID, aCount, aFromSegment);
   *aWriteCount = aCount;
   return NS_OK;
 }
 
 nsresult
-ChannelMediaResource::OnDataAvailable(nsIRequest* aRequest,
+ChannelMediaResource::OnDataAvailable(uint32_t aLoadID,
                                       nsIInputStream* aStream,
                                       uint32_t aCount)
 {
   // This might happen off the main thread.
-  NS_ASSERTION(mChannel.get() == aRequest, "Wrong channel!");
+  // Don't assert |mChannel.get() == aRequest| since reading mChannel here off
+  // the main thread is a data race.
 
   // Update principals before putting the data in the cache. This is important,
   // we want to make sure all principals are updated before any consumer can see
   // the new data.
   // TODO: Handle the case where OnDataAvailable() runs off the main thread.
   UpdatePrincipal();
 
   RefPtr<ChannelMediaResource> self = this;
   nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
     "ChannelMediaResource::OnDataAvailable",
     [self, aCount]() { self->mChannelStatistics.AddBytes(aCount); });
   mCallback->AbstractMainThread()->Dispatch(r.forget());
 
+  Closure closure{ aLoadID, this };
   uint32_t count = aCount;
   while (count > 0) {
     uint32_t read;
-    nsresult rv = aStream->ReadSegments(CopySegmentToCache, this, count, &read);
+    nsresult rv =
+      aStream->ReadSegments(CopySegmentToCache, &closure, count, &read);
     if (NS_FAILED(rv))
       return rv;
     NS_ASSERTION(read > 0, "Read 0 bytes while data was available?");
     count -= read;
   }
 
   return NS_OK;
 }
@@ -452,30 +456,30 @@ ChannelMediaResource::Open(nsIStreamList
   }
 
   nsresult rv = mCacheStream.Init(cl);
   if (NS_FAILED(rv)) {
     return rv;
   }
 
   MOZ_ASSERT(GetOffset() == 0, "Who set offset already?");
-  mListener = new Listener(this, 0);
+  mListener = new Listener(this, 0, ++mLoadID);
   *aStreamListener = mListener;
   NS_ADDREF(*aStreamListener);
   return NS_OK;
 }
 
 nsresult
 ChannelMediaResource::OpenChannel(int64_t aOffset)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mChannel);
   MOZ_ASSERT(!mListener, "Listener should have been removed by now");
 
-  mListener = new Listener(this, aOffset);
+  mListener = new Listener(this, aOffset, ++mLoadID);
   nsresult rv = mChannel->SetNotificationCallbacks(mListener.get());
   NS_ENSURE_SUCCESS(rv, rv);
 
   rv = SetupChannelHeaders(aOffset);
   NS_ENSURE_SUCCESS(rv, rv);
 
   rv = mChannel->AsyncOpen2(mListener);
   NS_ENSURE_SUCCESS(rv, rv);
--- a/dom/media/ChannelMediaResource.h
+++ b/dom/media/ChannelMediaResource.h
@@ -110,44 +110,46 @@ public:
   class Listener final
     : public nsIStreamListener
     , public nsIInterfaceRequestor
     , public nsIChannelEventSink
     , public nsIThreadRetargetableStreamListener
   {
     ~Listener() {}
   public:
-    Listener(ChannelMediaResource* aResource, int64_t aOffset)
+    Listener(ChannelMediaResource* aResource, int64_t aOffset, uint32_t aLoadID)
       : mResource(aResource)
       , mOffset(aOffset)
+      , mLoadID(aLoadID)
     {}
 
     NS_DECL_ISUPPORTS
     NS_DECL_NSIREQUESTOBSERVER
     NS_DECL_NSISTREAMLISTENER
     NS_DECL_NSICHANNELEVENTSINK
     NS_DECL_NSIINTERFACEREQUESTOR
     NS_DECL_NSITHREADRETARGETABLESTREAMLISTENER
 
     void Revoke() { mResource = nullptr; }
 
   private:
     RefPtr<ChannelMediaResource> mResource;
     const int64_t mOffset;
+    const uint32_t mLoadID;
   };
   friend class Listener;
 
   nsresult GetCachedRanges(MediaByteRangeSet& aRanges) override;
 
 protected:
   bool IsSuspendedByCache();
   // These are called on the main thread by Listener.
   nsresult OnStartRequest(nsIRequest* aRequest, int64_t aRequestOffset);
   nsresult OnStopRequest(nsIRequest* aRequest, nsresult aStatus);
-  nsresult OnDataAvailable(nsIRequest* aRequest,
+  nsresult OnDataAvailable(uint32_t aLoadID,
                            nsIInputStream* aStream,
                            uint32_t aCount);
   nsresult OnChannelRedirect(nsIChannel* aOld,
                              nsIChannel* aNew,
                              uint32_t aFlags,
                              int64_t aOffset);
 
   // Opens the channel, using an HTTP byte range request to start at aOffset
@@ -166,25 +168,33 @@ protected:
   // Parses 'Content-Range' header and returns results via parameters.
   // Returns error if header is not available, values are not parse-able or
   // values are out of range.
   nsresult ParseContentRangeHeader(nsIHttpChannel * aHttpChan,
                                    int64_t& aRangeStart,
                                    int64_t& aRangeEnd,
                                    int64_t& aRangeTotal);
 
+  struct Closure
+  {
+    uint32_t mLoadID;
+    ChannelMediaResource* mResource;
+  };
+
   static nsresult CopySegmentToCache(nsIInputStream* aInStream,
-                                     void* aResource,
+                                     void* aClosure,
                                      const char* aFromSegment,
                                      uint32_t aToOffset,
                                      uint32_t aCount,
                                      uint32_t* aWriteCount);
 
   // Main thread access only
   RefPtr<Listener> mListener;
+  // A mono-increasing integer to uniquely identify the channel we are loading.
+  uint32_t mLoadID = 0;
   // When this flag is set, if we get a network error we should silently
   // reopen the stream.
   bool               mReopenOnError;
 
   // Any thread access
   MediaCacheStream mCacheStream;
 
   MediaChannelStatistics mChannelStatistics;
--- a/dom/media/MediaCache.cpp
+++ b/dom/media/MediaCache.cpp
@@ -1384,16 +1384,20 @@ MediaCache::Update()
         NS_ASSERTION(stream->mIsTransportSeekable || desiredOffset == 0,
                      "Trying to seek in a non-seekable stream!");
         // Round seek offset down to the start of the block. This is essential
         // because we don't want to think we have part of a block already
         // in mPartialBlockBuffer.
         stream->mChannelOffset =
           OffsetToBlockIndexUnchecked(desiredOffset) * BLOCK_SIZE;
         actions[i] = stream->mCacheSuspended ? SEEK_AND_RESUME : SEEK;
+        // mChannelOffset is updated to a new position. We don't want data from
+        // the old channel to be written to the wrong position. 0 is a sentinel
+        // value which will not match any ID passed to NotifyDataReceived().
+        stream->mLoadID = 0;
       } else if (enableReading && stream->mCacheSuspended) {
         actions[i] = RESUME;
       } else if (!enableReading && !stream->mCacheSuspended) {
         actions[i] = SUSPEND;
       }
     }
 #ifdef DEBUG
     mInUpdate = false;
@@ -1851,67 +1855,88 @@ MediaCache::NoteSeek(MediaCacheStream* a
     }
   }
 }
 
 void
 MediaCacheStream::NotifyDataLength(int64_t aLength)
 {
   NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
+  LOG("Stream %p DataLength: %" PRId64, this, aLength);
 
   ReentrantMonitorAutoEnter mon(mMediaCache->GetReentrantMonitor());
   mStreamLength = aLength;
 }
 
 void
-MediaCacheStream::NotifyDataStarted(int64_t aOffset)
+MediaCacheStream::NotifyDataStarted(uint32_t aLoadID, int64_t aOffset)
 {
   NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
+  MOZ_ASSERT(aLoadID > 0);
+  LOG("Stream %p DataStarted: %" PRId64 " aLoadID=%u", this, aOffset, aLoadID);
 
   ReentrantMonitorAutoEnter mon(mMediaCache->GetReentrantMonitor());
   NS_WARNING_ASSERTION(aOffset == mChannelOffset,
                        "Server is giving us unexpected offset");
   MOZ_ASSERT(aOffset >= 0);
   mChannelOffset = aOffset;
   if (mStreamLength >= 0) {
     // If we started reading at a certain offset, then for sure
     // the stream is at least that long.
     mStreamLength = std::max(mStreamLength, mChannelOffset);
   }
+  mLoadID = aLoadID;
 }
 
 void
 MediaCacheStream::UpdatePrincipal(nsIPrincipal* aPrincipal)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MediaCache::ResourceStreamIterator iter(mMediaCache, mResourceID);
   while (MediaCacheStream* stream = iter.Next()) {
     if (nsContentUtils::CombineResourcePrincipals(&stream->mPrincipal,
                                                   aPrincipal)) {
       stream->mClient->CacheClientNotifyPrincipalChanged();
     }
   }
 }
 
 void
-MediaCacheStream::NotifyDataReceived(int64_t aSize, const char* aData)
+MediaCacheStream::NotifyDataReceived(uint32_t aLoadID,
+                                     int64_t aSize,
+                                     const char* aData)
 {
+  MOZ_ASSERT(aLoadID > 0);
   // This might happen off the main thread.
 
   // It is safe to read mClosed without holding the monitor because this
   // function is guaranteed to happen before Close().
   MOZ_DIAGNOSTIC_ASSERT(!mClosed);
 
   ReentrantMonitorAutoEnter mon(mMediaCache->GetReentrantMonitor());
+  LOG("Stream %p DataReceived at %" PRId64 " count=%" PRId64 " aLoadID=%u",
+      this,
+      mChannelOffset,
+      aSize,
+      aLoadID);
+
+  // TODO: For now NotifyDataReceived() always runs on the main thread. This
+  // assertion is to make sure our load ID algorithm doesn't go wrong. Remove it
+  // when OMT data delievery is enabled.
+  MOZ_DIAGNOSTIC_ASSERT(mLoadID == aLoadID);
+
+  if (mLoadID != aLoadID) {
+    // mChannelOffset is updated to a new position when loading a new channel.
+    // We should discard the data coming from the old channel so it won't be
+    // stored to the wrong positoin.
+    return;
+  }
   int64_t size = aSize;
   const char* data = aData;
 
-  LOG("Stream %p DataReceived at %" PRId64 " count=%" PRId64,
-      this, mChannelOffset, aSize);
-
   // We process the data one block (or part of a block) at a time
   while (size > 0) {
     uint32_t blockIndex = OffsetToBlockIndexUnchecked(mChannelOffset);
     int32_t blockOffset = int32_t(mChannelOffset - blockIndex*BLOCK_SIZE);
     int32_t chunkSize = std::min<int64_t>(BLOCK_SIZE - blockOffset, size);
 
     if (blockOffset == 0) {
       // We've just started filling this buffer so now is a good time
--- a/dom/media/MediaCache.h
+++ b/dom/media/MediaCache.h
@@ -251,23 +251,23 @@ public:
   void NotifyDataLength(int64_t aLength);
   // Notifies the cache that a load has begun. We pass the offset
   // because in some cases the offset might not be what the cache
   // requested. In particular we might unexpectedly start providing
   // data at offset 0. This need not be called if the offset is the
   // offset that the cache requested in
   // ChannelMediaResource::CacheClientSeek. This can be called at any
   // time by the client, not just after a CacheClientSeek.
-  void NotifyDataStarted(int64_t aOffset);
+  void NotifyDataStarted(uint32_t aLoadID, int64_t aOffset);
   // Notifies the cache that data has been received. The stream already
   // knows the offset because data is received in sequence and
   // the starting offset is known via NotifyDataStarted or because
   // the cache requested the offset in
   // ChannelMediaResource::CacheClientSeek, or because it defaulted to 0.
-  void NotifyDataReceived(int64_t aSize, const char* aData);
+  void NotifyDataReceived(uint32_t aLoadID, int64_t aSize, const char* aData);
   // Notifies the cache that the current bytes should be written to disk.
   // Called on the main thread.
   void FlushPartialBlock();
   // Notifies the cache that the channel has closed with the given status.
   void NotifyDataEnded(nsresult aStatus);
 
   // Notifies the stream that the channel is reopened. The stream should
   // reset variables such as |mDidNotifyDataEnded|.
@@ -489,16 +489,19 @@ private:
   uint32_t          mPinCount;
   // The status used when we did CacheClientNotifyDataEnded. Only valid
   // when mDidNotifyDataEnded is true.
   nsresult          mNotifyDataEndedStatus;
   // The last reported read mode
   ReadMode          mCurrentMode;
   // True if some data in mPartialBlockBuffer has been read as metadata
   bool              mMetadataInPartialBlockBuffer;
+  // The load ID of the current channel. Used to check whether the data is
+  // coming from an old channel and should be discarded.
+  uint32_t mLoadID = 0;
 
   // The following field is protected by the cache's monitor but are
   // only written on the main thread.
 
   // Data received for the block containing mChannelOffset. Data needs
   // to wait here so we can write back a complete block. The first
   // mChannelOffset%BLOCK_SIZE bytes have been filled in with good data,
   // the rest are garbage.