author | Jonathan Hao <jhao@mozilla.com> |
Fri, 24 Apr 2015 15:25:09 +0800 | |
changeset 241261 | 6b935b2fd11c27b102cb87c80d9eed2b9056ad7f |
parent 241260 | b2666b51ce73acbfec9688bdc1f2e4f05f3e51bb |
child 241262 | bc2b86afab42b77a1e486b8d809b3d3060facd0c |
push id | 59066 |
push user | ryanvm@gmail.com |
push date | Mon, 27 Apr 2015 19:20:17 +0000 |
treeherder | mozilla-inbound@92cfbfb1a464 [default view] [failures only] |
perfherder | [talos] [build metrics] [platform microbench] (compared to previous push) |
reviewers | ettseng |
bugs | 1151760 |
milestone | 40.0a1 |
first release with | nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
|
last release without | nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
|
--- a/netwerk/protocol/rtsp/rtsp/RTSPConnectionHandler.h +++ b/netwerk/protocol/rtsp/rtsp/RTSPConnectionHandler.h @@ -41,19 +41,25 @@ #include "prlog.h" #include "prio.h" #include "prnetdb.h" extern PRLogModuleInfo* gRtspLog; -// If no access units are received within 2 secs, assume that the rtp -// stream has ended and signal end of stream. -static int64_t kAccessUnitTimeoutUs = 2000000ll; +// If no access units are received within 10 secs, assume that the rtp +// stream has ended and abort. +static int64_t kAccessUnitTimeoutUs = 10000000ll; + +// The end-of-stream timer will be running in the last 2 seconds of duration. +static int64_t kActivateEndOfStreamTimerUs = 2000000ll; + +// The end-of-stream timer will timeout in 2 seconds. +static int64_t kEndOfStreamTimeoutUs = 2000000ll; // If no access units arrive for the first 10 secs after starting the // stream, assume none ever will and signal EOS or switch transports. static int64_t kPlayTimeoutUs = 10000000ll; static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; namespace android { @@ -97,17 +103,18 @@ struct RtspConnectionHandler : public AH kWhatPlay, kWhatResume, kWhatKeepAlive, kWhatOptions, kWhatEndOfStream, kWhatAbort, kWhatTeardown, kWhatQuit, - kWhatCheck, + kWhatAccessUnitTimeoutCheck, + kWhatEndOfStreamCheck, kWhatSeekDone, kWhatPausedDone, kWhatAccessUnitComplete, kWhatAccessUnit, kWhatSeek1, kWhatSeek2, kWhatBinary, kWhatTimeout, @@ -134,16 +141,18 @@ struct RtspConnectionHandler : public AH mSetupTracksSuccessful(false), mSeekPending(false), mPausePending(false), mAborted(false), mFirstAccessUnit(true), mNTPAnchorUs(-1), mMediaAnchorUs(-1), mLastMediaTimeUs(0), + mNumAccessUnitsReceived(0), + mCheckPending(false), mCheckGeneration(0), mTryTCPInterleaving(false), mTryFakeRTCP(false), mReceivedFirstRTCPPacket(false), mReceivedFirstRTPPacket(false), mSeekable(false), mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), mKeepAliveGeneration(0), @@ -192,29 +201,30 @@ struct RtspConnectionHandler : public AH void seek(int64_t timeUs) { sp<AMessage> msg = new AMessage(kWhatSeek, id()); msg->setInt64("time", timeUs); msg->post(); } void setCheckPending(bool flag) { + mCheckPending = flag; for (size_t i = 0; i < mTracks.size(); ++i) { - setCheckPending(i, flag); + setEndOfStreamCheckPending(i, flag); } } - void setCheckPending(size_t trackIndex, bool flag) { + void setEndOfStreamCheckPending(size_t trackIndex, bool flag) { TrackInfo *info = &mTracks.editItemAt(trackIndex); if (info) { info->mCheckPendings = flag; } } - bool getCheckPending(size_t trackIndex) { + bool getEndOfStreamCheckPending(size_t trackIndex) { TrackInfo *info = &mTracks.editItemAt(trackIndex); return info->mCheckPendings; } void play(uint64_t timeUs) { AString request = "PLAY "; request.append(mSessionURL); request.append(" RTSP/1.0\r\n"); @@ -944,46 +954,63 @@ struct RtspConnectionHandler : public AH CHECK(msg->findInt32("result", &result)); sp<AMessage> msg = mNotify->dup(); msg->setInt32("what", kWhatDisconnected); msg->setInt32("result", result); msg->post(); break; } - case kWhatCheck: + case kWhatAccessUnitTimeoutCheck: + { + int32_t generation; + CHECK(msg->findInt32("generation", &generation)); + if (generation != mCheckGeneration) { + // This is an outdated message. Ignore. + break; + } + + if (mNumAccessUnitsReceived == 0) { + LOGI("stream ended? aborting."); + disconnect(); + break; + } + mNumAccessUnitsReceived = 0; + msg->post(kAccessUnitTimeoutUs); + break; + } + + case kWhatEndOfStreamCheck: { int32_t generation; CHECK(msg->findInt32("generation", &generation)); if (generation != mCheckGeneration) { // This is an outdated message. Ignore. break; } size_t trackIndex; msg->findSize("trackIndex", &trackIndex); TrackInfo *track = &mTracks.editItemAt(trackIndex); if (!track) { break; } if (track->mNumAccessUnitsReceiveds == 0) { - LOGI("stream ended? aborting."); if (gIOService->IsOffline()) { - sp<AMessage> reply = new AMessage(kWhatDisconnected, id()); - reply->setInt32("result", ERROR_CONNECTION_LOST); - mConn->disconnect(reply); + LOGI("stream ended? aborting."); + disconnect(); break; } sp<AMessage> endStreamMsg = new AMessage(kWhatEndOfStream, id()); endStreamMsg->setSize("trackIndex", trackIndex); endStreamMsg->post(); break; } track->mNumAccessUnitsReceiveds = 0; - msg->post(kAccessUnitTimeoutUs); + msg->post(kEndOfStreamTimeoutUs); break; } case kWhatAccessUnit: { int32_t timeUpdate; if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { size_t trackIndex; @@ -1004,29 +1031,29 @@ struct RtspConnectionHandler : public AH break; } if (msg->findInt32("first-rtp", &first)) { mReceivedFirstRTPPacket = true; break; } + mNumAccessUnitsReceived++; + postAccessUnitTimeoutCheck(); + size_t trackIndex; CHECK(msg->findSize("track-index", &trackIndex)); if (trackIndex >= mTracks.size()) { LOGV("late packets ignored."); break; } TrackInfo *track = &mTracks.editItemAt(trackIndex); - track->mNumAccessUnitsReceiveds++; - postAccessUnitTimeoutCheck(trackIndex); - int32_t eos; if (msg->findInt32("eos", &eos)) { LOGI("received BYE on track index %d", trackIndex); return; } sp<RefBase> obj; CHECK(msg->findObject("access-unit", &obj)); @@ -1056,16 +1083,27 @@ struct RtspConnectionHandler : public AH break; } if (track->mNewSegment) { track->mNewSegment = false; } onAccessUnitComplete(trackIndex, accessUnit); + + // This code is put here because now accessUnit has timestamp. + track->mNumAccessUnitsReceiveds++; + int64_t duration, timeUs; + mSessionDesc->getDurationUs(&duration); + accessUnit->meta()->findInt64("timeUs", &timeUs); + + // Start a timer to detect end-of-stream if close to the end. + if (timeUs >= duration - kActivateEndOfStreamTimerUs) { + postEndOfStreamCheck(trackIndex); + } break; } case kWhatSeek: { if (!mSeekable) { LOGW("This is a live stream, ignoring seek request."); @@ -1146,19 +1184,22 @@ struct RtspConnectionHandler : public AH LOGI("PLAY completed with result %d (%s)", result, strerror(-result)); if (mAborted || !mSeekPending) { LOGV("We're aborted, dropping stale packet."); break; } + mCheckPending = false; + postAccessUnitTimeoutCheck(); + for (size_t i = 0; i < mTracks.size(); i++) { - setCheckPending(i, false); - postAccessUnitTimeoutCheck(i); + setEndOfStreamCheckPending(i, false); + postEndOfStreamCheck(i); } if (result == OK) { sp<RefBase> obj; CHECK(msg->findObject("response", &obj)); sp<ARTSPResponse> response = static_cast<ARTSPResponse *>(obj.get()); @@ -1258,24 +1299,34 @@ struct RtspConnectionHandler : public AH } void postKeepAlive() { sp<AMessage> msg = new AMessage(kWhatKeepAlive, id()); msg->setInt32("generation", mKeepAliveGeneration); msg->post((mKeepAliveTimeoutUs * 9) / 10); } - void postAccessUnitTimeoutCheck(size_t trackIndex) { - if (getCheckPending(trackIndex)) { + void postEndOfStreamCheck(size_t trackIndex) { + if (getEndOfStreamCheckPending(trackIndex)) { return; } - setCheckPending(trackIndex, true); - sp<AMessage> check = new AMessage(kWhatCheck, id()); + setEndOfStreamCheckPending(trackIndex, true); + sp<AMessage> check = new AMessage(kWhatEndOfStreamCheck, id()); check->setInt32("generation", mCheckGeneration); check->setSize("trackIndex", trackIndex); + check->post(kEndOfStreamTimeoutUs); + } + + void postAccessUnitTimeoutCheck() { + if (mCheckPending) { + return; + } + mCheckPending = true; + sp<AMessage> check = new AMessage(kWhatAccessUnitTimeoutCheck, id()); + check->setInt32("generation", mCheckGeneration); check->post(kAccessUnitTimeoutUs); } static void SplitString( const AString &s, const char *separator, List<AString> *items) { items->clear(); size_t start = 0; while (start < s.size()) { @@ -1452,16 +1503,18 @@ private: bool mPausePending; bool mAborted; bool mFirstAccessUnit; int64_t mNTPAnchorUs; int64_t mMediaAnchorUs; int64_t mLastMediaTimeUs; + int64_t mNumAccessUnitsReceived; + bool mCheckPending; int32_t mCheckGeneration; bool mTryTCPInterleaving; bool mTryFakeRTCP; bool mReceivedFirstRTCPPacket; bool mReceivedFirstRTPPacket; bool mSeekable; int64_t mKeepAliveTimeoutUs; int32_t mKeepAliveGeneration;