bug 1072780: patch 4 - Use atomics for EnsureNextIteration to close races around CurrentDriver r=roc
--- a/content/media/GraphDriver.cpp
+++ b/content/media/GraphDriver.cpp
@@ -48,17 +48,16 @@ struct AutoProfilerUnregisterThread
GraphDriver::GraphDriver(MediaStreamGraphImpl* aGraphImpl)
: mIterationStart(0),
mIterationEnd(0),
mStateComputedTime(0),
mNextStateComputedTime(0),
mGraphImpl(aGraphImpl),
mWaitState(WAITSTATE_RUNNING),
- mNeedAnotherIteration(false),
mCurrentTimeStamp(TimeStamp::Now()),
mPreviousDriver(nullptr),
mNextDriver(nullptr)
{ }
void GraphDriver::SetGraphTime(GraphDriver* aPreviousDriver,
GraphTime aLastSwitchNextIterationStart,
GraphTime aLastSwitchNextIterationEnd,
@@ -90,16 +89,17 @@ void GraphDriver::SwitchAtNextIteration(
MOZ_ASSERT(!mNextDriver || mNextDriver->AsAudioCallbackDriver());
mNextDriver = aNextDriver;
}
void GraphDriver::EnsureImmediateWakeUpLocked()
{
mGraphImpl->GetMonitor().AssertCurrentThreadOwns();
mWaitState = WAITSTATE_WAKING_UP;
+ mGraphImpl->mGraphDriverAsleep = false; // atomic
mGraphImpl->GetMonitor().Notify();
}
void GraphDriver::UpdateStateComputedTime(GraphTime aStateComputedTime)
{
MOZ_ASSERT(aStateComputedTime > mIterationEnd);
// The next state computed time can be the same as the previous, here: it
// means the driver would be have been blocking indefinitly, but the graph has
@@ -108,32 +108,17 @@ void GraphDriver::UpdateStateComputedTim
printf("State time can't go backward %ld < %ld.\n", static_cast<long>(aStateComputedTime), static_cast<long>(mStateComputedTime));
}
mStateComputedTime = aStateComputedTime;
}
void GraphDriver::EnsureNextIteration()
{
- MonitorAutoLock lock(mGraphImpl->GetMonitor());
- EnsureNextIterationLocked();
-}
-
-void GraphDriver::EnsureNextIterationLocked()
-{
- mGraphImpl->GetMonitor().AssertCurrentThreadOwns();
-
- if (IsWaitingIndefinitly()) {
- WakeUp();
- }
-
- if (mNeedAnotherIteration) {
- return;
- }
- mNeedAnotherIteration = true;
+ mGraphImpl->EnsureNextIteration();
}
class MediaStreamGraphShutdownThreadRunnable : public nsRunnable {
public:
explicit MediaStreamGraphShutdownThreadRunnable(GraphDriver* aDriver)
: mDriver(aDriver)
{
}
@@ -356,44 +341,52 @@ OfflineClockDriver::GetCurrentTimeStamp(
void
SystemClockDriver::WaitForNextIteration()
{
mGraphImpl->GetMonitor().AssertCurrentThreadOwns();
PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT;
TimeStamp now = TimeStamp::Now();
- if (mNeedAnotherIteration) {
+ if (mGraphImpl->mNeedAnotherIteration) {
int64_t timeoutMS = MEDIA_GRAPH_TARGET_PERIOD_MS -
int64_t((now - mCurrentTimeStamp).ToMilliseconds());
// Make sure timeoutMS doesn't overflow 32 bits by waking up at
// least once a minute, if we need to wake up at all
timeoutMS = std::max<int64_t>(0, std::min<int64_t>(timeoutMS, 60*1000));
timeout = PR_MillisecondsToInterval(uint32_t(timeoutMS));
STREAM_LOG(PR_LOG_DEBUG+1, ("Waiting for next iteration; at %f, timeout=%f", (now - mInitialTimeStamp).ToSeconds(), timeoutMS/1000.0));
+ if (mWaitState == WAITSTATE_WAITING_INDEFINITELY) {
+ mGraphImpl->mGraphDriverAsleep = false; // atomic
+ }
mWaitState = WAITSTATE_WAITING_FOR_NEXT_ITERATION;
} else {
+ mGraphImpl->mGraphDriverAsleep = true; // atomic
mWaitState = WAITSTATE_WAITING_INDEFINITELY;
}
if (timeout > 0) {
mGraphImpl->GetMonitor().Wait(timeout);
STREAM_LOG(PR_LOG_DEBUG+1, ("Resuming after timeout; at %f, elapsed=%f",
(TimeStamp::Now() - mInitialTimeStamp).ToSeconds(),
(TimeStamp::Now() - now).ToSeconds()));
}
+ if (mWaitState == WAITSTATE_WAITING_INDEFINITELY) {
+ mGraphImpl->mGraphDriverAsleep = false; // atomic
+ }
mWaitState = WAITSTATE_RUNNING;
- mNeedAnotherIteration = false;
+ mGraphImpl->mNeedAnotherIteration = false;
}
void
SystemClockDriver::WakeUp()
{
mGraphImpl->GetMonitor().AssertCurrentThreadOwns();
mWaitState = WAITSTATE_WAKING_UP;
+ mGraphImpl->mGraphDriverAsleep = false; // atomic
mGraphImpl->GetMonitor().Notify();
}
OfflineClockDriver::OfflineClockDriver(MediaStreamGraphImpl* aGraphImpl, GraphTime aSlice)
: ThreadedDriver(aGraphImpl),
mSlice(aSlice)
{
@@ -505,22 +498,24 @@ AsyncCubebTask::Run()
mDriver = nullptr;
mShutdownGrip = nullptr;
break;
case AsyncCubebOperation::SLEEP: {
{
LIFECYCLE_LOG("AsyncCubebOperation::SLEEP\n");
MonitorAutoLock mon(mDriver->mGraphImpl->GetMonitor());
// We might just have been awoken
- if (mDriver->mNeedAnotherIteration) {
+ if (mDriver->mGraphImpl->mNeedAnotherIteration) {
mDriver->mPauseRequested = false;
mDriver->mWaitState = AudioCallbackDriver::WAITSTATE_RUNNING;
+ mDriver->mGraphImpl->mGraphDriverAsleep = false ; // atomic
break;
}
mDriver->Stop();
+ mDriver->mGraphImpl->mGraphDriverAsleep = true; // atomic
mDriver->mWaitState = AudioCallbackDriver::WAITSTATE_WAITING_INDEFINITELY;
mDriver->mPauseRequested = false;
mDriver->mGraphImpl->GetMonitor().Wait(PR_INTERVAL_NO_TIMEOUT);
}
STREAM_LOG(PR_LOG_DEBUG, ("Restarting audio stream from sleep."));
mDriver->StartStream();
break;
}
@@ -713,17 +708,17 @@ AudioCallbackDriver::GetCurrentTime()
void AudioCallbackDriver::WaitForNextIteration()
{
#if 0
mGraphImpl->GetMonitor().AssertCurrentThreadOwns();
// We can't block on the monitor in the audio callback, so we kick off a new
// thread that will pause the audio stream, and restart it when unblocked.
// We don't want to sleep when we haven't started the driver yet.
- if (!mNeedAnotherIteration && mAudioStream && mGraphImpl->Running()) {
+ if (!mGraphImpl->mNeedAnotherIteration && mAudioStream && mGraphImpl->Running()) {
STREAM_LOG(PR_LOG_DEBUG+1, ("AudioCallbackDriver going to sleep"));
mPauseRequested = true;
nsRefPtr<AsyncCubebTask> sleepEvent =
new AsyncCubebTask(this, AsyncCubebTask::SLEEP);
sleepEvent->Dispatch();
}
#endif
}
--- a/content/media/GraphDriver.h
+++ b/content/media/GraphDriver.h
@@ -223,18 +223,16 @@ protected:
// RunThread() is paused indefinitely waiting for something to change
WAITSTATE_WAITING_INDEFINITELY,
// Something has signaled RunThread() to wake up immediately,
// but it hasn't done so yet
WAITSTATE_WAKING_UP
};
WaitState mWaitState;
- // True if the graph needs another iteration after the current iteration.
- bool mNeedAnotherIteration;
TimeStamp mCurrentTimeStamp;
// This is non-null only when this driver has recently switched from an other
// driver, and has not cleaned it up yet (for example because the audio stream
// is currently calling the callback during initialization).
nsRefPtr<GraphDriver> mPreviousDriver;
// This is non-null only when this driver is going to switch to an other
// driver at the end of this iteration.
nsRefPtr<GraphDriver> mNextDriver;
--- a/content/media/MediaStreamGraph.cpp
+++ b/content/media/MediaStreamGraph.cpp
@@ -90,17 +90,17 @@ MediaStreamGraphImpl::FinishStream(Media
if (aStream->mFinished)
return;
STREAM_LOG(PR_LOG_DEBUG, ("MediaStream %p will finish", aStream));
aStream->mFinished = true;
aStream->mBuffer.AdvanceKnownTracksTime(STREAM_TIME_MAX);
// Force at least one more iteration of the control loop, since we rely
// on UpdateCurrentTimeForStreams to notify our listeners once the stream end
// has been reached.
- CurrentDriver()->EnsureNextIteration();
+ EnsureNextIteration();
SetStreamOrderDirty();
}
void
MediaStreamGraphImpl::AddStream(MediaStream* aStream)
{
aStream->mBufferStartTime = IterationEnd();
@@ -773,17 +773,17 @@ MediaStreamGraphImpl::RecomputeBlocking(
STREAM_LOG(PR_LOG_DEBUG+1, ("Media graph %p computed blocking for interval %f to %f",
this, MediaTimeToSeconds(CurrentDriver()->StateComputedTime()),
MediaTimeToSeconds(aEndBlockingDecisions)));
CurrentDriver()->UpdateStateComputedTime(aEndBlockingDecisions);
if (blockingDecisionsWillChange) {
// Make sure we wake up to notify listeners about these changes.
- CurrentDriver()->EnsureNextIteration();
+ EnsureNextIteration();
}
}
void
MediaStreamGraphImpl::AddBlockingRelatedStreamsToSet(nsTArray<MediaStream*>* aStreams,
MediaStream* aStream)
{
if (aStream->mInBlockingSet)
@@ -1283,17 +1283,17 @@ MediaStreamGraphImpl::UpdateGraph(GraphT
// The loop is woken up so soon that IterationEnd() barely advances and we
// end up having aEndBlockingDecision == CurrentDriver()->StateComputedTime().
// Since stream blocking is computed in the interval of
// [CurrentDriver()->StateComputedTime(), aEndBlockingDecision), it won't be computed at all.
// We should ensure next iteration so that pending blocking changes will be
// computed in next loop.
if (ensureNextIteration ||
aEndBlockingDecision == CurrentDriver()->StateComputedTime()) {
- CurrentDriver()->EnsureNextIteration();
+ EnsureNextIteration();
}
// Figure out which streams are blocked and when.
RecomputeBlocking(aEndBlockingDecision);
}
void
MediaStreamGraphImpl::Process(GraphTime aFrom, GraphTime aTo)
@@ -1377,17 +1377,17 @@ MediaStreamGraphImpl::Process(GraphTime
isStarted = CurrentDriver()->AsAudioCallbackDriver()->IsStarted();
}
if (isStarted) {
mMixer.RemoveCallback(CurrentDriver()->AsAudioCallbackDriver());
}
}
if (!allBlockedForever) {
- CurrentDriver()->EnsureNextIteration();
+ EnsureNextIteration();
}
}
bool
MediaStreamGraphImpl::OneIteration(GraphTime aFrom, GraphTime aTo,
GraphTime aStateFrom, GraphTime aStateEnd)
{
{
@@ -1463,17 +1463,17 @@ MediaStreamGraphImpl::ApplyStreamUpdate(
void
MediaStreamGraphImpl::ForceShutDown()
{
NS_ASSERTION(NS_IsMainThread(), "Must be called on main thread");
STREAM_LOG(PR_LOG_DEBUG, ("MediaStreamGraph %p ForceShutdown", this));
{
MonitorAutoLock lock(mMonitor);
mForceShutDown = true;
- CurrentDriver()->EnsureNextIterationLocked();
+ EnsureNextIterationLocked();
}
}
namespace {
class MediaStreamGraphShutDownRunnable : public nsRunnable {
public:
explicit MediaStreamGraphShutDownRunnable(MediaStreamGraphImpl* aGraph)
@@ -1639,17 +1639,17 @@ MediaStreamGraphImpl::RunInStableState(b
}
}
} else {
if (mLifecycleState <= LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP) {
MessageBlock* block = mBackMessageQueue.AppendElement();
block->mMessages.SwapElements(mCurrentTaskMessageQueue);
block->mGraphUpdateIndex = mNextGraphUpdateIndex;
++mNextGraphUpdateIndex;
- CurrentDriver()->EnsureNextIterationLocked();
+ EnsureNextIterationLocked();
}
// If the MediaStreamGraph has more messages going to it, try to revive
// it to process those messages. Don't do this if we're in a forced
// shutdown or it's a non-realtime graph that has already terminated
// processing.
if (mLifecycleState == LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP &&
mRealtime && !mForceShutDown) {
@@ -2707,16 +2707,18 @@ ProcessedMediaStream::DestroyImpl()
}
MediaStreamGraphImpl::MediaStreamGraphImpl(bool aRealtime,
TrackRate aSampleRate,
DOMMediaStream::TrackTypeHints aHint= DOMMediaStream::HINT_CONTENTS_UNKNOWN,
dom::AudioChannel aChannel)
: mProcessingGraphUpdateIndex(0)
, mPortCount(0)
+ , mNeedAnotherIteration(false)
+ , mGraphDriverAsleep(false)
, mMonitor("MediaStreamGraphImpl")
, mLifecycleState(LIFECYCLE_THREAD_NOT_STARTED)
, mEndTime(GRAPH_TIME_MAX)
, mSampleRate(aSampleRate)
, mForceShutDown(false)
, mPostedRunInStableStateEvent(false)
, mDetectedNotRunning(false)
, mPostedRunInStableState(false)
--- a/content/media/MediaStreamGraphImpl.h
+++ b/content/media/MediaStreamGraphImpl.h
@@ -450,24 +450,29 @@ public:
#endif
mDriver = aDriver;
}
Monitor& GetMonitor() {
return mMonitor;
}
- /**
- * Must implement here to avoid dangerous data races around CurrentDriver() -
- * we don't want stuff off MSG thread using "graph->CurrentDriver()->EnsureNextIteration()"
- * because CurrentDriver may change (and it's a TSAN data race)
- */
void EnsureNextIteration() {
- MonitorAutoLock mon(mMonitor);
- CurrentDriver()->EnsureNextIterationLocked();
+ mNeedAnotherIteration = true; // atomic
+ if (mGraphDriverAsleep) { // atomic
+ MonitorAutoLock mon(mMonitor);
+ CurrentDriver()->WakeUp(); // Might not be the same driver; might have woken already
+ }
+ }
+
+ void EnsureNextIterationLocked() {
+ mNeedAnotherIteration = true; // atomic
+ if (mGraphDriverAsleep) { // atomic
+ CurrentDriver()->WakeUp(); // Might not be the same driver; might have woken already
+ }
}
// Data members
//
/**
* Graphs own owning references to their driver, until shutdown. When a driver
* switch occur, previous driver is either deleted, or it's ownership is
* passed to a event that will take care of the asynchronous cleanup, as
@@ -499,25 +504,30 @@ public:
* Which update batch we are currently processing.
*/
int64_t mProcessingGraphUpdateIndex;
/**
* Number of active MediaInputPorts
*/
int32_t mPortCount;
+ // True if the graph needs another iteration after the current iteration.
+ Atomic<bool> mNeedAnotherIteration;
+ // GraphDriver may need a WakeUp() if something changes
+ Atomic<bool> mGraphDriverAsleep;
+
// mMonitor guards the data below.
// MediaStreamGraph normally does its work without holding mMonitor, so it is
// not safe to just grab mMonitor from some thread and start monkeying with
// the graph. Instead, communicate with the graph thread using provided
// mechanisms such as the ControlMessage queue.
Monitor mMonitor;
// Data guarded by mMonitor (must always be accessed with mMonitor held,
- // regardless of the value of mLifecycleState.
+ // regardless of the value of mLifecycleState).
/**
* State to copy to main thread
*/
nsTArray<StreamUpdate> mStreamUpdates;
/**
* Runnables to run after the next update to main thread state.
*/
@@ -588,20 +598,16 @@ public:
/**
* Sample rate at which this graph runs. For real time graphs, this is
* the rate of the audio mixer. For offline graphs, this is the rate specified
* at construction.
*/
TrackRate mSampleRate;
/**
- * True when another iteration of the control loop is required.
- */
- bool mNeedAnotherIteration;
- /**
* True when we need to do a forced shutdown during application shutdown.
*/
bool mForceShutDown;
/**
* True when we have posted an event to the main thread to run
* RunInStableState() and the event hasn't run yet.
*/
bool mPostedRunInStableStateEvent;