Backed out changeset 73e04e795ec3 (bug 1310547)
authorSebastian Hengst <archaeopteryx@coole-files.de>
Sat, 29 Oct 2016 13:45:17 +0200
changeset 320137 7e9f99645168e37e0f7298faebeba3e263491bbc
parent 320136 be5f087a2c4e3fcfd64a56921429f2a5b76dc02f
child 320138 d9c679129b669805d737c09c97a4768a8e6f7e6f
push id20749
push userryanvm@gmail.com
push dateSat, 29 Oct 2016 13:21:21 +0000
treeherderfx-team@1b170b39ed6b [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
bugs1310547
milestone52.0a1
backs out73e04e795ec392a4046ad9ee25906ee2f482c26d
Backed out changeset 73e04e795ec3 (bug 1310547)
ipc/glue/MessageChannel.cpp
ipc/glue/MessageChannel.h
--- a/ipc/glue/MessageChannel.cpp
+++ b/ipc/glue/MessageChannel.cpp
@@ -484,32 +484,39 @@ MessageChannel::MessageChannel(MessageLi
     mInTimeoutSecondHalf(false),
     mNextSeqno(0),
     mLastSendError(SyncSendError::SendSuccess),
     mDispatchingAsyncMessage(false),
     mDispatchingAsyncMessageNestedLevel(0),
     mTransactionStack(nullptr),
     mTimedOutMessageSeqno(0),
     mTimedOutMessageNestedLevel(0),
-    mRemoteStackDepthGuess(0),
+#if defined(MOZ_CRASHREPORTER) && defined(OS_WIN)
+    mPending(AnnotateAllocator<Message>(*this)),
+#endif
+    mRemoteStackDepthGuess(false),
     mSawInterruptOutMsg(false),
     mIsWaitingForIncoming(false),
     mAbortOnError(false),
     mNotifiedChannelDone(false),
     mFlags(REQUIRE_DEFAULT),
     mPeerPidSet(false),
     mPeerPid(-1)
 {
     MOZ_COUNT_CTOR(ipc::MessageChannel);
 
 #ifdef OS_WIN
     mTopFrame = nullptr;
     mIsSyncWaitingOnNonMainThread = false;
 #endif
 
+    RefPtr<CancelableRunnable> runnable =
+        NewNonOwningCancelableRunnableMethod(this, &MessageChannel::OnMaybeDequeueOne);
+    mDequeueOneTask = new RefCountedTask(runnable.forget());
+
     mOnChannelConnectedTask =
         NewNonOwningCancelableRunnableMethod(this, &MessageChannel::DispatchOnChannelConnected);
 
 #ifdef OS_WIN
     mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
     MOZ_RELEASE_ASSERT(mEvent, "CreateEvent failed! Nothing is going to work!");
 #endif
 }
@@ -623,33 +630,31 @@ MessageChannel::Clear()
     //
     // In practice, mListener owns the channel, so the channel gets deleted
     // before mListener.  But just to be safe, mListener is a weak pointer.
 
     if (gParentProcessBlocker == this) {
         gParentProcessBlocker = nullptr;
     }
 
+    mDequeueOneTask->Cancel();
+
     mWorkerLoop = nullptr;
     delete mLink;
     mLink = nullptr;
 
     mOnChannelConnectedTask->Cancel();
 
     if (mChannelErrorTask) {
         mChannelErrorTask->Cancel();
         mChannelErrorTask = nullptr;
     }
 
     // Free up any memory used by pending messages.
-    for (RefPtr<MessageTask> task : mPending) {
-        task->Clear();
-    }
     mPending.clear();
-
     mOutOfTurnReplies.clear();
     while (!mDeferred.empty()) {
         mDeferred.pop();
     }
 }
 
 bool
 MessageChannel::Open(Transport* aTransport, MessageLoop* aIOLoop, Side aSide)
@@ -917,47 +922,49 @@ MessageChannel::OnMessageReceivedFromLin
     }
 
     // Nested messages cannot be compressed.
     MOZ_RELEASE_ASSERT(aMsg.compress_type() == IPC::Message::COMPRESSION_NONE ||
                        aMsg.nested_level() == IPC::Message::NOT_NESTED);
 
     bool compress = false;
     if (aMsg.compress_type() == IPC::Message::COMPRESSION_ENABLED) {
-        compress = (!mPending.isEmpty() &&
-                    mPending.getLast()->Msg().type() == aMsg.type() &&
-                    mPending.getLast()->Msg().routing_id() == aMsg.routing_id());
+        compress = (!mPending.empty() &&
+                    mPending.back().type() == aMsg.type() &&
+                    mPending.back().routing_id() == aMsg.routing_id());
         if (compress) {
             // This message type has compression enabled, and the back of the
             // queue was the same message type and routed to the same destination.
             // Replace it with the newer message.
-            MOZ_RELEASE_ASSERT(mPending.getLast()->Msg().compress_type() ==
-                               IPC::Message::COMPRESSION_ENABLED);
-            mPending.popLast();
+            MOZ_RELEASE_ASSERT(mPending.back().compress_type() ==
+                                  IPC::Message::COMPRESSION_ENABLED);
+            mPending.pop_back();
         }
-    } else if (aMsg.compress_type() == IPC::Message::COMPRESSION_ALL && !mPending.isEmpty()) {
-        for (RefPtr<MessageTask> p = mPending.getLast(); p; p = p->getPrevious()) {
-            if (p->Msg().type() == aMsg.type() &&
-                p->Msg().routing_id() == aMsg.routing_id())
-            {
-                compress = true;
-                MOZ_RELEASE_ASSERT(p->Msg().compress_type() == IPC::Message::COMPRESSION_ALL);
-                p->remove();
-                break;
-            }
+    } else if (aMsg.compress_type() == IPC::Message::COMPRESSION_ALL) {
+        // Check the message queue for another message with this type/destination.
+        auto it = std::find_if(mPending.rbegin(), mPending.rend(),
+                               MatchingKinds(aMsg.type(), aMsg.routing_id()));
+        if (it != mPending.rend()) {
+            // This message type has compression enabled, and the queue holds
+            // a message with the same message type and routed to the same destination.
+            // Erase it.  Note that, since we always compress these redundancies, There Can
+            // Be Only One.
+            compress = true;
+            MOZ_RELEASE_ASSERT((*it).compress_type() == IPC::Message::COMPRESSION_ALL);
+            mPending.erase((++it).base());
         }
     }
 
     bool wakeUpSyncSend = AwaitingSyncReply() && !ShouldDeferMessage(aMsg);
 
     bool shouldWakeUp = AwaitingInterruptReply() ||
                         wakeUpSyncSend ||
                         AwaitingIncomingMessage();
 
-    // Although we usually don't need to post a message task if
+    // Although we usually don't need to post an OnMaybeDequeueOne task if
     // shouldWakeUp is true, it's easier to post anyway than to have to
     // guarantee that every Send call processes everything it's supposed to
     // before returning.
     bool shouldPostTask = !shouldWakeUp || wakeUpSyncSend;
 
     IPC_LOG("Receive on link thread; seqno=%d, xid=%d, shouldWakeUp=%d",
             aMsg.seqno(), aMsg.transaction_id(), shouldWakeUp);
 
@@ -978,83 +985,83 @@ MessageChannel::OnMessageReceivedFromLin
     //
     // (3) We are not waiting on a reply.
     //   - We post a task to the main event loop.
     //
     // Note that, we may notify the main thread even though the monitor is not
     // blocked. This is okay, since we always check for pending events before
     // blocking again.
 
-    RefPtr<MessageTask> task = new MessageTask(this, Move(aMsg));
-    mPending.insertBack(task);
+    mPending.push_back(Move(aMsg));
 
     if (shouldWakeUp) {
         NotifyWorkerThread();
     }
 
     if (shouldPostTask) {
-        task->Post();
+        if (!compress) {
+            // If we compressed away the previous message, we'll re-use
+            // its pending task.
+            RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
+            mWorkerLoop->PostTask(task.forget());
+        }
     }
 }
 
 void
 MessageChannel::PeekMessages(mozilla::function<bool(const Message& aMsg)> aInvoke)
 {
-    // FIXME: We shouldn't be holding the lock for aInvoke!
     MonitorAutoLock lock(*mMonitor);
 
-    for (RefPtr<MessageTask> it : mPending) {
-        const Message &msg = it->Msg();
+    for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); it++) {
+        Message &msg = *it;
         if (!aInvoke(msg)) {
             break;
         }
     }
 }
 
 void
 MessageChannel::ProcessPendingRequests(AutoEnterTransaction& aTransaction)
 {
-    mMonitor->AssertCurrentThreadOwns();
-
     IPC_LOG("ProcessPendingRequests for seqno=%d, xid=%d",
             aTransaction.SequenceNumber(), aTransaction.TransactionID());
 
     // Loop until there aren't any more nested messages to process.
     for (;;) {
         // If we canceled during ProcessPendingRequest, then we need to leave
         // immediately because the results of ShouldDeferMessage will be
         // operating with weird state (as if no Send is in progress). That could
         // cause even NOT_NESTED sync messages to be processed (but not
         // NOT_NESTED async messages), which would break message ordering.
         if (aTransaction.IsCanceled()) {
             return;
         }
 
         mozilla::Vector<Message> toProcess;
 
-        for (RefPtr<MessageTask> p = mPending.getFirst(); p; ) {
-            Message &msg = p->Msg();
+        for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); ) {
+            Message &msg = *it;
 
             MOZ_RELEASE_ASSERT(!aTransaction.IsCanceled(),
                                "Calling ShouldDeferMessage when cancelled");
             bool defer = ShouldDeferMessage(msg);
 
             // Only log the interesting messages.
             if (msg.is_sync() || msg.nested_level() == IPC::Message::NESTED_INSIDE_CPOW) {
                 IPC_LOG("ShouldDeferMessage(seqno=%d) = %d", msg.seqno(), defer);
             }
 
             if (!defer) {
                 if (!toProcess.append(Move(msg)))
                     MOZ_CRASH();
-
-                p = p->removeAndGetNext();
+                it = mPending.erase(it);
                 continue;
             }
-            p = p->getNext();
+            it++;
         }
 
         if (toProcess.empty()) {
             break;
         }
 
         // Processing these messages could result in more messages, so we
         // loop around to check for more afterwards.
@@ -1346,19 +1353,19 @@ MessageChannel::Call(Message* aMsg, Mess
         Message recvd;
         MessageMap::iterator it;
 
         if ((it = mOutOfTurnReplies.find(mInterruptStack.top().seqno()))
             != mOutOfTurnReplies.end())
         {
             recvd = Move(it->second);
             mOutOfTurnReplies.erase(it);
-        } else if (!mPending.isEmpty()) {
-            RefPtr<MessageTask> task = mPending.popFirst();
-            recvd = Move(task->Msg());
+        } else if (!mPending.empty()) {
+            recvd = Move(mPending.front());
+            mPending.pop_front();
         } else {
             // because of subtleties with nested event loops, it's possible
             // that we got here and nothing happened.  or, we might have a
             // deferred in-call that needs to be processed.  either way, we
             // won't break the inner while loop again until something new
             // happens.
             continue;
         }
@@ -1437,48 +1444,47 @@ MessageChannel::Call(Message* aMsg, Mess
 bool
 MessageChannel::WaitForIncomingMessage()
 {
 #ifdef OS_WIN
     SyncStackFrame frame(this, true);
     NeuteredWindowRegion neuteredRgn(mFlags & REQUIRE_DEFERRED_MESSAGE_PROTECTION);
 #endif
 
-    MonitorAutoLock lock(*mMonitor);
-    AutoEnterWaitForIncoming waitingForIncoming(*this);
-    if (mChannelState != ChannelConnected) {
-        return false;
-    }
-    if (!HasPendingEvents()) {
-        return WaitForInterruptNotify();
+    { // Scope for lock
+        MonitorAutoLock lock(*mMonitor);
+        AutoEnterWaitForIncoming waitingForIncoming(*this);
+        if (mChannelState != ChannelConnected) {
+            return false;
+        }
+        if (!HasPendingEvents()) {
+            return WaitForInterruptNotify();
+        }
     }
 
-    MOZ_RELEASE_ASSERT(!mPending.isEmpty());
-    RefPtr<MessageTask> task = mPending.getFirst();
-    RunMessage(*task);
-    return true;
+    return OnMaybeDequeueOne();
 }
 
 bool
 MessageChannel::HasPendingEvents()
 {
     AssertWorkerThread();
     mMonitor->AssertCurrentThreadOwns();
-    return Connected() && !mPending.isEmpty();
+    return Connected() && !mPending.empty();
 }
 
 bool
 MessageChannel::InterruptEventOccurred()
 {
     AssertWorkerThread();
     mMonitor->AssertCurrentThreadOwns();
     IPC_ASSERT(InterruptStackDepth() > 0, "not in wait loop");
 
     return (!Connected() ||
-            !mPending.isEmpty() ||
+            !mPending.empty() ||
             (!mOutOfTurnReplies.empty() &&
              mOutOfTurnReplies.find(mInterruptStack.top().seqno()) !=
              mOutOfTurnReplies.end()));
 }
 
 bool
 MessageChannel::ProcessPendingRequest(Message &&aUrgent)
 {
@@ -1492,161 +1498,95 @@ MessageChannel::ProcessPendingRequest(Me
         ReportConnectionError("MessageChannel::ProcessPendingRequest");
         return false;
     }
 
     return true;
 }
 
 bool
-MessageChannel::ShouldRunMessage(const Message& aMsg)
+MessageChannel::DequeueOne(Message *recvd)
 {
-    if (!mTimedOutMessageSeqno) {
-        return true;
+    AssertWorkerThread();
+    mMonitor->AssertCurrentThreadOwns();
+
+    if (!Connected()) {
+        ReportConnectionError("OnMaybeDequeueOne");
+        return false;
     }
 
+    if (!mDeferred.empty())
+        MaybeUndeferIncall();
+
     // If we've timed out a message and we're awaiting the reply to the timed
     // out message, we have to be careful what messages we process. Here's what
     // can go wrong:
     // 1. child sends a NOT_NESTED sync message S
     // 2. parent sends a NESTED_INSIDE_SYNC sync message H at the same time
     // 3. parent times out H
     // 4. child starts processing H and sends a NESTED_INSIDE_SYNC message H' nested
     //    within the same transaction
     // 5. parent dispatches S and sends reply
     // 6. child asserts because it instead expected a reply to H'.
     //
     // To solve this, we refuse to process S in the parent until we get a reply
     // to H. More generally, let the timed out message be M. We don't process a
     // message unless the child would need the response to that message in order
     // to process M. Those messages are the ones that have a higher nested level
     // than M or that are part of the same transaction as M.
-    if (aMsg.nested_level() < mTimedOutMessageNestedLevel ||
-        (aMsg.nested_level() == mTimedOutMessageNestedLevel
-         && aMsg.transaction_id() != mTimedOutMessageSeqno))
-    {
+    if (mTimedOutMessageSeqno) {
+        for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); it++) {
+            Message &msg = *it;
+            if (msg.nested_level() > mTimedOutMessageNestedLevel ||
+                (msg.nested_level() == mTimedOutMessageNestedLevel
+                 && msg.transaction_id() == mTimedOutMessageSeqno))
+            {
+                *recvd = Move(msg);
+                mPending.erase(it);
+                return true;
+            }
+        }
         return false;
     }
 
+    if (mPending.empty())
+        return false;
+
+    *recvd = Move(mPending.front());
+    mPending.pop_front();
+    return true;
+}
+
+bool
+MessageChannel::OnMaybeDequeueOne()
+{
+    AssertWorkerThread();
+    mMonitor->AssertNotCurrentThreadOwns();
+
+    Message recvd;
+
+    MonitorAutoLock lock(*mMonitor);
+    if (!DequeueOne(&recvd))
+        return false;
+
+    if (IsOnCxxStack() && recvd.is_interrupt() && recvd.is_reply()) {
+        // We probably just received a reply in a nested loop for an
+        // Interrupt call sent before entering that loop.
+        mOutOfTurnReplies[recvd.seqno()] = Move(recvd);
+        return false;
+    }
+
+    DispatchMessage(Move(recvd));
+
     return true;
 }
 
 void
-MessageChannel::RunMessage(MessageTask& aTask)
-{
-    AssertWorkerThread();
-    mMonitor->AssertCurrentThreadOwns();
-
-    Message& msg = aTask.Msg();
-
-    if (!Connected()) {
-        ReportConnectionError("OnMaybeDequeueOne");
-        return;
-    }
-
-    // Check that we're going to run the first message that's valid to run.
-#ifdef DEBUG
-    for (RefPtr<MessageTask> task : mPending) {
-        if (task == &aTask) {
-            break;
-        }
-        MOZ_ASSERT(!ShouldRunMessage(task->Msg()));
-    }
-#endif
-
-    if (!mDeferred.empty()) {
-        MaybeUndeferIncall();
-    }
-
-    if (!ShouldRunMessage(msg)) {
-        return;
-    }
-
-    MOZ_RELEASE_ASSERT(aTask.isInList());
-    aTask.remove();
-
-    if (IsOnCxxStack() && msg.is_interrupt() && msg.is_reply()) {
-        // We probably just received a reply in a nested loop for an
-        // Interrupt call sent before entering that loop.
-        mOutOfTurnReplies[msg.seqno()] = Move(msg);
-        return;
-    }
-
-    DispatchMessage(Move(msg));
-}
-
-nsresult
-MessageChannel::MessageTask::Run()
-{
-    if (!mChannel) {
-        return NS_OK;
-    }
-
-    mChannel->AssertWorkerThread();
-    mChannel->mMonitor->AssertNotCurrentThreadOwns();
-
-    MonitorAutoLock lock(*mChannel->mMonitor);
-
-    // In case we choose not to run this message, we may need to be able to Post
-    // it again.
-    mScheduled = false;
-
-    if (!isInList()) {
-        return NS_OK;
-    }
-
-    mChannel->RunMessage(*this);
-    return NS_OK;
-}
-
-nsresult
-MessageChannel::MessageTask::Cancel()
-{
-    if (!mChannel) {
-        return NS_OK;
-    }
-
-    mChannel->AssertWorkerThread();
-    mChannel->mMonitor->AssertNotCurrentThreadOwns();
-
-    MonitorAutoLock lock(*mChannel->mMonitor);
-
-    if (!isInList()) {
-        return NS_OK;
-    }
-    remove();
-
-    return NS_OK;
-}
-
-void
-MessageChannel::MessageTask::Post()
-{
-    MOZ_RELEASE_ASSERT(!mScheduled);
-    MOZ_RELEASE_ASSERT(isInList());
-
-    RefPtr<MessageTask> self = this;
-    mChannel->mWorkerLoop->PostTask(self.forget());
-    mScheduled = true;
-}
-
-void
-MessageChannel::MessageTask::Clear()
-{
-    mChannel->AssertWorkerThread();
-
-    mChannel = nullptr;
-}
-
-void
 MessageChannel::DispatchMessage(Message &&aMsg)
 {
-    AssertWorkerThread();
-    mMonitor->AssertCurrentThreadOwns();
-
     Maybe<AutoNoJSAPI> nojsapi;
     if (ScriptSettingsInitialized() && NS_IsMainThread())
         nojsapi.emplace();
 
     nsAutoPtr<Message> reply;
 
     IPC_LOG("DispatchMessage: seqno=%d, xid=%d", aMsg.seqno(), aMsg.transaction_id());
 
@@ -1835,19 +1775,17 @@ MessageChannel::MaybeUndeferIncall()
     Message call(Move(mDeferred.top()));
     mDeferred.pop();
 
     // fix up fudge factor we added to account for race
     IPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error");
     --mRemoteStackDepthGuess;
 
     MOZ_RELEASE_ASSERT(call.nested_level() == IPC::Message::NOT_NESTED);
-    RefPtr<MessageTask> task = new MessageTask(this, Move(call));
-    mPending.insertBack(task);
-    task->Post();
+    mPending.push_back(Move(call));
 }
 
 void
 MessageChannel::ExitedCxxStack()
 {
     mListener->OnExitedCxxStack();
     if (mSawInterruptOutMsg) {
         MonitorAutoLock lock(*mMonitor);
@@ -1860,20 +1798,28 @@ MessageChannel::ExitedCxxStack()
 void
 MessageChannel::EnqueuePendingMessages()
 {
     AssertWorkerThread();
     mMonitor->AssertCurrentThreadOwns();
 
     MaybeUndeferIncall();
 
+    for (size_t i = 0; i < mDeferred.size(); ++i) {
+        RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
+        mWorkerLoop->PostTask(task.forget());
+    }
+
     // XXX performance tuning knob: could process all or k pending
     // messages here, rather than enqueuing for later processing
 
-    RepostAllMessages();
+    for (size_t i = 0; i < mPending.size(); ++i) {
+        RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
+        mWorkerLoop->PostTask(task.forget());
+    }
 }
 
 static inline bool
 IsTimeoutExpired(PRIntervalTime aStart, PRIntervalTime aTimeout)
 {
     return (aTimeout != PR_INTERVAL_NO_TIMEOUT) &&
            (aTimeout <= (PR_IntervalNow() - aStart));
 }
@@ -2318,24 +2264,26 @@ MessageChannel::DebugAbort(const char* f
     // technically we need the mutex for this, but we're dying anyway
     DumpInterruptStack("  ");
     printf_stderr("  remote Interrupt stack guess: %" PRIuSIZE "\n",
                   mRemoteStackDepthGuess);
     printf_stderr("  deferred stack size: %" PRIuSIZE "\n",
                   mDeferred.size());
     printf_stderr("  out-of-turn Interrupt replies stack size: %" PRIuSIZE "\n",
                   mOutOfTurnReplies.size());
+    printf_stderr("  Pending queue size: %" PRIuSIZE ", front to back:\n",
+                  mPending.size());
 
     MessageQueue pending = Move(mPending);
-    while (!pending.isEmpty()) {
+    while (!pending.empty()) {
         printf_stderr("    [ %s%s ]\n",
-                      pending.getFirst()->Msg().is_interrupt() ? "intr" :
-                      (pending.getFirst()->Msg().is_sync() ? "sync" : "async"),
-                      pending.getFirst()->Msg().is_reply() ? "reply" : "");
-        pending.popFirst();
+                      pending.front().is_interrupt() ? "intr" :
+                      (pending.front().is_sync() ? "sync" : "async"),
+                      pending.front().is_reply() ? "reply" : "");
+        pending.pop_front();
     }
 
     NS_RUNTIMEABORT(why);
 }
 
 void
 MessageChannel::DumpInterruptStack(const char* const pfx) const
 {
@@ -2373,43 +2321,23 @@ void
 MessageChannel::EndTimeout()
 {
     mMonitor->AssertCurrentThreadOwns();
 
     IPC_LOG("Ending timeout of seqno=%d", mTimedOutMessageSeqno);
     mTimedOutMessageSeqno = 0;
     mTimedOutMessageNestedLevel = 0;
 
-    RepostAllMessages();
-}
-
-void
-MessageChannel::RepostAllMessages()
-{
-    bool needRepost = false;
-    for (RefPtr<MessageTask> task : mPending) {
-        if (!task->IsScheduled()) {
-            needRepost = true;
-        }
-    }
-    if (!needRepost) {
-        // If everything is already scheduled to run, do nothing.
-        return;
-    }
-
-    // In some cases we may have deferred dispatch of some messages in the
-    // queue. Now we want to run them again. However, we can't just re-post
-    // those messages since the messages after them in mPending would then be
-    // before them in the event queue. So instead we cancel everything and
-    // re-post all messages in the correct order.
-    MessageQueue queue = Move(mPending);
-    while (RefPtr<MessageTask> task = queue.popFirst()) {
-        RefPtr<MessageTask> newTask = new MessageTask(this, Move(task->Msg()));
-        mPending.insertBack(newTask);
-        newTask->Post();
+    for (size_t i = 0; i < mPending.size(); i++) {
+        // There may be messages in the queue that we expected to process from
+        // OnMaybeDequeueOne. But during the timeout, that function will skip
+        // some messages. Now they're ready to be processed, so we enqueue more
+        // tasks.
+        RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
+        mWorkerLoop->PostTask(task.forget());
     }
 }
 
 void
 MessageChannel::CancelTransaction(int transaction)
 {
     mMonitor->AssertCurrentThreadOwns();
 
@@ -2442,33 +2370,33 @@ MessageChannel::CancelTransaction(int tr
             mTransactionStack->Cancel();
         }
     } else {
         MOZ_RELEASE_ASSERT(mTransactionStack->TransactionID() == transaction);
         mTransactionStack->Cancel();
     }
 
     bool foundSync = false;
-    for (RefPtr<MessageTask> p = mPending.getFirst(); p; ) {
-        Message &msg = p->Msg();
+    for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); ) {
+        Message &msg = *it;
 
         // If there was a race between the parent and the child, then we may
         // have a queued sync message. We want to drop this message from the
         // queue since if will get cancelled along with the transaction being
         // cancelled. This happens if the message in the queue is NESTED_INSIDE_SYNC.
         if (msg.is_sync() && msg.nested_level() != IPC::Message::NOT_NESTED) {
             MOZ_RELEASE_ASSERT(!foundSync);
             MOZ_RELEASE_ASSERT(msg.transaction_id() != transaction);
             IPC_LOG("Removing msg from queue seqno=%d xid=%d", msg.seqno(), msg.transaction_id());
             foundSync = true;
-            p = p->removeAndGetNext();
+            it = mPending.erase(it);
             continue;
         }
 
-        p = p->getNext();
+        it++;
     }
 }
 
 bool
 MessageChannel::IsInTransaction() const
 {
     MonitorAutoLock lock(*mMonitor);
     return !!mTransactionStack;
--- a/ipc/glue/MessageChannel.h
+++ b/ipc/glue/MessageChannel.h
@@ -269,16 +269,20 @@ class MessageChannel : HasResultCodes
     bool HasPendingEvents();
 
     void ProcessPendingRequests(AutoEnterTransaction& aTransaction);
     bool ProcessPendingRequest(Message &&aUrgent);
 
     void MaybeUndeferIncall();
     void EnqueuePendingMessages();
 
+    // Executed on the worker thread. Dequeues one pending message.
+    bool OnMaybeDequeueOne();
+    bool DequeueOne(Message *recvd);
+
     // Dispatches an incoming message to its appropriate handler.
     void DispatchMessage(Message &&aMsg);
 
     // DispatchMessage will route to one of these functions depending on the
     // protocol type of the message.
     void DispatchSyncMessage(const Message &aMsg, Message*& aReply);
     void DispatchUrgentMessage(const Message &aMsg);
     void DispatchAsyncMessage(const Message &aMsg);
@@ -300,18 +304,16 @@ class MessageChannel : HasResultCodes
 
     bool WaitResponse(bool aWaitTimedOut);
 
     bool ShouldContinueFromTimeout();
 
     void EndTimeout();
     void CancelTransaction(int transaction);
 
-    void RepostAllMessages();
-
     // The "remote view of stack depth" can be different than the
     // actual stack depth when there are out-of-turn replies.  When we
     // receive one, our actual Interrupt stack depth doesn't decrease, but
     // the other side (that sent the reply) thinks it has.  So, the
     // "view" returned here is |stackDepth| minus the number of
     // out-of-turn replies.
     //
     // Only called from the worker thread.
@@ -447,66 +449,147 @@ class MessageChannel : HasResultCodes
     // NOT our worker thread.
     void AssertLinkThread() const
     {
         MOZ_RELEASE_ASSERT(mWorkerLoopID != MessageLoop::current()->id(),
                            "on worker thread but should not be!");
     }
 
   private:
-    class MessageTask :
-        public CancelableRunnable,
-        public LinkedListElement<RefPtr<MessageTask>>
+#if defined(MOZ_CRASHREPORTER) && defined(OS_WIN)
+    // TODO: Remove the condition OS_WIN above once we move to GCC 5 or higher,
+    // the code will be able to get compiled as std::deque will meet C++11
+    // allocator requirements.
+    template<class T>
+    struct AnnotateAllocator
     {
-    public:
-        explicit MessageTask(MessageChannel* aChannel, Message&& aMessage)
-          : mChannel(aChannel), mMessage(Move(aMessage)), mScheduled(false)
-        {}
+      typedef T value_type;
+      AnnotateAllocator(MessageChannel& channel) : mChannel(channel) {}
+      template<class U> AnnotateAllocator(const AnnotateAllocator<U>& other) :
+        mChannel(other.mChannel) {}
+      template<class U> bool operator==(const AnnotateAllocator<U>&) { return true; }
+      template<class U> bool operator!=(const AnnotateAllocator<U>&) { return false; }
+      T* allocate(size_t n) {
+        void* p = ::operator new(n * sizeof(T), std::nothrow);
+        if (!p && n) {
+          // Sort the pending messages by their type; note that the sorting
+          // algorithm has to be in-place to avoid memory allocation.
+          MessageQueue& q = mChannel.mPending;
+          std::sort(q.begin(), q.end(), [](const Message& a, const Message& b) {
+            return a.type() < b.type();
+          });
 
-        NS_IMETHOD Run() override;
-        NS_IMETHOD Cancel() override;
-        void Post();
-        void Clear();
+          // Iterate over the sorted queue to find the message type that
+          // occurs most frequently.
+          const char* topName = nullptr;
+          const char* curName = nullptr;
+          msgid_t topType = 0, curType = 0;
+          uint32_t topCount = 0, curCount = 0;
+          for (MessageQueue::iterator it = q.begin(); it != q.end(); ++it) {
+            Message &msg = *it;
+            if (msg.type() == curType) {
+              ++curCount;
+            } else {
+              if (curCount > topCount) {
+                topName = curName;
+                topType = curType;
+                topCount = curCount;
+              }
+              curName = StringFromIPCMessageType(msg.type());
+              curType = msg.type();
+              curCount = 1;
+            }
+          }
+          // In case the last type is the top one.
+          if (curCount > topCount) {
+            topName = curName;
+            topType = curType;
+            topCount = curCount;
+          }
 
-        bool IsScheduled() const { return mScheduled; }
+          CrashReporter::AnnotatePendingIPC(q.size(), topCount, topName, topType);
 
-        Message& Msg() { return mMessage; }
-        const Message& Msg() const { return mMessage; }
+          mozalloc_handle_oom(n * sizeof(T));
+        }
+        return static_cast<T*>(p);
+      }
+      void deallocate(T* p, size_t n) {
+        ::operator delete(p);
+      }
+      MessageChannel& mChannel;
+    };
+    typedef std::deque<Message, AnnotateAllocator<Message>> MessageQueue;
+#else
+    typedef std::deque<Message> MessageQueue;
+#endif
+    typedef std::map<size_t, Message> MessageMap;
+    typedef IPC::Message::msgid_t msgid_t;
 
-    private:
-        MessageTask() = delete;
-        MessageTask(const MessageTask&) = delete;
+    // XXXkhuey this can almost certainly die.
+    // All dequeuing tasks require a single point of cancellation,
+    // which is handled via a reference-counted task.
+    class RefCountedTask
+    {
+      public:
+        explicit RefCountedTask(already_AddRefed<CancelableRunnable> aTask)
+          : mTask(aTask)
+        { }
+      private:
+        ~RefCountedTask() { }
+      public:
+        void Run() { mTask->Run(); }
+        void Cancel() { mTask->Cancel(); }
 
-        MessageChannel* mChannel;
-        Message mMessage;
-        bool mScheduled : 1;
+        NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedTask)
+
+      private:
+        RefPtr<CancelableRunnable> mTask;
     };
 
-    bool ShouldRunMessage(const Message& aMsg);
-    void RunMessage(MessageTask& aTask);
+    // Wrap an existing task which can be cancelled at any time
+    // without the wrapper's knowledge.
+    class DequeueTask : public CancelableRunnable
+    {
+      public:
+        explicit DequeueTask(RefCountedTask* aTask)
+          : mTask(aTask)
+        { }
+        NS_IMETHOD Run() override {
+          if (mTask) {
+            mTask->Run();
+          }
+          return NS_OK;
+        }
+        nsresult Cancel() override {
+          mTask = nullptr;
+          return NS_OK;
+        }
 
-    typedef LinkedList<RefPtr<MessageTask>> MessageQueue;
-    typedef std::map<size_t, Message> MessageMap;
-    typedef IPC::Message::msgid_t msgid_t;
+      private:
+        RefPtr<RefCountedTask> mTask;
+    };
 
   private:
     // Based on presumption the listener owns and overlives the channel,
     // this is never nullified.
     MessageListener* mListener;
     ChannelState mChannelState;
     RefPtr<RefCountedMonitor> mMonitor;
     Side mSide;
     MessageLink* mLink;
     MessageLoop* mWorkerLoop;           // thread where work is done
     RefPtr<CancelableRunnable> mChannelErrorTask;  // NotifyMaybeChannelError runnable
 
     // id() of mWorkerLoop.  This persists even after mWorkerLoop is cleared
     // during channel shutdown.
     int mWorkerLoopID;
 
+    // A task encapsulating dequeuing one pending message.
+    RefPtr<RefCountedTask> mDequeueOneTask;
+
     // Timeout periods are broken up in two to prevent system suspension from
     // triggering an abort. This method (called by WaitForEvent with a 'did
     // timeout' flag) decides if we should wait again for half of mTimeoutMs
     // or give up.
     int32_t mTimeoutMs;
     bool mInTimeoutSecondHalf;
 
     // Worker-thread only; sequence numbers for messages that require
@@ -583,44 +666,46 @@ class MessageChannel : HasResultCodes
     // error.
     //
     // A message is only timed out if it initiated a transaction. This avoids
     // hitting a lot of corner cases with message nesting that we don't really
     // care about.
     int32_t mTimedOutMessageSeqno;
     int mTimedOutMessageNestedLevel;
 
-    // Queue of all incoming messages.
+    // Queue of all incoming messages, except for replies to sync and urgent
+    // messages, which are delivered directly to mRecvd, and any pending urgent
+    // incall, which is stored in mPendingUrgentRequest.
     //
     // If both this side and the other side are functioning correctly, the queue
     // can only be in certain configurations.  Let
     //
     //   |A<| be an async in-message,
     //   |S<| be a sync in-message,
     //   |C<| be an Interrupt in-call,
     //   |R<| be an Interrupt reply.
     //
     // The queue can only match this configuration
     //
-    //  A<* (S< | C< | R< (?{mInterruptStack.size() == 1} A<* (S< | C<)))
+    //  A<* (S< | C< | R< (?{mStack.size() == 1} A<* (S< | C<)))
     //
     // The other side can send as many async messages |A<*| as it wants before
     // sending us a blocking message.
     //
     // The first case is |S<|, a sync in-msg.  The other side must be blocked,
     // and thus can't send us any more messages until we process the sync
     // in-msg.
     //
     // The second case is |C<|, an Interrupt in-call; the other side must be blocked.
     // (There's a subtlety here: this in-call might have raced with an
     // out-call, but we detect that with the mechanism below,
     // |mRemoteStackDepth|, and races don't matter to the queue.)
     //
     // Final case, the other side replied to our most recent out-call |R<|.
-    // If that was the *only* out-call on our stack, |?{mInterruptStack.size() == 1}|,
+    // If that was the *only* out-call on our stack, |?{mStack.size() == 1}|,
     // then other side "finished with us," and went back to its own business.
     // That business might have included sending any number of async message
     // |A<*| until sending a blocking message |(S< | C<)|.  If we had more than
     // one Interrupt call on our stack, the other side *better* not have sent us
     // another blocking message, because it's blocked on a reply from us.
     //
     MessageQueue mPending;
 
@@ -635,17 +720,17 @@ class MessageChannel : HasResultCodes
     // calls.  With each Interrupt out-call sent, we send along what *we* think the
     // stack depth of the remote side is *before* it will receive the Interrupt call.
     //
     // After sending the out-call, our stack depth is "incremented" by pushing
     // that pending message onto mPending.
     //
     // Then when processing an in-call |c|, it must be true that
     //
-    //   mInterruptStack.size() == c.remoteDepth
+    //   mStack.size() == c.remoteDepth
     //
     // I.e., my depth is actually the same as what the other side thought it
     // was when it sent in-call |c|.  If this fails to hold, we have detected
     // racy Interrupt calls.
     //
     // We then increment mRemoteStackDepth *just before* processing the
     // in-call, since we know the other side is waiting on it, and decrement
     // it *just after* finishing processing that in-call, since our response