Bug 540004, part 2: Detect hangs while awaiting synchronous IPC replies (on POSIX). r=bent
--- a/ipc/glue/AsyncChannel.cpp
+++ b/ipc/glue/AsyncChannel.cpp
@@ -149,20 +149,23 @@ AsyncChannel::Open(Transport* aTransport
}
return true;
}
void
AsyncChannel::Close()
{
+ AssertWorkerThread();
+
{
MutexAutoLock lock(mMutex);
- if (ChannelError == mChannelState) {
+ if (ChannelError == mChannelState ||
+ ChannelTimeout == mChannelState) {
// See bug 538586: if the listener gets deleted while the
// IO thread's NotifyChannelError event is still enqueued
// and subsequently deletes us, then the error event will
// also be deleted and the listener will never be notified
// of the channel error.
if (mListener) {
NotifyMaybeChannelError();
}
@@ -174,33 +177,35 @@ AsyncChannel::Close()
// to relax
NS_RUNTIMEABORT("Close() called on closed channel!");
AssertWorkerThread();
// notify the other side that we're about to close our socket
SendSpecialMessage(new GoodbyeMessage());
- mChannelState = ChannelClosing;
-
- // and post the task will do the actual close
- mIOLoop->PostTask(
- FROM_HERE, NewRunnableMethod(this, &AsyncChannel::OnCloseChannel));
-
- while (ChannelClosing == mChannelState)
- mCvar.Wait();
-
- // TODO sort out Close() on this side racing with Close() on the
- // other side
- mChannelState = ChannelClosed;
+ SynchronouslyClose();
}
return NotifyChannelClosed();
}
+void
+AsyncChannel::SynchronouslyClose()
+{
+ AssertWorkerThread();
+ mMutex.AssertCurrentThreadOwns(); // the caller must already hold mMutex
+
+ mIOLoop->PostTask( // queue OnCloseChannel as a task on mIOLoop
+ FROM_HERE, NewRunnableMethod(this, &AsyncChannel::OnCloseChannel));
+
+ while (ChannelClosed != mChannelState) // keep waiting on mCvar until the state reads ChannelClosed
+ mCvar.Wait(); // NOTE(review): presumably OnCloseChannel sets the state and notifies -- confirm
+}
+
bool
AsyncChannel::Send(Message* msg)
{
AssertWorkerThread();
mMutex.AssertNotCurrentThreadOwns();
NS_ABORT_IF_FALSE(MSG_ROUTING_NONE != msg->routing_id(), "need a route");
{
@@ -364,16 +369,19 @@ AsyncChannel::ReportConnectionError(cons
const char* errorMsg;
switch (mChannelState) {
case ChannelClosed:
errorMsg = "Closed channel: cannot send/recv";
break;
case ChannelOpening:
errorMsg = "Opening channel: not yet ready for send/recv";
break;
+ case ChannelTimeout:
+ errorMsg = "Channel timeout: cannot send/recv";
+ break;
case ChannelError:
errorMsg = "Channel error: cannot send/recv";
break;
default:
NOTREACHED();
}
--- a/ipc/glue/AsyncChannel.h
+++ b/ipc/glue/AsyncChannel.h
@@ -70,16 +70,17 @@ class AsyncChannel : public IPC::Channel
protected:
typedef mozilla::CondVar CondVar;
typedef mozilla::Mutex Mutex;
enum ChannelState {
ChannelClosed,
ChannelOpening,
ChannelConnected,
+ ChannelTimeout,
ChannelClosing,
ChannelError
};
public:
typedef IPC::Channel Transport;
typedef IPC::Message Message;
@@ -141,16 +142,19 @@ protected:
return ChannelConnected == mChannelState;
}
// Run on the worker thread
void OnDispatchMessage(const Message& aMsg);
virtual bool OnSpecialMessage(uint16 id, const Message& msg);
void SendSpecialMessage(Message* msg);
+ // Tell the IO thread to close the channel and wait for it to ACK.
+ void SynchronouslyClose();
+
bool MaybeHandleError(Result code, const char* channelName);
void ReportConnectionError(const char* channelName);
void PrintErrorMessage(const char* channelName, const char* msg)
{
fprintf(stderr, "\n###!!! [%s][%s] Error: %s\n\n",
mChild ? "Child" : "Parent", channelName, msg);
}
--- a/ipc/glue/RPCChannel.cpp
+++ b/ipc/glue/RPCChannel.cpp
@@ -110,16 +110,30 @@ RPCChannel::~RPCChannel()
}
#ifdef OS_WIN
// static
int RPCChannel::sInnerEventLoopDepth = 0;
#endif
bool
+RPCChannel::EventOccurred()
+{
+ AssertWorkerThread();
+ mMutex.AssertCurrentThreadOwns();
+ RPC_ASSERT(StackDepth() > 0, "not in wait loop"); // only meaningful while a call is on the stack
+
+ return (!Connected() || // the channel is no longer connected, or
+ !mPending.empty() || // at least one message is queued in mPending, or
+ (!mOutOfTurnReplies.empty() && // an out-of-turn reply is stored ...
+ mOutOfTurnReplies.find(mStack.top().seqno())
+ != mOutOfTurnReplies.end())); // ... under the seqno of the message on top of mStack
+}
+
+bool
RPCChannel::Call(Message* msg, Message* reply)
{
AssertWorkerThread();
mMutex.AssertNotCurrentThreadOwns();
RPC_ASSERT(!ProcessingSyncMessage(),
"violation of sync handler invariant");
RPC_ASSERT(msg->is_rpc(), "can only Call() RPC messages here");
@@ -141,21 +155,27 @@ RPCChannel::Call(Message* msg, Message*
while (1) {
// now might be the time to process a message deferred because
// of race resolution
MaybeProcessDeferredIncall();
// here we're waiting for something to happen. see long
// comment about the queue in RPCChannel.h
- while (Connected() && mPending.empty() &&
- (mOutOfTurnReplies.empty() ||
- mOutOfTurnReplies.find(mStack.top().seqno())
- == mOutOfTurnReplies.end())) {
- RPCChannel::WaitForNotify();
+ while (!EventOccurred()) {
+ bool maybeTimedOut = !RPCChannel::WaitForNotify();
+
+ if (EventOccurred())
+ break;
+
+ // an event didn't occur. So we better have timed out!
+ NS_ABORT_IF_FALSE(maybeTimedOut,
+ "neither received a reply nor detected a hang!");
+ if (!ShouldContinueFromTimeout())
+ return false;
}
if (!Connected()) {
ReportConnectionError("RPCChannel");
return false;
}
Message recvd;
@@ -584,17 +604,17 @@ RPCChannel::OnMessageReceived(const Mess
mPending.push(msg);
if (0 == StackDepth() && !mBlockedOnParent)
// the worker thread might be idle, make sure it wakes up
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &RPCChannel::OnMaybeDequeueOne));
- else
+ else if (!AwaitingSyncReply())
NotifyWorkerThread();
}
void
RPCChannel::OnChannelError()
{
AssertIOThread();
--- a/ipc/glue/RPCChannel.h
+++ b/ipc/glue/RPCChannel.h
@@ -133,16 +133,18 @@ protected:
}
static int sInnerEventLoopDepth;
#endif
private:
// Called on worker thread only
+ bool EventOccurred();
+
void MaybeProcessDeferredIncall();
void EnqueuePendingMessages();
void OnMaybeDequeueOne();
void Incall(const Message& call, size_t stackDepth);
void DispatchIncall(const Message& call);
void BlockOnParent();
--- a/ipc/glue/SyncChannel.cpp
+++ b/ipc/glue/SyncChannel.cpp
@@ -49,35 +49,47 @@ struct RunnableMethodTraits<mozilla::ipc
{
static void RetainCallee(mozilla::ipc::SyncChannel* obj) { }
static void ReleaseCallee(mozilla::ipc::SyncChannel* obj) { }
};
namespace mozilla {
namespace ipc {
+const int32 SyncChannel::kNoTimeout = PR_INT32_MIN;
+
SyncChannel::SyncChannel(SyncListener* aListener)
: AsyncChannel(aListener),
mPendingReply(0),
mProcessingSyncMessage(false),
- mNextSeqno(0)
+ mNextSeqno(0),
+ mTimeoutMs(kNoTimeout)
{
MOZ_COUNT_CTOR(SyncChannel);
}
SyncChannel::~SyncChannel()
{
MOZ_COUNT_DTOR(SyncChannel);
- // FIXME/cjones: impl
}
// static
bool SyncChannel::sIsPumpingMessages = false;
bool
+SyncChannel::EventOccurred()
+{
+ AssertWorkerThread();
+ mMutex.AssertCurrentThreadOwns();
+ NS_ABORT_IF_FALSE(AwaitingSyncReply(), "not in wait loop"); // only valid while a sync reply is pending
+
+ return (!Connected() || 0 != mRecvd.type()); // disconnected, or mRecvd carries a nonzero message type
+}
+
+bool
SyncChannel::Send(Message* msg, Message* reply)
{
AssertWorkerThread();
mMutex.AssertNotCurrentThreadOwns();
NS_ABORT_IF_FALSE(!ProcessingSyncMessage(),
"violation of sync handler invariant");
NS_ABORT_IF_FALSE(msg->is_sync(), "can only Send() sync messages here");
@@ -91,26 +103,28 @@ SyncChannel::Send(Message* msg, Message*
}
mPendingReply = msg->type() + 1;
int32 msgSeqno = msg->seqno();
mIOLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &SyncChannel::OnSend, msg));
- // NB: this is a do-while loop instead of a single wait because if
- // there's a pending RPC out- or in-call below us, and the sync
- // message handler on the other side sends us an async message,
- // the IO thread will Notify() this thread of the async message.
- // See https://bugzilla.mozilla.org/show_bug.cgi?id=538239.
- do {
- // wait for the next sync message to arrive
- SyncChannel::WaitForNotify();
- } while(Connected() &&
- mPendingReply != mRecvd.type() && !mRecvd.is_reply_error());
+ while (1) {
+ bool maybeTimedOut = !SyncChannel::WaitForNotify();
+
+ if (EventOccurred())
+ break;
+
+ // an event didn't occur. So we better have timed out!
+ NS_ABORT_IF_FALSE(maybeTimedOut,
+ "neither received a reply nor detected a hang!");
+ if (!ShouldContinueFromTimeout())
+ return false;
+ }
if (!Connected()) {
ReportConnectionError("SyncChannel");
return false;
}
// we just received a synchronous message from the other side.
// If it's not the reply we were awaiting, there's a serious
@@ -201,25 +215,74 @@ SyncChannel::OnChannelError()
if (AwaitingSyncReply())
NotifyWorkerThread();
}
//
// Synchronization between worker and IO threads
//
+namespace {
+
+bool
+IsTimeoutExpired(PRIntervalTime aStart, PRIntervalTime aTimeout)
+{
+ return (aTimeout != PR_INTERVAL_NO_TIMEOUT) && // a "no timeout" wait never counts as expired
+ (aTimeout <= (PR_IntervalNow() - aStart)); // expired once the interval elapsed since aStart reaches aTimeout
+}
+
+} // namespace <anon>
+
+bool
+SyncChannel::ShouldContinueFromTimeout()
+{
+ AssertWorkerThread();
+ mMutex.AssertCurrentThreadOwns();
+
+ bool cont = true; // NB: never reassigned before the check below, so the |!cont| branch is currently unreachable
+
+ if (!cont) {
+ // NB: there's a subtlety here. If parents were allowed to
+ // send sync messages to children, then it would be possible
+ // for this synchronous close-on-timeout to race with async
+ // |OnMessageReceived| tasks arriving from the child, posted
+ // to the worker thread's event loop. This would complicate
+ // cleanup of the *Channel. But since IPDL forbids this (and
+ // since it doesn't support children timing out on parents),
+ // the parent can only block on RPC messages to the child, and
+ // in that case arriving async messages are enqueued to the
+ // RPC channel's special queue. They're then ignored because
+ // the channel state changes to ChannelTimeout
+ // (i.e. !Connected).
+ SynchronouslyClose();
+ mChannelState = ChannelTimeout; // replaces the ChannelClosed state that SynchronouslyClose() waited for
+ }
+
+ return cont;
+}
+
// Windows versions of the following two functions live in
// WindowsMessageLoop.cpp.
#ifndef OS_WIN
-void
+bool
SyncChannel::WaitForNotify()
{
- mCvar.Wait();
+ PRIntervalTime timeout = (kNoTimeout == mTimeoutMs) ?
+ PR_INTERVAL_NO_TIMEOUT : // mTimeoutMs holds the kNoTimeout sentinel: no deadline
+ PR_MillisecondsToInterval(mTimeoutMs); // otherwise convert the millisecond setting to an interval
+ // XXX could optimize away this syscall for "no timeout" case if desired
+ PRIntervalTime waitStart = PR_IntervalNow();
+
+ mCvar.Wait(timeout);
+
+ // if the timeout didn't expire, we know we received an event.
+ // The converse is not true.
+ return !IsTimeoutExpired(waitStart, timeout); // true => the measured wait stayed under |timeout|
}
void
SyncChannel::NotifyWorkerThread()
{
mCvar.Notify();
}
--- a/ipc/glue/SyncChannel.h
+++ b/ipc/glue/SyncChannel.h
@@ -35,28 +35,34 @@
* the provisions above, a recipient may use your version of this file under
* the terms of any one of the MPL, the GPL or the LGPL.
*
* ***** END LICENSE BLOCK ***** */
#ifndef ipc_glue_SyncChannel_h
#define ipc_glue_SyncChannel_h 1
+#include "base/basictypes.h"
+
+#include "prinrval.h"
+
#include "mozilla/ipc/AsyncChannel.h"
namespace mozilla {
namespace ipc {
//-----------------------------------------------------------------------------
class SyncChannel : public AsyncChannel
{
protected:
typedef uint16 MessageId;
public:
+ static const int32 kNoTimeout;
+
class /*NS_INTERFACE_CLASS*/ SyncListener :
public AsyncChannel::AsyncListener
{
public:
virtual ~SyncListener() { }
virtual void OnChannelClose() = 0;
virtual void OnChannelError() = 0;
@@ -96,17 +102,33 @@ protected:
NS_OVERRIDE
bool OnSpecialMessage(uint16 id, const Message& msg)
{
// SyncChannel doesn't care about any special messages yet
return AsyncChannel::OnSpecialMessage(id, msg);
}
- void WaitForNotify();
+ //
+ // Return true if the wait ended because a notification was
+ // received. That is, true => event received.
+ //
+ // Return false if the time elapsed from when we started the
+ // process of waiting until afterwards exceeded the currently
+ // allotted timeout. That *DOES NOT* mean false => "no event" (==
+ // timeout); there are many circumstances that could cause the
+ // measured elapsed time to exceed the timeout EVEN WHEN we were
+ // notified.
+ //
+ // So in sum: true is a meaningful return value; false isn't,
+ // necessarily.
+ //
+ bool WaitForNotify();
+
+ bool ShouldContinueFromTimeout();
// Executed on the IO thread.
void OnSendReply(Message* msg);
void NotifyWorkerThread();
// On both
bool AwaitingSyncReply() {
mMutex.AssertCurrentThreadOwns();
@@ -121,14 +143,19 @@ protected:
MessageId mPendingReply;
bool mProcessingSyncMessage;
Message mRecvd;
// This is only accessed from the worker thread; seqno's are
// completely opaque to the IO thread.
int32 mNextSeqno;
static bool sIsPumpingMessages;
+
+private:
+ bool EventOccurred();
+
+ int32 mTimeoutMs;
};
} // namespace ipc
} // namespace mozilla
#endif // ifndef ipc_glue_SyncChannel_h
--- a/ipc/glue/WindowsMessageLoop.cpp
+++ b/ipc/glue/WindowsMessageLoop.cpp
@@ -607,17 +607,17 @@ RPCChannel::IsMessagePending()
if (!mPending.empty() &&
mPending.front().seqno() == mStack.top().seqno())
return true;
return false;
}
-void
+bool
SyncChannel::WaitForNotify()
{
mMutex.AssertCurrentThreadOwns();
MutexAutoUnlock unlock(mMutex);
// Initialize global objects used in deferred messaging.
Init();
@@ -856,16 +856,18 @@ RPCChannel::WaitForNotify()
void
SyncChannel::NotifyWorkerThread()
{
mMutex.AssertCurrentThreadOwns();
NS_ASSERTION(gUIThreadId, "This should have been set already!");
if (!PostThreadMessage(gUIThreadId, gEventLoopMessage, 0, 0)) {
NS_WARNING("Failed to post thread message!");
}
+
+ return true;
}
void
DeferredSendMessage::Run()
{
AssertWindowIsNotNeutered(hWnd);
if (!IsWindow(hWnd)) {
NS_ERROR("Invalid window!");