Bug 1151760 - Separate timeout timer with end of stream. r=ettseng
authorJonathan Hao <jhao@mozilla.com>
Fri, 24 Apr 2015 15:25:09 +0800
changeset 241261 6b935b2fd11c27b102cb87c80d9eed2b9056ad7f
parent 241260 b2666b51ce73acbfec9688bdc1f2e4f05f3e51bb
child 241262 bc2b86afab42b77a1e486b8d809b3d3060facd0c
push id59066
push userryanvm@gmail.com
push dateMon, 27 Apr 2015 19:20:17 +0000
treeherdermozilla-inbound@92cfbfb1a464 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersettseng
bugs1151760
milestone40.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1151760 - Separate timeout timer with end of stream. r=ettseng
netwerk/protocol/rtsp/rtsp/RTSPConnectionHandler.h
--- a/netwerk/protocol/rtsp/rtsp/RTSPConnectionHandler.h
+++ b/netwerk/protocol/rtsp/rtsp/RTSPConnectionHandler.h
@@ -41,19 +41,25 @@
 
 #include "prlog.h"
 
 #include "prio.h"
 #include "prnetdb.h"
 
 extern PRLogModuleInfo* gRtspLog;
 
-// If no access units are received within 2 secs, assume that the rtp
-// stream has ended and signal end of stream.
-static int64_t kAccessUnitTimeoutUs = 2000000ll;
+// If no access units are received within 10 secs, assume that the rtp
+// stream has ended and abort.
+static int64_t kAccessUnitTimeoutUs = 10000000ll;
+
+// The end-of-stream timer will be running in the last 2 seconds of duration.
+static int64_t kActivateEndOfStreamTimerUs = 2000000ll;
+
+// The end-of-stream timer will timeout in 2 seconds.
+static int64_t kEndOfStreamTimeoutUs = 2000000ll;
 
 // If no access units arrive for the first 10 secs after starting the
 // stream, assume none ever will and signal EOS or switch transports.
 static int64_t kPlayTimeoutUs = 10000000ll;
 
 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
 
 namespace android {
@@ -97,17 +103,18 @@ struct RtspConnectionHandler : public AH
         kWhatPlay,
         kWhatResume,
         kWhatKeepAlive,
         kWhatOptions,
         kWhatEndOfStream,
         kWhatAbort,
         kWhatTeardown,
         kWhatQuit,
-        kWhatCheck,
+        kWhatAccessUnitTimeoutCheck,
+        kWhatEndOfStreamCheck,
         kWhatSeekDone,
         kWhatPausedDone,
         kWhatAccessUnitComplete,
         kWhatAccessUnit,
         kWhatSeek1,
         kWhatSeek2,
         kWhatBinary,
         kWhatTimeout,
@@ -134,16 +141,18 @@ struct RtspConnectionHandler : public AH
           mSetupTracksSuccessful(false),
           mSeekPending(false),
           mPausePending(false),
           mAborted(false),
           mFirstAccessUnit(true),
           mNTPAnchorUs(-1),
           mMediaAnchorUs(-1),
           mLastMediaTimeUs(0),
+          mNumAccessUnitsReceived(0),
+          mCheckPending(false),
           mCheckGeneration(0),
           mTryTCPInterleaving(false),
           mTryFakeRTCP(false),
           mReceivedFirstRTCPPacket(false),
           mReceivedFirstRTPPacket(false),
           mSeekable(false),
           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
           mKeepAliveGeneration(0),
@@ -192,29 +201,30 @@ struct RtspConnectionHandler : public AH
 
     void seek(int64_t timeUs) {
         sp<AMessage> msg = new AMessage(kWhatSeek, id());
         msg->setInt64("time", timeUs);
         msg->post();
     }
 
     void setCheckPending(bool flag) {
+        mCheckPending = flag;
         for (size_t i = 0; i < mTracks.size(); ++i) {
-            setCheckPending(i, flag);
+            setEndOfStreamCheckPending(i, flag);
         }
     }
 
-    void setCheckPending(size_t trackIndex, bool flag) {
+    void setEndOfStreamCheckPending(size_t trackIndex, bool flag) {
         TrackInfo *info = &mTracks.editItemAt(trackIndex);
         if (info) {
             info->mCheckPendings = flag;
         }
     }
 
-    bool getCheckPending(size_t trackIndex) {
+    bool getEndOfStreamCheckPending(size_t trackIndex) {
         TrackInfo *info = &mTracks.editItemAt(trackIndex);
         return info->mCheckPendings;
     }
 
     void play(uint64_t timeUs) {
         AString request = "PLAY ";
         request.append(mSessionURL);
         request.append(" RTSP/1.0\r\n");
@@ -944,46 +954,63 @@ struct RtspConnectionHandler : public AH
                 CHECK(msg->findInt32("result", &result));
                 sp<AMessage> msg = mNotify->dup();
                 msg->setInt32("what", kWhatDisconnected);
                 msg->setInt32("result", result);
                 msg->post();
                 break;
             }
 
-            case kWhatCheck:
+            case kWhatAccessUnitTimeoutCheck:
+            {
+                int32_t generation;
+                CHECK(msg->findInt32("generation", &generation));
+                if (generation != mCheckGeneration) {
+                    // This is an outdated message. Ignore.
+                    break;
+                }
+
+                if (mNumAccessUnitsReceived == 0) {
+                    LOGI("stream ended? aborting.");
+                    disconnect();
+                    break;
+                }
+                mNumAccessUnitsReceived = 0;
+                msg->post(kAccessUnitTimeoutUs);
+                break;
+            }
+
+            case kWhatEndOfStreamCheck:
             {
                 int32_t generation;
                 CHECK(msg->findInt32("generation", &generation));
                 if (generation != mCheckGeneration) {
                     // This is an outdated message. Ignore.
                     break;
                 }
                 size_t trackIndex;
                 msg->findSize("trackIndex", &trackIndex);
                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
                 if (!track) {
                   break;
                 }
 
                 if (track->mNumAccessUnitsReceiveds == 0) {
-                    LOGI("stream ended? aborting.");
                     if (gIOService->IsOffline()) {
-                        sp<AMessage> reply = new AMessage(kWhatDisconnected, id());
-                        reply->setInt32("result", ERROR_CONNECTION_LOST);
-                        mConn->disconnect(reply);
+                        LOGI("stream ended? aborting.");
+                        disconnect();
                         break;
                     }
                     sp<AMessage> endStreamMsg = new AMessage(kWhatEndOfStream, id());
                     endStreamMsg->setSize("trackIndex", trackIndex);
                     endStreamMsg->post();
                     break;
                 }
                 track->mNumAccessUnitsReceiveds = 0;
-                msg->post(kAccessUnitTimeoutUs);
+                msg->post(kEndOfStreamTimeoutUs);
                 break;
             }
 
             case kWhatAccessUnit:
             {
                 int32_t timeUpdate;
                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
                     size_t trackIndex;
@@ -1004,29 +1031,29 @@ struct RtspConnectionHandler : public AH
                     break;
                 }
 
                 if (msg->findInt32("first-rtp", &first)) {
                     mReceivedFirstRTPPacket = true;
                     break;
                 }
 
+                mNumAccessUnitsReceived++;
+                postAccessUnitTimeoutCheck();
+
                 size_t trackIndex;
                 CHECK(msg->findSize("track-index", &trackIndex));
 
                 if (trackIndex >= mTracks.size()) {
                     LOGV("late packets ignored.");
                     break;
                 }
 
                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
 
-                track->mNumAccessUnitsReceiveds++;
-                postAccessUnitTimeoutCheck(trackIndex);
-
                 int32_t eos;
                 if (msg->findInt32("eos", &eos)) {
                     LOGI("received BYE on track index %d", trackIndex);
                     return;
                 }
 
                 sp<RefBase> obj;
                 CHECK(msg->findObject("access-unit", &obj));
@@ -1056,16 +1083,27 @@ struct RtspConnectionHandler : public AH
                     break;
                 }
 
                 if (track->mNewSegment) {
                     track->mNewSegment = false;
                 }
 
                 onAccessUnitComplete(trackIndex, accessUnit);
+
+                // This code is put here because now accessUnit has timestamp.
+                track->mNumAccessUnitsReceiveds++;
+                int64_t duration, timeUs;
+                mSessionDesc->getDurationUs(&duration);
+                accessUnit->meta()->findInt64("timeUs", &timeUs);
+
+                // Start a timer to detect end-of-stream if close to the end.
+                if (timeUs >= duration - kActivateEndOfStreamTimerUs) {
+                    postEndOfStreamCheck(trackIndex);
+                }
                 break;
             }
 
             case kWhatSeek:
             {
                 if (!mSeekable) {
                     LOGW("This is a live stream, ignoring seek request.");
 
@@ -1146,19 +1184,22 @@ struct RtspConnectionHandler : public AH
 
                 LOGI("PLAY completed with result %d (%s)",
                      result, strerror(-result));
                 if (mAborted || !mSeekPending) {
                     LOGV("We're aborted, dropping stale packet.");
                     break;
                 }
 
+                mCheckPending = false;
+                postAccessUnitTimeoutCheck();
+
                 for (size_t i = 0; i < mTracks.size(); i++) {
-                    setCheckPending(i, false);
-                    postAccessUnitTimeoutCheck(i);
+                    setEndOfStreamCheckPending(i, false);
+                    postEndOfStreamCheck(i);
                 }
 
                 if (result == OK) {
                     sp<RefBase> obj;
                     CHECK(msg->findObject("response", &obj));
                     sp<ARTSPResponse> response =
                         static_cast<ARTSPResponse *>(obj.get());
 
@@ -1258,24 +1299,34 @@ struct RtspConnectionHandler : public AH
     }
 
     void postKeepAlive() {
         sp<AMessage> msg = new AMessage(kWhatKeepAlive, id());
         msg->setInt32("generation", mKeepAliveGeneration);
         msg->post((mKeepAliveTimeoutUs * 9) / 10);
     }
 
-    void postAccessUnitTimeoutCheck(size_t trackIndex) {
-        if (getCheckPending(trackIndex)) {
+    void postEndOfStreamCheck(size_t trackIndex) {
+        if (getEndOfStreamCheckPending(trackIndex)) {
             return;
         }
-        setCheckPending(trackIndex, true);
-        sp<AMessage> check = new AMessage(kWhatCheck, id());
+        setEndOfStreamCheckPending(trackIndex, true);
+        sp<AMessage> check = new AMessage(kWhatEndOfStreamCheck, id());
         check->setInt32("generation", mCheckGeneration);
         check->setSize("trackIndex", trackIndex);
+        check->post(kEndOfStreamTimeoutUs);
+    }
+
+    void postAccessUnitTimeoutCheck() {
+        if (mCheckPending) {
+            return;
+        }
+        mCheckPending = true;
+        sp<AMessage> check = new AMessage(kWhatAccessUnitTimeoutCheck, id());
+        check->setInt32("generation", mCheckGeneration);
         check->post(kAccessUnitTimeoutUs);
     }
 
     static void SplitString(
             const AString &s, const char *separator, List<AString> *items) {
         items->clear();
         size_t start = 0;
         while (start < s.size()) {
@@ -1452,16 +1503,18 @@ private:
     bool mPausePending;
     bool mAborted;
     bool mFirstAccessUnit;
 
     int64_t mNTPAnchorUs;
     int64_t mMediaAnchorUs;
     int64_t mLastMediaTimeUs;
 
+    int64_t mNumAccessUnitsReceived;
+    bool mCheckPending;
     int32_t mCheckGeneration;
     bool mTryTCPInterleaving;
     bool mTryFakeRTCP;
     bool mReceivedFirstRTCPPacket;
     bool mReceivedFirstRTPPacket;
     bool mSeekable;
     int64_t mKeepAliveTimeoutUs;
     int32_t mKeepAliveGeneration;