Bug 540004, part 2: Detect hangs while awaiting synchronous IPC replies (on POSIX). r=bent
authorChris Jones <jones.chris.g@gmail.com>
Tue, 09 Feb 2010 18:02:54 -0600
changeset 46647 b85532205c1b0685f423dacc77c7cbd1f29d161a
parent 46646 d0e79b58e9b82ea1d6ccc934b6010f39b2f0ccac
child 46648 6d5e409f594aef499fe525738e1aafbe51017661
push id1
push userroot
push dateMon, 20 Oct 2014 17:29:22 +0000
reviewersbent
bugs540004
milestone1.9.3a2pre
Bug 540004, part 2: Detect hangs while awaiting synchronous IPC replies (on POSIX). r=bent
ipc/glue/AsyncChannel.cpp
ipc/glue/AsyncChannel.h
ipc/glue/RPCChannel.cpp
ipc/glue/RPCChannel.h
ipc/glue/SyncChannel.cpp
ipc/glue/SyncChannel.h
ipc/glue/WindowsMessageLoop.cpp
--- a/ipc/glue/AsyncChannel.cpp
+++ b/ipc/glue/AsyncChannel.cpp
@@ -149,20 +149,23 @@ AsyncChannel::Open(Transport* aTransport
     }
 
     return true;
 }
 
 void
 AsyncChannel::Close()
 {
+    AssertWorkerThread();
+
     {
         MutexAutoLock lock(mMutex);
 
-        if (ChannelError == mChannelState) {
+        if (ChannelError == mChannelState ||
+            ChannelTimeout == mChannelState) {
             // See bug 538586: if the listener gets deleted while the
             // IO thread's NotifyChannelError event is still enqueued
             // and subsequently deletes us, then the error event will
             // also be deleted and the listener will never be notified
             // of the channel error.
             if (mListener) {
                 NotifyMaybeChannelError();
             }
@@ -174,33 +177,35 @@ AsyncChannel::Close()
             // to relax
             NS_RUNTIMEABORT("Close() called on closed channel!");
 
         AssertWorkerThread();
 
         // notify the other side that we're about to close our socket
         SendSpecialMessage(new GoodbyeMessage());
 
-        mChannelState = ChannelClosing;
-
-        // and post the task will do the actual close
-        mIOLoop->PostTask(
-            FROM_HERE, NewRunnableMethod(this, &AsyncChannel::OnCloseChannel));
-
-        while (ChannelClosing == mChannelState)
-            mCvar.Wait();
-
-        // TODO sort out Close() on this side racing with Close() on the
-        // other side
-        mChannelState = ChannelClosed;
+        SynchronouslyClose();
     }
 
     return NotifyChannelClosed();
 }
 
+void 
+AsyncChannel::SynchronouslyClose()
+{
+    AssertWorkerThread();
+    mMutex.AssertCurrentThreadOwns();
+
+    mIOLoop->PostTask(
+        FROM_HERE, NewRunnableMethod(this, &AsyncChannel::OnCloseChannel));
+
+    while (ChannelClosed != mChannelState)
+        mCvar.Wait();
+}
+
 bool
 AsyncChannel::Send(Message* msg)
 {
     AssertWorkerThread();
     mMutex.AssertNotCurrentThreadOwns();
     NS_ABORT_IF_FALSE(MSG_ROUTING_NONE != msg->routing_id(), "need a route");
 
     {
@@ -364,16 +369,18 @@ AsyncChannel::ReportConnectionError(cons
     const char* errorMsg;
     switch (mChannelState) {
     case ChannelClosed:
         errorMsg = "Closed channel: cannot send/recv";
         break;
     case ChannelOpening:
         errorMsg = "Opening channel: not yet ready for send/recv";
         break;
+    case ChannelTimeout:
+        errorMsg = "Channel timeout: cannot send/recv";
     case ChannelError:
         errorMsg = "Channel error: cannot send/recv";
         break;
 
     default:
         NOTREACHED();
     }
 
--- a/ipc/glue/AsyncChannel.h
+++ b/ipc/glue/AsyncChannel.h
@@ -70,16 +70,17 @@ class AsyncChannel : public IPC::Channel
 protected:
     typedef mozilla::CondVar CondVar;
     typedef mozilla::Mutex Mutex;
 
     enum ChannelState {
         ChannelClosed,
         ChannelOpening,
         ChannelConnected,
+        ChannelTimeout,
         ChannelClosing,
         ChannelError
     };
 
 public:
     typedef IPC::Channel Transport;
     typedef IPC::Message Message;
 
@@ -141,16 +142,19 @@ protected:
         return ChannelConnected == mChannelState;
     }
 
     // Run on the worker thread
     void OnDispatchMessage(const Message& aMsg);
     virtual bool OnSpecialMessage(uint16 id, const Message& msg);
     void SendSpecialMessage(Message* msg);
 
+    // Tell the IO thread to close the channel and wait for it to ACK.
+    void SynchronouslyClose();
+
     bool MaybeHandleError(Result code, const char* channelName);
     void ReportConnectionError(const char* channelName);
 
     void PrintErrorMessage(const char* channelName, const char* msg)
     {
         fprintf(stderr, "\n###!!! [%s][%s] Error: %s\n\n",
                 mChild ? "Child" : "Parent", channelName, msg);
     }
--- a/ipc/glue/RPCChannel.cpp
+++ b/ipc/glue/RPCChannel.cpp
@@ -110,16 +110,30 @@ RPCChannel::~RPCChannel()
 }
 
 #ifdef OS_WIN
 // static
 int RPCChannel::sInnerEventLoopDepth = 0;
 #endif
 
 bool
+RPCChannel::EventOccurred()
+{
+    AssertWorkerThread();
+    mMutex.AssertCurrentThreadOwns();
+    RPC_ASSERT(StackDepth() > 0, "not in wait loop");
+
+    return (!Connected() ||
+            !mPending.empty() ||
+            (!mOutOfTurnReplies.empty() &&
+             mOutOfTurnReplies.find(mStack.top().seqno())
+             != mOutOfTurnReplies.end()));
+}
+
+bool
 RPCChannel::Call(Message* msg, Message* reply)
 {
     AssertWorkerThread();
     mMutex.AssertNotCurrentThreadOwns();
     RPC_ASSERT(!ProcessingSyncMessage(),
                "violation of sync handler invariant");
     RPC_ASSERT(msg->is_rpc(), "can only Call() RPC messages here");
 
@@ -141,21 +155,27 @@ RPCChannel::Call(Message* msg, Message* 
 
     while (1) {
         // now might be the time to process a message deferred because
         // of race resolution
         MaybeProcessDeferredIncall();
 
         // here we're waiting for something to happen. see long
         // comment about the queue in RPCChannel.h
-        while (Connected() && mPending.empty() &&
-               (mOutOfTurnReplies.empty() ||
-                mOutOfTurnReplies.find(mStack.top().seqno())
-                == mOutOfTurnReplies.end())) {
-            RPCChannel::WaitForNotify();
+        while (!EventOccurred()) {
+            bool maybeTimedOut = !RPCChannel::WaitForNotify();
+
+            if (EventOccurred())
+                break;
+
+            // an event didn't occur. So we better have timed out!
+            NS_ABORT_IF_FALSE(maybeTimedOut,
+                              "neither received a reply nor detected a hang!");
+            if (!ShouldContinueFromTimeout())
+                return false;
         }
 
         if (!Connected()) {
             ReportConnectionError("RPCChannel");
             return false;
         }
 
         Message recvd;
@@ -584,17 +604,17 @@ RPCChannel::OnMessageReceived(const Mess
 
     mPending.push(msg);
 
     if (0 == StackDepth() && !mBlockedOnParent)
         // the worker thread might be idle, make sure it wakes up
         mWorkerLoop->PostTask(
             FROM_HERE,
             NewRunnableMethod(this, &RPCChannel::OnMaybeDequeueOne));
-    else
+    else if (!AwaitingSyncReply())
         NotifyWorkerThread();
 }
 
 
 void
 RPCChannel::OnChannelError()
 {
     AssertIOThread();
--- a/ipc/glue/RPCChannel.h
+++ b/ipc/glue/RPCChannel.h
@@ -133,16 +133,18 @@ protected:
     }
 
     static int sInnerEventLoopDepth;
 #endif
 
   private:
     // Called on worker thread only
 
+    bool EventOccurred();
+
     void MaybeProcessDeferredIncall();
     void EnqueuePendingMessages();
 
     void OnMaybeDequeueOne();
     void Incall(const Message& call, size_t stackDepth);
     void DispatchIncall(const Message& call);
 
     void BlockOnParent();
--- a/ipc/glue/SyncChannel.cpp
+++ b/ipc/glue/SyncChannel.cpp
@@ -49,35 +49,47 @@ struct RunnableMethodTraits<mozilla::ipc
 {
     static void RetainCallee(mozilla::ipc::SyncChannel* obj) { }
     static void ReleaseCallee(mozilla::ipc::SyncChannel* obj) { }
 };
 
 namespace mozilla {
 namespace ipc {
 
+const int32 SyncChannel::kNoTimeout = PR_INT32_MIN;
+
 SyncChannel::SyncChannel(SyncListener* aListener)
   : AsyncChannel(aListener),
     mPendingReply(0),
     mProcessingSyncMessage(false),
-    mNextSeqno(0)
+    mNextSeqno(0),
+    mTimeoutMs(kNoTimeout)
 {
   MOZ_COUNT_CTOR(SyncChannel);
 }
 
 SyncChannel::~SyncChannel()
 {
     MOZ_COUNT_DTOR(SyncChannel);
-    // FIXME/cjones: impl
 }
 
 // static
 bool SyncChannel::sIsPumpingMessages = false;
 
 bool
+SyncChannel::EventOccurred()
+{
+    AssertWorkerThread();
+    mMutex.AssertCurrentThreadOwns();
+    NS_ABORT_IF_FALSE(AwaitingSyncReply(), "not in wait loop");
+
+    return (!Connected() || 0 != mRecvd.type());
+}
+
+bool
 SyncChannel::Send(Message* msg, Message* reply)
 {
     AssertWorkerThread();
     mMutex.AssertNotCurrentThreadOwns();
     NS_ABORT_IF_FALSE(!ProcessingSyncMessage(),
                       "violation of sync handler invariant");
     NS_ABORT_IF_FALSE(msg->is_sync(), "can only Send() sync messages here");
 
@@ -91,26 +103,28 @@ SyncChannel::Send(Message* msg, Message*
     }
 
     mPendingReply = msg->type() + 1;
     int32 msgSeqno = msg->seqno();
     mIOLoop->PostTask(
         FROM_HERE,
         NewRunnableMethod(this, &SyncChannel::OnSend, msg));
 
-    // NB: this is a do-while loop instead of a single wait because if
-    // there's a pending RPC out- or in-call below us, and the sync
-    // message handler on the other side sends us an async message,
-    // the IO thread will Notify() this thread of the async message.
-    // See https://bugzilla.mozilla.org/show_bug.cgi?id=538239.
-    do {
-        // wait for the next sync message to arrive
-        SyncChannel::WaitForNotify();
-    } while(Connected() &&
-            mPendingReply != mRecvd.type() && !mRecvd.is_reply_error());
+    while (1) {
+        bool maybeTimedOut = !SyncChannel::WaitForNotify();
+
+        if (EventOccurred())
+            break;
+
+        // an event didn't occur. So we better have timed out!
+        NS_ABORT_IF_FALSE(maybeTimedOut,
+                          "neither received a reply nor detected a hang!");
+        if (!ShouldContinueFromTimeout())
+            return false;
+    }
 
     if (!Connected()) {
         ReportConnectionError("SyncChannel");
         return false;
     }
 
     // we just received a synchronous message from the other side.
     // If it's not the reply we were awaiting, there's a serious
@@ -201,25 +215,74 @@ SyncChannel::OnChannelError()
     if (AwaitingSyncReply())
         NotifyWorkerThread();
 }
 
 //
 // Synchronization between worker and IO threads
 //
 
+namespace {
+
+bool
+IsTimeoutExpired(PRIntervalTime aStart, PRIntervalTime aTimeout)
+{
+    return (aTimeout != PR_INTERVAL_NO_TIMEOUT) &&
+        (aTimeout <= (PR_IntervalNow() - aStart));
+}
+
+} // namespace <anon>
+
+bool
+SyncChannel::ShouldContinueFromTimeout()
+{
+    AssertWorkerThread();
+    mMutex.AssertCurrentThreadOwns();
+
+    bool cont = true;
+
+    if (!cont) {
+        // NB: there's a sublety here.  If parents were allowed to
+        // send sync messages to children, then it would be possible
+        // for this synchronous close-on-timeout to race with async
+        // |OnMessageReceived| tasks arriving from the child, posted
+        // to the worker thread's event loop.  This would complicate
+        // cleanup of the *Channel.  But since IPDL forbids this (and
+        // since it doesn't support children timing out on parents),
+        // the parent can only block on RPC messages to the child, and
+        // in that case arriving async messages are enqueued to the
+        // RPC channel's special queue.  They're then ignored because
+        // the channel state changes to ChannelTimeout
+        // (i.e. !Connected).
+        SynchronouslyClose();
+        mChannelState = ChannelTimeout;
+    }
+        
+    return cont;
+}
+
 // Windows versions of the following two functions live in
 // WindowsMessageLoop.cpp.
 
 #ifndef OS_WIN
 
-void
+bool
 SyncChannel::WaitForNotify()
 {
-    mCvar.Wait();
+    PRIntervalTime timeout = (kNoTimeout == mTimeoutMs) ?
+                             PR_INTERVAL_NO_TIMEOUT :
+                             PR_MillisecondsToInterval(mTimeoutMs);
+    // XXX could optimize away this syscall for "no timeout" case if desired
+    PRIntervalTime waitStart = PR_IntervalNow();
+
+    mCvar.Wait(timeout);
+
+    // if the timeout didn't expire, we know we received an event.
+    // The converse is not true.
+    return !IsTimeoutExpired(waitStart, timeout);
 }
 
 void
 SyncChannel::NotifyWorkerThread()
 {
     mCvar.Notify();
 }
 
--- a/ipc/glue/SyncChannel.h
+++ b/ipc/glue/SyncChannel.h
@@ -35,28 +35,34 @@
  * the provisions above, a recipient may use your version of this file under
  * the terms of any one of the MPL, the GPL or the LGPL.
  *
  * ***** END LICENSE BLOCK ***** */
 
 #ifndef ipc_glue_SyncChannel_h
 #define ipc_glue_SyncChannel_h 1
 
+#include "base/basictypes.h"
+
+#include "prinrval.h"
+
 #include "mozilla/ipc/AsyncChannel.h"
 
 namespace mozilla {
 namespace ipc {
 //-----------------------------------------------------------------------------
 
 class SyncChannel : public AsyncChannel
 {
 protected:
     typedef uint16 MessageId;
 
 public:
+    static const int32 kNoTimeout;
+
     class /*NS_INTERFACE_CLASS*/ SyncListener : 
         public AsyncChannel::AsyncListener
     {
     public:
         virtual ~SyncListener() { }
 
         virtual void OnChannelClose() = 0;
         virtual void OnChannelError() = 0;
@@ -96,17 +102,33 @@ protected:
 
     NS_OVERRIDE
     bool OnSpecialMessage(uint16 id, const Message& msg)
     {
         // SyncChannel doesn't care about any special messages yet
         return AsyncChannel::OnSpecialMessage(id, msg);
     }
 
-    void WaitForNotify();
+    //
+    // Return true if the wait ended because a notification was
+    // received.  That is, true => event received.
+    //
+    // Return false if the time elapsed from when we started the
+    // process of waiting until afterwards exceeded the currently
+    // allotted timeout.  That *DOES NOT* mean false => "no event" (==
+    // timeout); there are many circumstances that could cause the
+    // measured elapsed time to exceed the timeout EVEN WHEN we were
+    // notified.
+    //
+    // So in sum: true is a meaningful return value; false isn't,
+    // necessarily.
+    //
+    bool WaitForNotify();
+
+    bool ShouldContinueFromTimeout();
 
     // Executed on the IO thread.
     void OnSendReply(Message* msg);
     void NotifyWorkerThread();
 
     // On both
     bool AwaitingSyncReply() {
         mMutex.AssertCurrentThreadOwns();
@@ -121,14 +143,19 @@ protected:
     MessageId mPendingReply;
     bool mProcessingSyncMessage;
     Message mRecvd;
     // This is only accessed from the worker thread; seqno's are
     // completely opaque to the IO thread.
     int32 mNextSeqno;
 
     static bool sIsPumpingMessages;
+
+private:
+    bool EventOccurred();
+
+    int32 mTimeoutMs;
 };
 
 
 } // namespace ipc
 } // namespace mozilla
 #endif  // ifndef ipc_glue_SyncChannel_h
--- a/ipc/glue/WindowsMessageLoop.cpp
+++ b/ipc/glue/WindowsMessageLoop.cpp
@@ -607,17 +607,17 @@ RPCChannel::IsMessagePending()
 
   if (!mPending.empty() &&
       mPending.front().seqno() == mStack.top().seqno())
     return true;
 
   return false;
 }
 
-void
+bool
 SyncChannel::WaitForNotify()
 {
   mMutex.AssertCurrentThreadOwns();
 
   MutexAutoUnlock unlock(mMutex);
 
   // Initialize global objects used in deferred messaging.
   Init();
@@ -856,16 +856,18 @@ RPCChannel::WaitForNotify()
 void
 SyncChannel::NotifyWorkerThread()
 {
   mMutex.AssertCurrentThreadOwns();
   NS_ASSERTION(gUIThreadId, "This should have been set already!");
   if (!PostThreadMessage(gUIThreadId, gEventLoopMessage, 0, 0)) {
     NS_WARNING("Failed to post thread message!");
   }
+
+  return true;
 }
 
 void
 DeferredSendMessage::Run()
 {
   AssertWindowIsNotNeutered(hWnd);
   if (!IsWindow(hWnd)) {
     NS_ERROR("Invalid window!");