Bug 521929, part 2: Save racy RPC replies onto a special stack until they're the reply to the right out-call. r=bent
authorChris Jones <jones.chris.g@gmail.com>
Thu, 21 Jan 2010 20:04:10 -0600
changeset 37474 925601df843fdb59381d9c9a5e26961b8ae855c1
parent 37473 ca51ffe72682739dbd2bac31bf35112747fc8141
child 37475 c1e297cb449ee147e27d078d6e0b067760782ac9
push idunknown
push userunknown
push dateunknown
reviewersbent
bugs521929
milestone1.9.3a1pre
Bug 521929, part 2: Save racy RPC replies onto a special stack until they're the reply to the right out-call. r=bent
ipc/glue/RPCChannel.cpp
ipc/glue/RPCChannel.h
--- a/ipc/glue/RPCChannel.cpp
+++ b/ipc/glue/RPCChannel.cpp
@@ -62,16 +62,17 @@ struct RunnableMethodTraits<mozilla::ipc
 namespace mozilla {
 namespace ipc {
 
 RPCChannel::RPCChannel(RPCListener* aListener,
                        RacyRPCPolicy aPolicy)
   : SyncChannel(aListener),
     mPending(),
     mStack(),
+    mOutOfTurnReplies(),
     mDeferred(),
     mRemoteStackDepthGuess(0),
     mRacePolicy(aPolicy)
 {
     MOZ_COUNT_CTOR(RPCChannel);
 }
 
 RPCChannel::~RPCChannel()
@@ -107,27 +108,37 @@ RPCChannel::Call(Message* msg, Message* 
 
     while (1) {
         // now might be the time to process a message deferred because
         // of race resolution
         MaybeProcessDeferredIncall();
 
         // here we're waiting for something to happen. see long
         // comment about the queue in RPCChannel.h
-        while (Connected() && mPending.empty()) {
+        while (Connected() && mPending.empty() &&
+               (mOutOfTurnReplies.empty() ||
+                mOutOfTurnReplies.top().seqno() < mStack.top().seqno())) {
             WaitForNotify();
         }
 
         if (!Connected()) {
             ReportConnectionError("RPCChannel");
             return false;
         }
 
-        Message recvd = mPending.front();
-        mPending.pop();
+        Message recvd;
+        if (!mOutOfTurnReplies.empty() &&
+            mOutOfTurnReplies.top().seqno() == mStack.top().seqno()) {
+            recvd = mOutOfTurnReplies.top();
+            mOutOfTurnReplies.pop();
+        }
+        else {
+            recvd = mPending.front();
+            mPending.pop();
+        }
 
         if (!recvd.is_sync() && !recvd.is_rpc()) {
             MutexAutoUnlock unlock(mMutex);
             AsyncChannel::OnDispatchMessage(recvd);
             continue;
         }
 
         if (recvd.is_sync()) {
@@ -140,40 +151,52 @@ RPCChannel::Call(Message* msg, Message* 
 
         RPC_ASSERT(recvd.is_rpc(), "wtf???");
 
         if (recvd.is_reply()) {
             RPC_ASSERT(0 < mStack.size(), "invalid RPC stack");
 
             const Message& outcall = mStack.top();
 
+            if (recvd.seqno() < outcall.seqno()) {
+                mOutOfTurnReplies.push(recvd);
+                continue;
+            }
+
             // FIXME/cjones: handle error
             RPC_ASSERT(
                 recvd.is_reply_error() ||
                 (recvd.type() == (outcall.type()+1) &&
                  recvd.seqno() == outcall.seqno()),
                 "somebody's misbehavin'", "rpc", true);
 
             // we received a reply to our most recent outstanding
             // call.  pop this frame and return the reply
             mStack.pop();
 
             bool isError = recvd.is_reply_error();
             if (!isError) {
                 *reply = recvd;
             }
 
-            if (0 == StackDepth())
+            if (0 == StackDepth()) {
                 // we may have received new messages while waiting for
                 // our reply.  because we were awaiting a reply,
                 // StackDepth > 0, and the IO thread didn't enqueue
                 // OnMaybeDequeueOne() events for us.  so to avoid
                 // "losing" the new messages, we do that now.
                 EnqueuePendingMessages();
 
+                
+                RPC_ASSERT(
+                    mOutOfTurnReplies.empty(),
+                    "still have pending replies with no pending out-calls",
+                    "rpc", true);
+            }
+
             // finished with this RPC stack frame
             return !isError;
         }
 
         // in-call.  process in a new stack frame.
 
         // "snapshot" the current stack depth while we own the Mutex
         size_t stackDepth = StackDepth();
@@ -380,16 +403,18 @@ RPCChannel::DebugAbort(const char* file,
             type, reply ? "reply" : "");
     // technically we need the mutex for this, but we're dying anyway
     fprintf(stderr, "  local RPC stack size: %lu\n",
             mStack.size());
     fprintf(stderr, "  remote RPC stack guess: %lu\n",
             mRemoteStackDepthGuess);
     fprintf(stderr, "  deferred stack size: %lu\n",
             mDeferred.size());
+    fprintf(stderr, "  out-of-turn RPC replies stack size: %lu\n",
+            mOutOfTurnReplies.size());
     fprintf(stderr, "  Pending queue size: %lu, front to back:\n",
             mPending.size());
     while (!mPending.empty()) {
         fprintf(stderr, "    [ %s%s ]\n",
                 mPending.front().is_rpc() ? "rpc" :
                 (mPending.front().is_sync() ? "sync" : "async"),
                 mPending.front().is_reply() ? "reply" : "");
         mPending.pop();
--- a/ipc/glue/RPCChannel.h
+++ b/ipc/glue/RPCChannel.h
@@ -150,16 +150,23 @@ private:
 
     // 
     // Stack of all the RPC out-calls on which this RPCChannel is
     // awaiting a response.
     //
     std::stack<Message> mStack;
 
     //
+    // Stack of replies received "out of turn", because of RPC
+    // in-calls racing with replies to outstanding in-calls.  See
+    // https://bugzilla.mozilla.org/show_bug.cgi?id=521929.
+    //
+    std::stack<Message> mOutOfTurnReplies;
+
+    //
     // Stack of RPC in-calls that were deferred because of race
     // conditions.
     //
     std::stack<Message> mDeferred;
 
     //
     // This is what we think the RPC stack depth is on the "other
     // side" of this RPC channel.  We maintain this variable so that