Bug 1399760. P3 - keep ID of the loading channel so we check whether the data callback is from an old channel. draft
authorJW Wang <jwwang@mozilla.com>
Thu, 14 Sep 2017 12:11:36 +0800
changeset 666825 a93bebe4c84d646e19eeca05200db1dffab62018
parent 666824 ee1b60d2f0f859fd4de578b2aa699cb016a8322e
child 666828 cc8cca41cbc6af32e418b7b9428d5da6eb40bf19
push id80502
push userjwwang@mozilla.com
push dateTue, 19 Sep 2017 07:13:04 +0000
bugs1399760
milestone57.0a1
Bug 1399760. P3 - keep ID of the loading channel so we check whether the data callback is from an old channel. The load ID works as follows: 1. A load ID is passed to MediaCacheStream::NotifyDataStarted() when loading a new channel. 2. Each MediaCacheStream::NotifyDataReceived() call is also associated with a load ID from which the data is received. 3. If |mLoadID != aLoadID| tests to be true in NotifyDataReceived(), it means the data is from an old channel and should be discarded. 4. MediaCache::Update() reset mLoadID for the stream before calling CacheClientSeek() to prevent data from the old channel from being stored to the wrong position. MozReview-Commit-ID: 9kBoublLlln
dom/media/MediaCache.cpp
dom/media/MediaCache.h
dom/media/MediaResource.cpp
dom/media/MediaResource.h
--- a/dom/media/MediaCache.cpp
+++ b/dom/media/MediaCache.cpp
@@ -1382,16 +1382,20 @@ MediaCache::Update()
         NS_ASSERTION(stream->mIsTransportSeekable || desiredOffset == 0,
                      "Trying to seek in a non-seekable stream!");
         // Round seek offset down to the start of the block. This is essential
         // because we don't want to think we have part of a block already
         // in mPartialBlockBuffer.
         stream->mChannelOffset =
           OffsetToBlockIndexUnchecked(desiredOffset) * BLOCK_SIZE;
         actions[i] = stream->mCacheSuspended ? SEEK_AND_RESUME : SEEK;
+        // mChannelOffset is updated to a new position. We don't want data from
+        // the old channel to be written to the wrong position. 0 is a sentinel
+        // value which will not match any ID passed to NotifyDataReceived().
+        stream->mLoadID = 0;
       } else if (enableReading && stream->mCacheSuspended) {
         actions[i] = RESUME;
       } else if (!enableReading && !stream->mCacheSuspended) {
         actions[i] = SUSPEND;
       }
     }
 #ifdef DEBUG
     mInUpdate = false;
@@ -1849,67 +1853,88 @@ MediaCache::NoteSeek(MediaCacheStream* a
     }
   }
 }
 
 void
 MediaCacheStream::NotifyDataLength(int64_t aLength)
 {
   NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
+  LOG("Stream %p DataLength: %" PRId64, this, aLength);
 
   ReentrantMonitorAutoEnter mon(mMediaCache->GetReentrantMonitor());
   mStreamLength = aLength;
 }
 
 void
-MediaCacheStream::NotifyDataStarted(int64_t aOffset)
+MediaCacheStream::NotifyDataStarted(uint32_t aLoadID, int64_t aOffset)
 {
   NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
+  MOZ_ASSERT(aLoadID > 0);
+  LOG("Stream %p DataStarted: %" PRId64 " aLoadID=%u", this, aOffset, aLoadID);
 
   ReentrantMonitorAutoEnter mon(mMediaCache->GetReentrantMonitor());
   NS_WARNING_ASSERTION(aOffset == mChannelOffset,
                        "Server is giving us unexpected offset");
   MOZ_ASSERT(aOffset >= 0);
   mChannelOffset = aOffset;
   if (mStreamLength >= 0) {
     // If we started reading at a certain offset, then for sure
     // the stream is at least that long.
     mStreamLength = std::max(mStreamLength, mChannelOffset);
   }
+  mLoadID = aLoadID;
 }
 
 void
 MediaCacheStream::UpdatePrincipal(nsIPrincipal* aPrincipal)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MediaCache::ResourceStreamIterator iter(mMediaCache, mResourceID);
   while (MediaCacheStream* stream = iter.Next()) {
     if (nsContentUtils::CombineResourcePrincipals(&stream->mPrincipal,
                                                   aPrincipal)) {
       stream->mClient->CacheClientNotifyPrincipalChanged();
     }
   }
 }
 
 void
-MediaCacheStream::NotifyDataReceived(int64_t aSize, const char* aData)
+MediaCacheStream::NotifyDataReceived(uint32_t aLoadID,
+                                     int64_t aSize,
+                                     const char* aData)
 {
+  MOZ_ASSERT(aLoadID > 0);
   // This might happen off the main thread.
 
   // It is safe to read mClosed without holding the monitor because this
   // function is guaranteed to happen before Close().
   MOZ_DIAGNOSTIC_ASSERT(!mClosed);
 
   ReentrantMonitorAutoEnter mon(mMediaCache->GetReentrantMonitor());
+  LOG("Stream %p DataReceived at %" PRId64 " count=%" PRId64 " aLoadID=%u",
+      this,
+      mChannelOffset,
+      aSize,
+      aLoadID);
+
+  // TODO: For now NotifyDataReceived() always runs on the main thread. This
+  // assertion is to make sure our load ID algorithm doesn't go wrong. Remove it
+  // when OMT data delievery is enabled.
+  MOZ_DIAGNOSTIC_ASSERT(mLoadID == aLoadID);
+
+  if (mLoadID != aLoadID) {
+    // mChannelOffset is updated to a new position when loading a new channel.
+    // We should discard the data coming from the old channel so it won't be
+    // stored to the wrong positoin.
+    return;
+  }
   int64_t size = aSize;
   const char* data = aData;
 
-  LOG("Stream %p DataReceived at %" PRId64 " count=%" PRId64,
-      this, mChannelOffset, aSize);
-
   // We process the data one block (or part of a block) at a time
   while (size > 0) {
     uint32_t blockIndex = OffsetToBlockIndexUnchecked(mChannelOffset);
     int32_t blockOffset = int32_t(mChannelOffset - blockIndex*BLOCK_SIZE);
     int32_t chunkSize = std::min<int64_t>(BLOCK_SIZE - blockOffset, size);
 
     if (blockOffset == 0) {
       // We've just started filling this buffer so now is a good time
--- a/dom/media/MediaCache.h
+++ b/dom/media/MediaCache.h
@@ -251,23 +251,23 @@ public:
   void NotifyDataLength(int64_t aLength);
   // Notifies the cache that a load has begun. We pass the offset
   // because in some cases the offset might not be what the cache
   // requested. In particular we might unexpectedly start providing
   // data at offset 0. This need not be called if the offset is the
   // offset that the cache requested in
   // ChannelMediaResource::CacheClientSeek. This can be called at any
   // time by the client, not just after a CacheClientSeek.
-  void NotifyDataStarted(int64_t aOffset);
+  void NotifyDataStarted(uint32_t aLoadID, int64_t aOffset);
   // Notifies the cache that data has been received. The stream already
   // knows the offset because data is received in sequence and
   // the starting offset is known via NotifyDataStarted or because
   // the cache requested the offset in
   // ChannelMediaResource::CacheClientSeek, or because it defaulted to 0.
-  void NotifyDataReceived(int64_t aSize, const char* aData);
+  void NotifyDataReceived(uint32_t aLoadID, int64_t aSize, const char* aData);
   // Notifies the cache that the current bytes should be written to disk.
   // Called on the main thread.
   void FlushPartialBlock();
   // Notifies the cache that the channel has closed with the given status.
   void NotifyDataEnded(nsresult aStatus);
 
   // Notifies the stream that the channel is reopened. The stream should
   // reset variables such as |mDidNotifyDataEnded|.
@@ -489,16 +489,19 @@ private:
   uint32_t          mPinCount;
   // The status used when we did CacheClientNotifyDataEnded. Only valid
   // when mDidNotifyDataEnded is true.
   nsresult          mNotifyDataEndedStatus;
   // The last reported read mode
   ReadMode          mCurrentMode;
   // True if some data in mPartialBlockBuffer has been read as metadata
   bool              mMetadataInPartialBlockBuffer;
+  // The load ID of the current channel. Used to check whether the data is
+  // coming from an old channel and should be discarded.
+  uint32_t mLoadID = 0;
 
   // The following field is protected by the cache's monitor but are
   // only written on the main thread.
 
   // Data received for the block containing mChannelOffset. Data needs
   // to wait here so we can write back a complete block. The first
   // mChannelOffset%BLOCK_SIZE bytes have been filled in with good data,
   // the rest are garbage.
--- a/dom/media/MediaResource.cpp
+++ b/dom/media/MediaResource.cpp
@@ -141,17 +141,17 @@ nsresult
 ChannelMediaResource::Listener::OnDataAvailable(nsIRequest* aRequest,
                                                 nsISupports* aContext,
                                                 nsIInputStream* aStream,
                                                 uint64_t aOffset,
                                                 uint32_t aCount)
 {
   // This might happen off the main thread.
   MOZ_DIAGNOSTIC_ASSERT(mResource);
-  return mResource->OnDataAvailable(aRequest, aStream, aCount);
+  return mResource->OnDataAvailable(mLoadID, aStream, aCount);
 }
 
 nsresult
 ChannelMediaResource::Listener::AsyncOnChannelRedirect(
   nsIChannel* aOld,
   nsIChannel* aNew,
   uint32_t aFlags,
   nsIAsyncVerifyRedirectCallback* cb)
@@ -313,17 +313,17 @@ ChannelMediaResource::OnStartRequest(nsI
     // and the server isn't sending Accept-Ranges:bytes then we don't
     // support seeking. We also can't seek in compressed streams.
     seekable = !isCompressed && acceptsRanges;
   } else {
     // Not an HTTP channel. Assume data will be sent from position zero.
     startOffset = 0;
   }
 
-  mCacheStream.NotifyDataStarted(startOffset);
+  mCacheStream.NotifyDataStarted(mLoadID, startOffset);
   mCacheStream.SetTransportSeekable(seekable);
   mChannelStatistics.Start();
   mReopenOnError = false;
 
   mSuspendAgent.UpdateSuspendedStatusIfNeeded();
 
   // Fires an initial progress event.
   owner->DownloadProgressed();
@@ -440,52 +440,55 @@ ChannelMediaResource::OnChannelRedirect(
 {
   mChannel = aNew;
   mSuspendAgent.NotifyChannelOpened(mChannel);
   return SetupChannelHeaders(aOffset);
 }
 
 nsresult
 ChannelMediaResource::CopySegmentToCache(nsIInputStream* aInStream,
-                                         void* aResource,
+                                         Closure* aClosure,
                                          const char* aFromSegment,
                                          uint32_t aToOffset,
                                          uint32_t aCount,
                                          uint32_t* aWriteCount)
 {
-  ChannelMediaResource* res = static_cast<ChannelMediaResource*>(aResource);
-  res->mCacheStream.NotifyDataReceived(aCount, aFromSegment);
+  aClosure->mResource->mCacheStream.NotifyDataReceived(
+    aClosure->mLoadID, aCount, aFromSegment);
   *aWriteCount = aCount;
   return NS_OK;
 }
 
 nsresult
-ChannelMediaResource::OnDataAvailable(nsIRequest* aRequest,
+ChannelMediaResource::OnDataAvailable(uint32_t aLoadID,
                                       nsIInputStream* aStream,
                                       uint32_t aCount)
 {
   // This might happen off the main thread.
-  NS_ASSERTION(mChannel.get() == aRequest, "Wrong channel!");
+  // Don't assert |mChannel.get() == aRequest| since reading mChannel here off
+  // the main thread is a data race.
 
   // Update principals before putting the data in the cache. This is important,
   // we want to make sure all principals are updated before any consumer can see
   // the new data.
   // TODO: Handle the case where OnDataAvailable() runs off the main thread.
   UpdatePrincipal();
 
   RefPtr<ChannelMediaResource> self = this;
   nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
     "ChannelMediaResource::OnDataAvailable",
     [self, aCount]() { self->mChannelStatistics.AddBytes(aCount); });
   mCallback->AbstractMainThread()->Dispatch(r.forget());
 
+  Closure closure{ aLoadID, this };
   uint32_t count = aCount;
   while (count > 0) {
     uint32_t read;
-    nsresult rv = aStream->ReadSegments(CopySegmentToCache, this, count, &read);
+    nsresult rv = aStream->ReadSegments(
+      (nsWriteSegmentFun)CopySegmentToCache, &closure, count, &read);
     if (NS_FAILED(rv))
       return rv;
     NS_ASSERTION(read > 0, "Read 0 bytes while data was available?");
     count -= read;
   }
 
   return NS_OK;
 }
@@ -506,30 +509,30 @@ ChannelMediaResource::Open(nsIStreamList
   }
 
   nsresult rv = mCacheStream.Init(cl);
   if (NS_FAILED(rv)) {
     return rv;
   }
 
   MOZ_ASSERT(GetOffset() == 0, "Who set offset already?");
-  mListener = new Listener(this, 0);
+  mListener = new Listener(this, 0, ++mLoadID);
   *aStreamListener = mListener;
   NS_ADDREF(*aStreamListener);
   return NS_OK;
 }
 
 nsresult
 ChannelMediaResource::OpenChannel(int64_t aOffset)
 {
   MOZ_ASSERT(NS_IsMainThread());
   MOZ_ASSERT(mChannel);
   MOZ_ASSERT(!mListener, "Listener should have been removed by now");
 
-  mListener = new Listener(this, aOffset);
+  mListener = new Listener(this, aOffset, ++mLoadID);
   nsresult rv = mChannel->SetNotificationCallbacks(mListener.get());
   NS_ENSURE_SUCCESS(rv, rv);
 
   rv = SetupChannelHeaders(aOffset);
   NS_ENSURE_SUCCESS(rv, rv);
 
   rv = mChannel->AsyncOpen2(mListener);
   NS_ENSURE_SUCCESS(rv, rv);
--- a/dom/media/MediaResource.h
+++ b/dom/media/MediaResource.h
@@ -503,44 +503,46 @@ public:
   class Listener final
     : public nsIStreamListener
     , public nsIInterfaceRequestor
     , public nsIChannelEventSink
     , public nsIThreadRetargetableStreamListener
   {
     ~Listener() {}
   public:
-    Listener(ChannelMediaResource* aResource, int64_t aOffset)
+    Listener(ChannelMediaResource* aResource, int64_t aOffset, uint32_t aLoadID)
       : mResource(aResource)
       , mOffset(aOffset)
+      , mLoadID(aLoadID)
     {}
 
     NS_DECL_ISUPPORTS
     NS_DECL_NSIREQUESTOBSERVER
     NS_DECL_NSISTREAMLISTENER
     NS_DECL_NSICHANNELEVENTSINK
     NS_DECL_NSIINTERFACEREQUESTOR
     NS_DECL_NSITHREADRETARGETABLESTREAMLISTENER
 
     void Revoke() { mResource = nullptr; }
 
   private:
     RefPtr<ChannelMediaResource> mResource;
     const int64_t mOffset;
+    const uint32_t mLoadID;
   };
   friend class Listener;
 
   nsresult GetCachedRanges(MediaByteRangeSet& aRanges) override;
 
 protected:
   bool IsSuspendedByCache();
   // These are called on the main thread by Listener.
   nsresult OnStartRequest(nsIRequest* aRequest, int64_t aRequestOffset);
   nsresult OnStopRequest(nsIRequest* aRequest, nsresult aStatus);
-  nsresult OnDataAvailable(nsIRequest* aRequest,
+  nsresult OnDataAvailable(uint32_t aLoadID,
                            nsIInputStream* aStream,
                            uint32_t aCount);
   nsresult OnChannelRedirect(nsIChannel* aOld,
                              nsIChannel* aNew,
                              uint32_t aFlags,
                              int64_t aOffset);
 
   // Opens the channel, using an HTTP byte range request to start at aOffset
@@ -559,25 +561,33 @@ protected:
   // Parses 'Content-Range' header and returns results via parameters.
   // Returns error if header is not available, values are not parse-able or
   // values are out of range.
   nsresult ParseContentRangeHeader(nsIHttpChannel * aHttpChan,
                                    int64_t& aRangeStart,
                                    int64_t& aRangeEnd,
                                    int64_t& aRangeTotal);
 
+  struct Closure
+  {
+    uint32_t mLoadID;
+    ChannelMediaResource* mResource;
+  };
+
   static nsresult CopySegmentToCache(nsIInputStream* aInStream,
-                                     void* aResource,
+                                     Closure* aClosure,
                                      const char* aFromSegment,
                                      uint32_t aToOffset,
                                      uint32_t aCount,
                                      uint32_t* aWriteCount);
 
   // Main thread access only
   RefPtr<Listener> mListener;
+  // A mono-increasing integer to uniquely identify the channel we are loading.
+  uint32_t mLoadID = 0;
   // When this flag is set, if we get a network error we should silently
   // reopen the stream.
   bool               mReopenOnError;
 
   // Any thread access
   MediaCacheStream mCacheStream;
 
   MediaChannelStatistics mChannelStatistics;