Bug 513144. Allow streams that are related by mozLoadFrom to share data loaded after the initial clone. r=doublec
authorRobert O'Callahan <robert@ocallahan.org>
Tue, 15 Sep 2009 14:30:45 +1200
changeset 32905 643b222c2d83749c495bee8c2f5343fd11dc9586
parent 32904 aea350f323885028993916ee73b7ae7e2e3e6d0c
child 32906 0fd888645cf353aa82bc92cde59098c8f2a419e3
push idunknown
push userunknown
push dateunknown
reviewersdoublec
bugs513144
milestone1.9.3a1pre
Bug 513144. Allow streams that are related by mozLoadFrom to share data loaded after the initial clone. r=doublec
content/media/nsMediaCache.cpp
content/media/nsMediaCache.h
content/media/nsMediaStream.cpp
content/media/nsMediaStream.h
content/media/ogg/nsOggDecoder.cpp
content/media/wave/nsWaveDecoder.cpp
--- a/content/media/nsMediaCache.cpp
+++ b/content/media/nsMediaCache.cpp
@@ -79,25 +79,31 @@ static const PRUint32 FREE_BLOCK_SCAN_LI
 using mozilla::TimeStamp;
 using mozilla::TimeDuration;
 
 #ifdef DEBUG
 // Turn this on to do very expensive cache state validation
 // #define DEBUG_VERIFY_CACHE
 #endif
 
+// There is at most one media cache (although that could quite easily be
+// relaxed if we wanted to manage multiple caches with independent
+// size limits).
+static nsMediaCache* gMediaCache;
+
 class nsMediaCache {
 public:
   friend class nsMediaCacheStream::BlockList;
   typedef nsMediaCacheStream::BlockList BlockList;
   enum {
     BLOCK_SIZE = nsMediaCacheStream::BLOCK_SIZE
   };
 
-  nsMediaCache() : mMonitor(nsAutoMonitor::NewMonitor("media.cache")),
+  nsMediaCache() : mNextResourceID(1),
+    mMonitor(nsAutoMonitor::NewMonitor("media.cache")),
     mFD(nsnull), mFDCurrentPos(0), mUpdateQueued(PR_FALSE)
 #ifdef DEBUG
     , mInUpdate(PR_FALSE)
 #endif
   {
     MOZ_COUNT_CTOR(nsMediaCache);
   }
   ~nsMediaCache() {
@@ -179,16 +185,35 @@ public:
   // Verify invariants, especially block list invariants
   void Verify();
 #else
   void Verify() {}
 #endif
 
   PRMonitor* Monitor() { return mMonitor; }
 
+  class ResourceStreamIterator {
+  public:
+    ResourceStreamIterator(PRInt64 aResourceID) :
+      mResourceID(aResourceID), mNext(0) {}
+    nsMediaCacheStream* Next()
+    {
+      while (mNext < gMediaCache->mStreams.Length()) {
+        nsMediaCacheStream* stream = gMediaCache->mStreams[mNext];
+        ++mNext;
+        if (stream->GetResourceID() == mResourceID)
+          return stream;
+      }
+      return nsnull;
+    }
+  private:
+    PRInt64  mResourceID;
+    PRUint32 mNext;
+  };
+
 protected:
   // Find a free or reusable block and return its index. If there are no
   // free blocks and no reusable blocks, add a new block to the cache
   // and return it. Can return -1 on OOM.
   PRInt32 FindBlockForIncomingData(TimeStamp aNow, nsMediaCacheStream* aStream);
   // Find a reusable block --- a free block, if there is one, otherwise
   // the reusable block with the latest predicted-next-use, or -1 if
   // there aren't any freeable blocks. Only block indices less than
@@ -268,16 +293,19 @@ protected:
   TimeDuration PredictNextUse(TimeStamp aNow, PRInt32 aBlock);
   // Guess the duration until the next incoming data on aStream will be used
   TimeDuration PredictNextUseForIncomingData(nsMediaCacheStream* aStream);
 
   // Truncate the file and index array if there are free blocks at the
   // end
   void Truncate();
 
+  // This member is main-thread only. It's used to allocate unique
+  // resource IDs to streams.
+  PRInt64                       mNextResourceID;
   // This member is main-thread only. It contains all the streams.
   nsTArray<nsMediaCacheStream*> mStreams;
 
   // The monitor protects all the data members here. Also, off-main-thread
   // readers that need to block will Wait() on this monitor. When new
   // data becomes available in the cache, we NotifyAll() on this monitor.
   PRMonitor*      mMonitor;
   // The Blocks describing the cache entries.
@@ -291,21 +319,16 @@ protected:
   BlockList       mFreeBlocks;
   // True if an event to run Update() has been queued but not processed
   PRPackedBool    mUpdateQueued;
 #ifdef DEBUG
   PRPackedBool    mInUpdate;
 #endif
 };
 
-// There is at most one media cache (although that could quite easily be
-// relaxed if we wanted to manage multiple caches with independent
-// size limits).
-static nsMediaCache* gMediaCache;
-
 void nsMediaCacheStream::BlockList::AddFirstBlock(PRInt32 aBlock)
 {
   NS_ASSERTION(!mEntries.GetEntry(aBlock), "Block already in list");
   Entry* entry = mEntries.PutEntry(aBlock);
 
   if (mFirstBlock < 0) {
     entry->mNextBlock = entry->mPrevBlock = aBlock;
   } else {
@@ -994,17 +1017,17 @@ nsMediaCache::Update()
   // Now try to move overflowing blocks to the main part of the cache.
   for (PRInt32 blockIndex = mIndex.Length() - 1; blockIndex >= maxBlocks;
        --blockIndex) {
     if (IsBlockFree(blockIndex))
       continue;
 
     Block* block = &mIndex[blockIndex];
     // Try to relocate the block close to other blocks for the first stream.
-    // There is no point in trying ot make it close to other blocks in
+    // There is no point in trying to make it close to other blocks in
     // *all* the streams it might belong to.
     PRInt32 destinationBlockIndex =
       FindReusableBlock(now, block->mOwners[0].mStream,
                         block->mOwners[0].mStreamBlock, maxBlocks);
     if (destinationBlockIndex < 0) {
       // Nowhere to place this overflow block. We won't be able to
       // place any more overflow blocks.
       break;
@@ -1145,16 +1168,30 @@ nsMediaCache::Update()
       // Read ahead if the data we expect to read is more valuable than
       // the least valuable block in the main part of the cache
       TimeDuration predictedNewDataUse = PredictNextUseForIncomingData(stream);
       LOG(PR_LOG_DEBUG, ("Stream %p predict next data in %f, current worst block is %f",
           stream, predictedNewDataUse.ToSeconds(), latestNextUse.ToSeconds()));
       enableReading = predictedNewDataUse < latestNextUse;
     }
 
+    if (enableReading) {
+      for (PRUint32 j = 0; j < i; ++j) {
+        nsMediaCacheStream* other = mStreams[j];
+        if (other->mResourceID == stream->mResourceID &&
+            !other->mCacheSuspended &&
+            other->mChannelOffset/BLOCK_SIZE == stream->mChannelOffset/BLOCK_SIZE) {
+          // This block is already going to be read by the other stream.
+          // So don't try to read it from this stream as well.
+          enableReading = PR_FALSE;
+          break;
+        }
+      }
+    }
+
     nsresult rv = NS_OK;
     if (stream->mChannelOffset != desiredOffset && enableReading) {
       // We need to seek now.
       NS_ASSERTION(stream->mIsSeekable || desiredOffset == 0,
                    "Trying to seek in a non-seekable stream!");
       // Round seek offset down to the start of the block
       stream->mChannelOffset = (desiredOffset/BLOCK_SIZE)*BLOCK_SIZE;
       LOG(PR_LOG_DEBUG, ("Stream %p CacheSeek to %lld (resume=%d)", stream,
@@ -1276,59 +1313,71 @@ nsMediaCache::InsertReadaheadBlock(Block
 
 void
 nsMediaCache::AllocateAndWriteBlock(nsMediaCacheStream* aStream, const void* aData,
                                     nsMediaCacheStream::ReadMode aMode)
 {
   PR_ASSERT_CURRENT_THREAD_IN_MONITOR(mMonitor);
 
   PRInt32 streamBlockIndex = aStream->mChannelOffset/BLOCK_SIZE;
-  // Extend the mBlocks array as necessary
-  while (streamBlockIndex >= PRInt32(aStream->mBlocks.Length())) {
-    aStream->mBlocks.AppendElement(-1);
+
+  // Remove all cached copies of this block
+  ResourceStreamIterator iter(aStream->mResourceID);
+  while (nsMediaCacheStream* stream = iter.Next()) {
+    while (streamBlockIndex >= PRInt32(stream->mBlocks.Length())) {
+      stream->mBlocks.AppendElement(-1);
+    }
+    if (stream->mBlocks[streamBlockIndex] >= 0) {
+      // We no longer want to own this block
+      PRInt32 globalBlockIndex = stream->mBlocks[streamBlockIndex];
+      LOG(PR_LOG_DEBUG, ("Released block %d from stream %p block %d(%lld)",
+          globalBlockIndex, stream, streamBlockIndex, (long long)streamBlockIndex*BLOCK_SIZE));
+      RemoveBlockOwner(globalBlockIndex, stream);
+    }
   }
-  if (aStream->mBlocks[streamBlockIndex] >= 0) {
-    // We no longer want to own this block
-    PRInt32 globalBlockIndex = aStream->mBlocks[streamBlockIndex];
-    LOG(PR_LOG_DEBUG, ("Released block %d from stream %p block %d(%lld)",
-        globalBlockIndex, aStream, streamBlockIndex, (long long)streamBlockIndex*BLOCK_SIZE));
-    RemoveBlockOwner(globalBlockIndex, aStream);
-  }
+
+  // Extend the mBlocks array as necessary
 
   TimeStamp now = TimeStamp::Now();
   PRInt32 blockIndex = FindBlockForIncomingData(now, aStream);
   if (blockIndex >= 0) {
     FreeBlock(blockIndex);
 
     Block* block = &mIndex[blockIndex];    
     LOG(PR_LOG_DEBUG, ("Allocated block %d to stream %p block %d(%lld)",
         blockIndex, aStream, streamBlockIndex, (long long)streamBlockIndex*BLOCK_SIZE));
-    BlockOwner* bo = block->mOwners.AppendElement();
-    if (!bo)
-      return;
 
-    bo->mStream = aStream;
-    bo->mStreamBlock = streamBlockIndex;
-    bo->mLastUseTime = now;
-    aStream->mBlocks[streamBlockIndex] = blockIndex;
     mFreeBlocks.RemoveBlock(blockIndex);
-    if (streamBlockIndex*BLOCK_SIZE < aStream->mStreamOffset) {
-      bo->mClass = aMode == nsMediaCacheStream::MODE_PLAYBACK
-        ? PLAYED_BLOCK : METADATA_BLOCK;
-      // This must be the most-recently-used block, since we
-      // marked it as used now (which may be slightly bogus, but we'll
-      // treat it as used for simplicity).
-      GetListForBlock(bo)->AddFirstBlock(blockIndex);
-      Verify();
-    } else {
-      // This may not be the latest readahead block, although it usually
-      // will be. We may have to scan for the right place to insert
-      // the block in the list.
-      bo->mClass = READAHEAD_BLOCK;
-      InsertReadaheadBlock(bo, blockIndex);
+
+    // Tell each stream using this resource about the new block.
+    ResourceStreamIterator iter(aStream->mResourceID);
+    while (nsMediaCacheStream* stream = iter.Next()) {
+      BlockOwner* bo = block->mOwners.AppendElement();
+      if (!bo)
+        return;
+
+      bo->mStream = stream;
+      bo->mStreamBlock = streamBlockIndex;
+      bo->mLastUseTime = now;
+      stream->mBlocks[streamBlockIndex] = blockIndex;
+      if (streamBlockIndex*BLOCK_SIZE < stream->mStreamOffset) {
+        bo->mClass = aMode == nsMediaCacheStream::MODE_PLAYBACK
+          ? PLAYED_BLOCK : METADATA_BLOCK;
+        // This must be the most-recently-used block, since we
+        // marked it as used now (which may be slightly bogus, but we'll
+        // treat it as used for simplicity).
+        GetListForBlock(bo)->AddFirstBlock(blockIndex);
+        Verify();
+      } else {
+        // This may not be the latest readahead block, although it usually
+        // will be. We may have to scan for the right place to insert
+        // the block in the list.
+        bo->mClass = READAHEAD_BLOCK;
+        InsertReadaheadBlock(bo, blockIndex);
+      }
     }
 
     nsresult rv = WriteCacheFile(blockIndex*BLOCK_SIZE, aData, BLOCK_SIZE);
     if (NS_FAILED(rv)) {
       LOG(PR_LOG_DEBUG, ("Released block %d from stream %p block %d(%lld)",
           blockIndex, aStream, streamBlockIndex, (long long)streamBlockIndex*BLOCK_SIZE));
       FreeBlock(blockIndex);
     }
@@ -1341,16 +1390,17 @@ nsMediaCache::AllocateAndWriteBlock(nsMe
 
 void
 nsMediaCache::OpenStream(nsMediaCacheStream* aStream)
 {
   NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
 
   nsAutoMonitor mon(mMonitor);
   mStreams.AppendElement(aStream);
+  aStream->mResourceID = mNextResourceID++;
 }
 
 void
 nsMediaCache::ReleaseStream(nsMediaCacheStream* aStream)
 {
   NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
 
   nsAutoMonitor mon(mMonitor);
@@ -1543,18 +1593,16 @@ nsMediaCacheStream::UpdatePrincipal(nsIP
 }
 
 void
 nsMediaCacheStream::NotifyDataReceived(PRInt64 aSize, const char* aData,
     nsIPrincipal* aPrincipal)
 {
   NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
 
-  UpdatePrincipal(aPrincipal);
-
   nsAutoMonitor mon(gMediaCache->Monitor());
   PRInt64 size = aSize;
   const char* data = aData;
 
   LOG(PR_LOG_DEBUG, ("Stream %p DataReceived at %lld count=%lld",
       this, (long long)mChannelOffset, (long long)aSize));
 
   // We process the data one block (or part of a block) at a time
@@ -1589,51 +1637,62 @@ nsMediaCacheStream::NotifyDataReceived(P
       }
     }
 
     if (blockDataToStore) {
       gMediaCache->AllocateAndWriteBlock(this, blockDataToStore, mode);
     }
 
     mChannelOffset += chunkSize;
-    if (mStreamLength >= 0) {
-      // The stream is at least as long as what we've read
-      mStreamLength = PR_MAX(mStreamLength, mChannelOffset);
-    }
     size -= chunkSize;
     data += chunkSize;
   }
 
+  nsMediaCache::ResourceStreamIterator iter(mResourceID);
+  while (nsMediaCacheStream* stream = iter.Next()) {
+    if (stream->mStreamLength >= 0) {
+      // The stream is at least as long as what we've read
+      stream->mStreamLength = PR_MAX(stream->mStreamLength, mChannelOffset);
+    }
+    stream->UpdatePrincipal(aPrincipal);
+    stream->mClient->CacheClientNotifyDataReceived();
+  }
+
   // Notify in case there's a waiting reader
   // XXX it would be fairly easy to optimize things a lot more to
   // avoid waking up reader threads unnecessarily
   mon.NotifyAll();
 }
 
 void
 nsMediaCacheStream::NotifyDataEnded(nsresult aStatus) 
 {
   NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
 
   nsAutoMonitor mon(gMediaCache->Monitor());
-  if (NS_SUCCEEDED(aStatus)) {
-    // We read the whole stream, so remember the true length
-    mStreamLength = mChannelOffset;
-  }
 
   PRInt32 blockOffset = PRInt32(mChannelOffset%BLOCK_SIZE);
   if (blockOffset > 0) {
     // Write back the partial block
     memset(reinterpret_cast<char*>(mPartialBlockBuffer) + blockOffset, 0,
            BLOCK_SIZE - blockOffset);
     gMediaCache->AllocateAndWriteBlock(this, mPartialBlockBuffer,
         mMetadataInPartialBlockBuffer ? MODE_METADATA : MODE_PLAYBACK);
     // Wake up readers who may be waiting for this data
     mon.NotifyAll();
   }
+
+  nsMediaCache::ResourceStreamIterator iter(mResourceID);
+  while (nsMediaCacheStream* stream = iter.Next()) {
+    if (NS_SUCCEEDED(aStatus)) {
+      // We read the whole stream, so remember the true length
+      stream->mStreamLength = mChannelOffset;
+    }
+    stream->mClient->CacheClientNotifyDataEnded(aStatus);
+  }
 }
 
 nsMediaCacheStream::~nsMediaCacheStream()
 {
   NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
   NS_ASSERTION(mClosed, "Stream was not closed");
   NS_ASSERTION(!mPinCount, "Unbalanced Pin");
 
@@ -2031,16 +2090,17 @@ nsresult
 nsMediaCacheStream::InitAsClone(nsMediaCacheStream* aOriginal)
 {
   if (mInitialized)
     return NS_OK;
 
   nsresult rv = Init();
   if (NS_FAILED(rv))
     return rv;
+  mResourceID = aOriginal->mResourceID;
 
   // Grab cache blocks from aOriginal as readahead blocks for our stream
   nsAutoMonitor mon(gMediaCache->Monitor());
 
   mPrincipal = aOriginal->mPrincipal;
   mStreamLength = aOriginal->mStreamLength;
   mIsSeekable = aOriginal->mIsSeekable;
 
--- a/content/media/nsMediaCache.h
+++ b/content/media/nsMediaCache.h
@@ -215,17 +215,17 @@ public:
   enum ReadMode {
     MODE_METADATA,
     MODE_PLAYBACK
   };
 
   // aClient provides the underlying transport that cache will use to read
   // data for this stream.
   nsMediaCacheStream(nsMediaChannelStream* aClient)
-    : mClient(aClient), mChannelOffset(0),
+    : mClient(aClient), mResourceID(0), mChannelOffset(0),
       mStreamOffset(0), mStreamLength(-1), mPlaybackBytesPerSecond(10000),
       mPinCount(0), mCurrentMode(MODE_PLAYBACK),
       mInitialized(PR_FALSE), mClosed(PR_FALSE),
       mIsSeekable(PR_FALSE), mCacheSuspended(PR_FALSE),
       mMetadataInPartialBlockBuffer(PR_FALSE),
       mUsingNullPrincipal(PR_FALSE) {}
   ~nsMediaCacheStream();
 
@@ -296,16 +296,18 @@ public:
   void Unpin();
   // See comments above for NotifyDataLength about how the length
   // can vary over time. Returns -1 if no length is known. Returns the
   // reported length if we haven't got any better information. If
   // the stream ended normally we return the length we actually got.
   // If we've successfully read data beyond the originally reported length,
   // we return the end of the data we've read.
   PRInt64 GetLength();
+  // Returns the unique resource ID
+  PRInt64 GetResourceID() { return mResourceID; }
   // Returns the end of the bytes starting at the given offset
   // which are in cache.
   PRInt64 GetCachedDataEnd(PRInt64 aOffset);
   // Returns the offset of the first byte of cached data at or after aOffset,
   // or -1 if there is no such cached data.
   PRInt64 GetNextCachedData(PRInt64 aOffset);
 
   // Reads from buffered data only. Will fail if not all data to be read is
@@ -421,16 +423,20 @@ private:
   // blocked on reading from this stream.
   void CloseInternal(nsAutoMonitor* aMonitor);
   // Update mPrincipal given that data has been received from aPrincipal
   void UpdatePrincipal(nsIPrincipal* aPrincipal);
 
   // These fields are main-thread-only.
   nsMediaChannelStream*  mClient;
   nsCOMPtr<nsIPrincipal> mPrincipal;
+  // This is a unique ID representing the resource we're loading.
+  // All streams with the same mResourceID are loading the same
+  // underlying resource and should share data.
+  PRInt64                mResourceID;
 
   // All other fields are all protected by the cache's monitor and
   // can be accessed by by any thread.
   // The offset where the next data from the channel will arrive
   PRInt64           mChannelOffset;
   // The offset where the reader is positioned in the stream
   PRInt64           mStreamOffset;
   // The reported or discovered length of the data, or -1 if nothing is
--- a/content/media/nsMediaStream.cpp
+++ b/content/media/nsMediaStream.cpp
@@ -266,19 +266,16 @@ nsMediaChannelStream::OnStopRequest(nsIR
     if (NS_SUCCEEDED(rv))
       return rv;
     // If the reopen/reseek fails, just fall through and treat this
     // error as fatal.
   }
 
   if (!mIgnoreClose) {
     mCacheStream.NotifyDataEnded(aStatus);
-    if (mDecoder) {
-      mDecoder->NotifyDownloadEnded(aStatus);
-    }
   }
 
   return NS_OK;
 }
 
 nsresult
 nsMediaChannelStream::OnChannelRedirect(nsIChannel* aOld, nsIChannel* aNew,
                                         PRUint32 aFlags)
@@ -334,21 +331,17 @@ nsMediaChannelStream::OnDataAvailable(ns
     PRUint32 read;
     nsresult rv = aStream->ReadSegments(CopySegmentToCache, &closure, count, 
                                         &read);
     if (NS_FAILED(rv))
       return rv;
     NS_ASSERTION(read > 0, "Read 0 bytes while data was available?");
     count -= read;
   }
-  mDecoder->NotifyBytesDownloaded();
 
-  // Fire a progress events according to the time and byte constraints outlined
-  // in the spec.
-  mDecoder->Progress(PR_FALSE);
   return NS_OK;
 }
 
 nsresult nsMediaChannelStream::Open(nsIStreamListener **aStreamListener)
 {
   NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
 
   if (!mLock)
@@ -588,16 +581,58 @@ nsMediaChannelStream::RecreateChannel()
   return NS_NewChannel(getter_AddRefs(mChannel),
                        mURI,
                        nsnull,
                        loadGroup,
                        nsnull,
                        loadFlags);
 }
 
+void
+nsMediaChannelStream::DoNotifyDataReceived()
+{
+  mDataReceivedEvent.Revoke();
+  mDecoder->NotifyBytesDownloaded();
+}
+
+void
+nsMediaChannelStream::CacheClientNotifyDataReceived()
+{
+  NS_ASSERTION(NS_IsMainThread(), "Don't call on main thread");
+
+  if (mDataReceivedEvent.IsPending())
+    return;
+
+  mDataReceivedEvent =
+    new nsNonOwningRunnableMethod<nsMediaChannelStream>(this, &nsMediaChannelStream::DoNotifyDataReceived);
+  NS_DispatchToMainThread(mDataReceivedEvent.get(), NS_DISPATCH_NORMAL);
+}
+
+class DataEnded : public nsRunnable {
+public:
+  DataEnded(nsMediaDecoder* aDecoder, nsresult aStatus) :
+    mDecoder(aDecoder), mStatus(aStatus) {}
+  NS_IMETHOD Run() {
+    mDecoder->NotifyDownloadEnded(mStatus);
+    return NS_OK;
+  }
+private:
+  nsRefPtr<nsMediaDecoder> mDecoder;
+  nsresult                 mStatus;
+};
+
+void
+nsMediaChannelStream::CacheClientNotifyDataEnded(nsresult aStatus)
+{
+  NS_ASSERTION(NS_IsMainThread(), "Don't call on main thread");
+
+  nsCOMPtr<nsIRunnable> event = new DataEnded(mDecoder, aStatus);
+  NS_DispatchToMainThread(event, NS_DISPATCH_NORMAL);
+}
+
 nsresult
 nsMediaChannelStream::CacheClientSeek(PRInt64 aOffset, PRBool aResume)
 {
   NS_ASSERTION(NS_IsMainThread(), "Don't call on main thread");
 
   CloseChannel();
 
   if (aResume) {
--- a/content/media/nsMediaStream.h
+++ b/content/media/nsMediaStream.h
@@ -304,16 +304,24 @@ protected:
 class nsMediaChannelStream : public nsMediaStream
 {
 public:
   nsMediaChannelStream(nsMediaDecoder* aDecoder, nsIChannel* aChannel, nsIURI* aURI);
   ~nsMediaChannelStream();
 
   // These are called on the main thread by nsMediaCache. These must
   // not block or grab locks.
+  // Notify that data is available from the cache. This can happen even
+  // if this stream didn't read any data, since another stream might have
+  // received data for the same resource.
+  void CacheClientNotifyDataReceived();
+  // Notify that we reached the end of the stream. This can happen even
+  // if this stream didn't read any data, since another stream might have
+  // received data for the same resource.
+  void CacheClientNotifyDataEnded(nsresult aStatus);
   // Start a new load at the given aOffset. The old load is cancelled
   // and no more data from the old load will be notified via
   // nsMediaCacheStream::NotifyDataReceived/Ended.
   // This can fail.
   nsresult CacheClientSeek(PRInt64 aOffset, PRBool aResume);
   // Suspend the current load since data is currently not wanted
   nsresult CacheClientSuspend();
   // Resume the current load since data is wanted again
@@ -341,17 +349,16 @@ public:
   virtual void    Unpin();
   virtual double  GetDownloadRate(PRPackedBool* aIsReliable);
   virtual PRInt64 GetLength();
   virtual PRInt64 GetNextCachedData(PRInt64 aOffset);
   virtual PRInt64 GetCachedDataEnd(PRInt64 aOffset);
   virtual PRBool  IsDataCachedToEndOfStream(PRInt64 aOffset);
   virtual PRBool  IsSuspendedByCache();
 
-protected:
   class Listener : public nsIStreamListener,
                    public nsIInterfaceRequestor,
                    public nsIChannelEventSink
   {
   public:
     Listener(nsMediaChannelStream* aStream) : mStream(aStream) {}
 
     NS_DECL_ISUPPORTS
@@ -362,42 +369,48 @@ protected:
 
     void Revoke() { mStream = nsnull; }
 
   private:
     nsMediaChannelStream* mStream;
   };
   friend class Listener;
 
+protected:
   // These are called on the main thread by Listener.
   nsresult OnStartRequest(nsIRequest* aRequest);
   nsresult OnStopRequest(nsIRequest* aRequest, nsresult aStatus);
   nsresult OnDataAvailable(nsIRequest* aRequest,
                            nsIInputStream* aStream,
                            PRUint32 aCount);
   nsresult OnChannelRedirect(nsIChannel* aOld, nsIChannel* aNew, PRUint32 aFlags);
 
   // Opens the channel, using an HTTP byte range request to start at mOffset
   // if possible. Main thread only.
   nsresult OpenChannel(nsIStreamListener** aStreamListener);
   nsresult RecreateChannel();
   void SetupChannelHeaders();
   // Closes the channel. Main thread only.
   void CloseChannel();
 
+  void DoNotifyDataReceived();
+
   static NS_METHOD CopySegmentToCache(nsIInputStream *aInStream,
                                       void *aClosure,
                                       const char *aFromSegment,
                                       PRUint32 aToOffset,
                                       PRUint32 aCount,
                                       PRUint32 *aWriteCount);
 
   // Main thread access only
   PRInt64            mOffset;
   nsRefPtr<Listener> mListener;
+  // A data received event for the decoder that has been dispatched but has
+  // not yet been processed.
+  nsRevocableEventPtr<nsNonOwningRunnableMethod<nsMediaChannelStream> > mDataReceivedEvent;
   PRUint32           mSuspendCount;
   // When this flag is set, if we get a network error we should silently
   // reopen the stream.
   PRPackedBool       mReopenOnError;
   // When this flag is set, we should not report the next close of the
   // channel.
   PRPackedBool       mIgnoreClose;
 
--- a/content/media/ogg/nsOggDecoder.cpp
+++ b/content/media/ogg/nsOggDecoder.cpp
@@ -2264,16 +2264,17 @@ void nsOggDecoder::NotifySuspendedStatus
   }
 }
 
 void nsOggDecoder::NotifyBytesDownloaded()
 {
   NS_ASSERTION(NS_IsMainThread(),
                "nsOggDecoder::NotifyBytesDownloaded called on non-main thread");   
   UpdateReadyStateForData();
+  Progress(PR_FALSE);
 }
 
 void nsOggDecoder::NotifyDownloadEnded(nsresult aStatus)
 {
   if (aStatus == NS_BINDING_ABORTED)
     return;
 
   {
--- a/content/media/wave/nsWaveDecoder.cpp
+++ b/content/media/wave/nsWaveDecoder.cpp
@@ -1413,16 +1413,17 @@ nsWaveDecoder::NotifySuspendedStatusChan
     mElement->NotifyAutoplayDataReady();
   }
 }
 
 void
 nsWaveDecoder::NotifyBytesDownloaded()
 {
   UpdateReadyStateForData();
+  Progress(PR_FALSE);
 }
 
 void
 nsWaveDecoder::NotifyDownloadEnded(nsresult aStatus)
 {
   if (NS_SUCCEEDED(aStatus)) {
     ResourceLoaded();
   } else if (aStatus != NS_BASE_STREAM_CLOSED &&