Bug 1312960 - Associate each message in IPC queue with the runnable that will run it (r=dvander)
author Bill McCloskey <billm@mozilla.com>
Sat, 15 Oct 2016 11:47:14 -0700
changeset 320264 774891f4556d
parent 320263 13826b3d90c7
child 320265 0432b5da3243
push id 20754
push user cbook@mozilla.com
push date 2016-10-31 15:58 +0000
treeherder fx-team@b1b66b1780c2
reviewers dvander
bugs 1312960
milestone 52.0a1
Bug 1312960 - Associate each message in IPC queue with the runnable that will run it (r=dvander)
ipc/glue/MessageChannel.cpp
ipc/glue/MessageChannel.h
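
Orientation for the diff below: instead of a single shared OnMaybeDequeueOne runnable draining a std::deque<Message>, every queued message now gets its own CancelableRunnable (MessageTask) that lives in an intrusive LinkedList. Compressing or cancelling a message only touches that message's task, and a posted task simply does nothing if its message has since been removed from the list. The following is a rough standalone sketch of that pattern, using plain std::list and std::shared_ptr instead of mozilla::LinkedList and RefPtr; every name in it (Channel, MessageTask, Receive, DrainEventLoop) is a hypothetical stand-in, not the tree's actual API.

    // Standalone sketch (hypothetical names): one cancelable task per queued message.
    #include <iostream>
    #include <list>
    #include <memory>
    #include <string>
    #include <utility>

    struct Message { std::string payload; };

    class Channel;

    // Analogue of MessageChannel::MessageTask: owns its message and remembers
    // whether it is still linked into the pending queue.
    struct MessageTask {
        MessageTask(Channel* aChannel, Message aMsg)
          : channel(aChannel), msg(std::move(aMsg)) {}

        Channel* channel;        // nulled out on channel teardown, like MessageTask::Clear()
        Message msg;
        bool inList = true;      // stands in for LinkedListElement::isInList()
        bool scheduled = false;  // stands in for MessageTask::mScheduled

        void Run();
    };

    class Channel {
    public:
        // OnMessageReceivedFromLink analogue: wrap the message in its own task,
        // queue it, and post that same task to the "event loop".
        void Receive(Message m) {
            auto task = std::make_shared<MessageTask>(this, std::move(m));
            mPending.push_back(task);
            Post(task);
        }

        void Dispatch(Message&& m) { std::cout << "dispatching " << m.payload << "\n"; }

        void Post(const std::shared_ptr<MessageTask>& t) {
            t->scheduled = true;
            mEventLoop.push_back(t);  // stand-in for MessageLoop::PostTask
        }

        void DrainEventLoop() {
            while (!mEventLoop.empty()) {
                auto t = mEventLoop.front();
                mEventLoop.pop_front();
                t->Run();
            }
        }

        std::list<std::shared_ptr<MessageTask>> mPending;
        std::list<std::shared_ptr<MessageTask>> mEventLoop;
    };

    void MessageTask::Run() {
        scheduled = false;
        if (!channel || !inList) {
            return;  // message was compressed away, already run, or the channel was cleared
        }
        // RunMessage analogue: unlink the task from the queue, then dispatch its message.
        inList = false;
        channel->mPending.remove_if([this](const std::shared_ptr<MessageTask>& p) {
            return p.get() == this;
        });
        channel->Dispatch(std::move(msg));
    }

    int main() {
        Channel c;
        c.Receive({"hello"});
        c.Receive({"world"});
        c.DrainEventLoop();  // prints "dispatching hello", then "dispatching world"
    }

With this shape, COMPRESSION_ENABLED can overwrite the message held by the tail task and skip posting a new runnable (the reuseTask path in the diff), and Clear()/Cancel() only have to unlink list elements rather than going through a shared, ref-counted dequeue task.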
--- a/ipc/glue/MessageChannel.cpp
+++ b/ipc/glue/MessageChannel.cpp
@@ -484,39 +484,32 @@ MessageChannel::MessageChannel(MessageLi
     mInTimeoutSecondHalf(false),
     mNextSeqno(0),
     mLastSendError(SyncSendError::SendSuccess),
     mDispatchingAsyncMessage(false),
     mDispatchingAsyncMessageNestedLevel(0),
     mTransactionStack(nullptr),
     mTimedOutMessageSeqno(0),
     mTimedOutMessageNestedLevel(0),
-#if defined(MOZ_CRASHREPORTER) && defined(OS_WIN)
-    mPending(AnnotateAllocator<Message>(*this)),
-#endif
-    mRemoteStackDepthGuess(false),
+    mRemoteStackDepthGuess(0),
     mSawInterruptOutMsg(false),
     mIsWaitingForIncoming(false),
     mAbortOnError(false),
     mNotifiedChannelDone(false),
     mFlags(REQUIRE_DEFAULT),
     mPeerPidSet(false),
     mPeerPid(-1)
 {
     MOZ_COUNT_CTOR(ipc::MessageChannel);
 
 #ifdef OS_WIN
     mTopFrame = nullptr;
     mIsSyncWaitingOnNonMainThread = false;
 #endif
 
-    RefPtr<CancelableRunnable> runnable =
-        NewNonOwningCancelableRunnableMethod(this, &MessageChannel::OnMaybeDequeueOne);
-    mDequeueOneTask = new RefCountedTask(runnable.forget());
-
     mOnChannelConnectedTask =
         NewNonOwningCancelableRunnableMethod(this, &MessageChannel::DispatchOnChannelConnected);
 
 #ifdef OS_WIN
     mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
     MOZ_RELEASE_ASSERT(mEvent, "CreateEvent failed! Nothing is going to work!");
 #endif
 }
@@ -630,31 +623,33 @@ MessageChannel::Clear()
     //
     // In practice, mListener owns the channel, so the channel gets deleted
     // before mListener.  But just to be safe, mListener is a weak pointer.
 
     if (gParentProcessBlocker == this) {
         gParentProcessBlocker = nullptr;
     }
 
-    mDequeueOneTask->Cancel();
-
     mWorkerLoop = nullptr;
     delete mLink;
     mLink = nullptr;
 
     mOnChannelConnectedTask->Cancel();
 
     if (mChannelErrorTask) {
         mChannelErrorTask->Cancel();
         mChannelErrorTask = nullptr;
     }
 
     // Free up any memory used by pending messages.
+    for (RefPtr<MessageTask> task : mPending) {
+        task->Clear();
+    }
     mPending.clear();
+
     mOutOfTurnReplies.clear();
     while (!mDeferred.empty()) {
         mDeferred.pop();
     }
 }
 
 bool
 MessageChannel::Open(Transport* aTransport, MessageLoop* aIOLoop, Side aSide)
@@ -874,29 +869,16 @@ MessageChannel::ShouldDeferMessage(const
     //
     // Deferring in the parent only sort of breaks message ordering. When the
     // child's message comes in, we can pretend the child hasn't quite
     // finished sending it yet. Since the message is sync, we know that the
     // child hasn't moved on yet.
     return mSide == ParentSide && aMsg.transaction_id() != CurrentNestedInsideSyncTransaction();
 }
 
-// Predicate that is true for messages that should be consolidated if 'compress' is set.
-class MatchingKinds {
-    typedef IPC::Message Message;
-    Message::msgid_t mType;
-    int32_t mRoutingId;
-public:
-    MatchingKinds(Message::msgid_t aType, int32_t aRoutingId) :
-        mType(aType), mRoutingId(aRoutingId) {}
-    bool operator()(const Message &msg) {
-        return msg.type() == mType && msg.routing_id() == mRoutingId;
-    }
-};
-
 void
 MessageChannel::OnMessageReceivedFromLink(Message&& aMsg)
 {
     AssertLinkThread();
     mMonitor->AssertCurrentThreadOwns();
 
     if (MaybeInterceptSpecialIOMessage(aMsg))
         return;
@@ -920,59 +902,66 @@ MessageChannel::OnMessageReceivedFromLin
         NotifyWorkerThread();
         return;
     }
 
     // Nested messages cannot be compressed.
     MOZ_RELEASE_ASSERT(aMsg.compress_type() == IPC::Message::COMPRESSION_NONE ||
                        aMsg.nested_level() == IPC::Message::NOT_NESTED);
 
-    bool compress = false;
+    bool reuseTask = false;
     if (aMsg.compress_type() == IPC::Message::COMPRESSION_ENABLED) {
-        compress = (!mPending.empty() &&
-                    mPending.back().type() == aMsg.type() &&
-                    mPending.back().routing_id() == aMsg.routing_id());
+        bool compress = (!mPending.isEmpty() &&
+                         mPending.getLast()->Msg().type() == aMsg.type() &&
+                         mPending.getLast()->Msg().routing_id() == aMsg.routing_id());
         if (compress) {
             // This message type has compression enabled, and the back of the
             // queue was the same message type and routed to the same destination.
             // Replace it with the newer message.
-            MOZ_RELEASE_ASSERT(mPending.back().compress_type() ==
-                                  IPC::Message::COMPRESSION_ENABLED);
-            mPending.pop_back();
+            MOZ_RELEASE_ASSERT(mPending.getLast()->Msg().compress_type() ==
+                               IPC::Message::COMPRESSION_ENABLED);
+            mPending.getLast()->Msg() = Move(aMsg);
+
+            reuseTask = true;
         }
-    } else if (aMsg.compress_type() == IPC::Message::COMPRESSION_ALL) {
-        // Check the message queue for another message with this type/destination.
-        auto it = std::find_if(mPending.rbegin(), mPending.rend(),
-                               MatchingKinds(aMsg.type(), aMsg.routing_id()));
-        if (it != mPending.rend()) {
-            // This message type has compression enabled, and the queue holds
-            // a message with the same message type and routed to the same destination.
-            // Erase it.  Note that, since we always compress these redundancies, There Can
-            // Be Only One.
-            compress = true;
-            MOZ_RELEASE_ASSERT((*it).compress_type() == IPC::Message::COMPRESSION_ALL);
-            mPending.erase((++it).base());
+    } else if (aMsg.compress_type() == IPC::Message::COMPRESSION_ALL && !mPending.isEmpty()) {
+        for (RefPtr<MessageTask> p = mPending.getLast(); p; p = p->getPrevious()) {
+            if (p->Msg().type() == aMsg.type() &&
+                p->Msg().routing_id() == aMsg.routing_id())
+            {
+                // This message type has compression enabled, and the queue
+                // holds a message with the same message type and routed to the
+                // same destination. Erase it. Note that, since we always
+                // compress these redundancies, There Can Be Only One.
+                MOZ_RELEASE_ASSERT(p->Msg().compress_type() == IPC::Message::COMPRESSION_ALL);
+                p->remove();
+                break;
+            }
         }
     }
 
     bool wakeUpSyncSend = AwaitingSyncReply() && !ShouldDeferMessage(aMsg);
 
     bool shouldWakeUp = AwaitingInterruptReply() ||
                         wakeUpSyncSend ||
                         AwaitingIncomingMessage();
 
-    // Although we usually don't need to post an OnMaybeDequeueOne task if
+    // Although we usually don't need to post a message task if
     // shouldWakeUp is true, it's easier to post anyway than to have to
     // guarantee that every Send call processes everything it's supposed to
     // before returning.
     bool shouldPostTask = !shouldWakeUp || wakeUpSyncSend;
 
     IPC_LOG("Receive on link thread; seqno=%d, xid=%d, shouldWakeUp=%d",
             aMsg.seqno(), aMsg.transaction_id(), shouldWakeUp);
 
+    if (reuseTask) {
+        return;
+    }
+
     // There are three cases we're concerned about, relating to the state of the
     // main thread:
     //
     // (1) We are waiting on a sync reply - main thread is blocked on the
     //     IPC monitor.
     //   - If the message is NESTED_INSIDE_SYNC, we wake up the main thread to
     //     deliver the message depending on ShouldDeferMessage. Otherwise, we
     //     leave it in the mPending queue, posting a task to the main event
@@ -985,83 +974,83 @@ MessageChannel::OnMessageReceivedFromLin
     //
     // (3) We are not waiting on a reply.
     //   - We post a task to the main event loop.
     //
     // Note that, we may notify the main thread even though the monitor is not
     // blocked. This is okay, since we always check for pending events before
     // blocking again.
 
-    mPending.push_back(Move(aMsg));
+    RefPtr<MessageTask> task = new MessageTask(this, Move(aMsg));
+    mPending.insertBack(task);
 
     if (shouldWakeUp) {
         NotifyWorkerThread();
     }
 
     if (shouldPostTask) {
-        if (!compress) {
-            // If we compressed away the previous message, we'll re-use
-            // its pending task.
-            RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
-            mWorkerLoop->PostTask(task.forget());
-        }
+        task->Post();
     }
 }
 
 void
 MessageChannel::PeekMessages(mozilla::function<bool(const Message& aMsg)> aInvoke)
 {
+    // FIXME: We shouldn't be holding the lock for aInvoke!
     MonitorAutoLock lock(*mMonitor);
 
-    for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); it++) {
-        Message &msg = *it;
+    for (RefPtr<MessageTask> it : mPending) {
+        const Message &msg = it->Msg();
         if (!aInvoke(msg)) {
             break;
         }
     }
 }
 
 void
 MessageChannel::ProcessPendingRequests(AutoEnterTransaction& aTransaction)
 {
+    mMonitor->AssertCurrentThreadOwns();
+
     IPC_LOG("ProcessPendingRequests for seqno=%d, xid=%d",
             aTransaction.SequenceNumber(), aTransaction.TransactionID());
 
     // Loop until there aren't any more nested messages to process.
     for (;;) {
         // If we canceled during ProcessPendingRequest, then we need to leave
         // immediately because the results of ShouldDeferMessage will be
         // operating with weird state (as if no Send is in progress). That could
         // cause even NOT_NESTED sync messages to be processed (but not
         // NOT_NESTED async messages), which would break message ordering.
         if (aTransaction.IsCanceled()) {
             return;
         }
 
         mozilla::Vector<Message> toProcess;
 
-        for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); ) {
-            Message &msg = *it;
+        for (RefPtr<MessageTask> p = mPending.getFirst(); p; ) {
+            Message &msg = p->Msg();
 
             MOZ_RELEASE_ASSERT(!aTransaction.IsCanceled(),
                                "Calling ShouldDeferMessage when cancelled");
             bool defer = ShouldDeferMessage(msg);
 
             // Only log the interesting messages.
             if (msg.is_sync() || msg.nested_level() == IPC::Message::NESTED_INSIDE_CPOW) {
                 IPC_LOG("ShouldDeferMessage(seqno=%d) = %d", msg.seqno(), defer);
             }
 
             if (!defer) {
                 if (!toProcess.append(Move(msg)))
                     MOZ_CRASH();
-                it = mPending.erase(it);
+
+                p = p->removeAndGetNext();
                 continue;
             }
-            it++;
+            p = p->getNext();
         }
 
         if (toProcess.empty()) {
             break;
         }
 
         // Processing these messages could result in more messages, so we
         // loop around to check for more afterwards.
@@ -1353,19 +1342,19 @@ MessageChannel::Call(Message* aMsg, Mess
         Message recvd;
         MessageMap::iterator it;
 
         if ((it = mOutOfTurnReplies.find(mInterruptStack.top().seqno()))
             != mOutOfTurnReplies.end())
         {
             recvd = Move(it->second);
             mOutOfTurnReplies.erase(it);
-        } else if (!mPending.empty()) {
-            recvd = Move(mPending.front());
-            mPending.pop_front();
+        } else if (!mPending.isEmpty()) {
+            RefPtr<MessageTask> task = mPending.popFirst();
+            recvd = Move(task->Msg());
         } else {
             // because of subtleties with nested event loops, it's possible
             // that we got here and nothing happened.  or, we might have a
             // deferred in-call that needs to be processed.  either way, we
             // won't break the inner while loop again until something new
             // happens.
             continue;
         }
@@ -1444,47 +1433,48 @@ MessageChannel::Call(Message* aMsg, Mess
 bool
 MessageChannel::WaitForIncomingMessage()
 {
 #ifdef OS_WIN
     SyncStackFrame frame(this, true);
     NeuteredWindowRegion neuteredRgn(mFlags & REQUIRE_DEFERRED_MESSAGE_PROTECTION);
 #endif
 
-    { // Scope for lock
-        MonitorAutoLock lock(*mMonitor);
-        AutoEnterWaitForIncoming waitingForIncoming(*this);
-        if (mChannelState != ChannelConnected) {
-            return false;
-        }
-        if (!HasPendingEvents()) {
-            return WaitForInterruptNotify();
-        }
+    MonitorAutoLock lock(*mMonitor);
+    AutoEnterWaitForIncoming waitingForIncoming(*this);
+    if (mChannelState != ChannelConnected) {
+        return false;
     }
-
-    return OnMaybeDequeueOne();
+    if (!HasPendingEvents()) {
+        return WaitForInterruptNotify();
+    }
+
+    MOZ_RELEASE_ASSERT(!mPending.isEmpty());
+    RefPtr<MessageTask> task = mPending.getFirst();
+    RunMessage(*task);
+    return true;
 }
 
 bool
 MessageChannel::HasPendingEvents()
 {
     AssertWorkerThread();
     mMonitor->AssertCurrentThreadOwns();
-    return Connected() && !mPending.empty();
+    return Connected() && !mPending.isEmpty();
 }
 
 bool
 MessageChannel::InterruptEventOccurred()
 {
     AssertWorkerThread();
     mMonitor->AssertCurrentThreadOwns();
     IPC_ASSERT(InterruptStackDepth() > 0, "not in wait loop");
 
     return (!Connected() ||
-            !mPending.empty() ||
+            !mPending.isEmpty() ||
             (!mOutOfTurnReplies.empty() &&
              mOutOfTurnReplies.find(mInterruptStack.top().seqno()) !=
              mOutOfTurnReplies.end()));
 }
 
 bool
 MessageChannel::ProcessPendingRequest(Message &&aUrgent)
 {
@@ -1498,95 +1488,163 @@ MessageChannel::ProcessPendingRequest(Me
         ReportConnectionError("MessageChannel::ProcessPendingRequest");
         return false;
     }
 
     return true;
 }
 
 bool
-MessageChannel::DequeueOne(Message *recvd)
+MessageChannel::ShouldRunMessage(const Message& aMsg)
 {
-    AssertWorkerThread();
-    mMonitor->AssertCurrentThreadOwns();
-
-    if (!Connected()) {
-        ReportConnectionError("OnMaybeDequeueOne");
-        return false;
+    if (!mTimedOutMessageSeqno) {
+        return true;
     }
 
-    if (!mDeferred.empty())
-        MaybeUndeferIncall();
-
     // If we've timed out a message and we're awaiting the reply to the timed
     // out message, we have to be careful what messages we process. Here's what
     // can go wrong:
     // 1. child sends a NOT_NESTED sync message S
     // 2. parent sends a NESTED_INSIDE_SYNC sync message H at the same time
     // 3. parent times out H
     // 4. child starts processing H and sends a NESTED_INSIDE_SYNC message H' nested
     //    within the same transaction
     // 5. parent dispatches S and sends reply
     // 6. child asserts because it instead expected a reply to H'.
     //
     // To solve this, we refuse to process S in the parent until we get a reply
     // to H. More generally, let the timed out message be M. We don't process a
     // message unless the child would need the response to that message in order
     // to process M. Those messages are the ones that have a higher nested level
     // than M or that are part of the same transaction as M.
-    if (mTimedOutMessageSeqno) {
-        for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); it++) {
-            Message &msg = *it;
-            if (msg.nested_level() > mTimedOutMessageNestedLevel ||
-                (msg.nested_level() == mTimedOutMessageNestedLevel
-                 && msg.transaction_id() == mTimedOutMessageSeqno))
-            {
-                *recvd = Move(msg);
-                mPending.erase(it);
-                return true;
-            }
-        }
+    if (aMsg.nested_level() < mTimedOutMessageNestedLevel ||
+        (aMsg.nested_level() == mTimedOutMessageNestedLevel
+         && aMsg.transaction_id() != mTimedOutMessageSeqno))
+    {
         return false;
     }
 
-    if (mPending.empty())
-        return false;
-
-    *recvd = Move(mPending.front());
-    mPending.pop_front();
-    return true;
-}
-
-bool
-MessageChannel::OnMaybeDequeueOne()
-{
-    AssertWorkerThread();
-    mMonitor->AssertNotCurrentThreadOwns();
-
-    Message recvd;
-
-    MonitorAutoLock lock(*mMonitor);
-    if (!DequeueOne(&recvd))
-        return false;
-
-    if (IsOnCxxStack() && recvd.is_interrupt() && recvd.is_reply()) {
-        // We probably just received a reply in a nested loop for an
-        // Interrupt call sent before entering that loop.
-        mOutOfTurnReplies[recvd.seqno()] = Move(recvd);
-        return false;
-    }
-
-    DispatchMessage(Move(recvd));
-
     return true;
 }
 
 void
+MessageChannel::RunMessage(MessageTask& aTask)
+{
+    AssertWorkerThread();
+    mMonitor->AssertCurrentThreadOwns();
+
+    Message& msg = aTask.Msg();
+
+    if (!Connected()) {
+        ReportConnectionError("RunMessage");
+        return;
+    }
+
+    // Check that we're going to run the first message that's valid to run.
+#ifdef DEBUG
+    for (RefPtr<MessageTask> task : mPending) {
+        if (task == &aTask) {
+            break;
+        }
+        MOZ_ASSERT(!ShouldRunMessage(task->Msg()));
+    }
+#endif
+
+    if (!mDeferred.empty()) {
+        MaybeUndeferIncall();
+    }
+
+    if (!ShouldRunMessage(msg)) {
+        return;
+    }
+
+    MOZ_RELEASE_ASSERT(aTask.isInList());
+    aTask.remove();
+
+    if (IsOnCxxStack() && msg.is_interrupt() && msg.is_reply()) {
+        // We probably just received a reply in a nested loop for an
+        // Interrupt call sent before entering that loop.
+        mOutOfTurnReplies[msg.seqno()] = Move(msg);
+        return;
+    }
+
+    DispatchMessage(Move(msg));
+}
+
+nsresult
+MessageChannel::MessageTask::Run()
+{
+    if (!mChannel) {
+        return NS_OK;
+    }
+
+    mChannel->AssertWorkerThread();
+    mChannel->mMonitor->AssertNotCurrentThreadOwns();
+
+    MonitorAutoLock lock(*mChannel->mMonitor);
+
+    // In case we choose not to run this message, we may need to be able to Post
+    // it again.
+    mScheduled = false;
+
+    if (!isInList()) {
+        return NS_OK;
+    }
+
+    mChannel->RunMessage(*this);
+    return NS_OK;
+}
+
+// Warning: This method removes the receiver from whatever list it might be in.
+nsresult
+MessageChannel::MessageTask::Cancel()
+{
+    if (!mChannel) {
+        return NS_OK;
+    }
+
+    mChannel->AssertWorkerThread();
+    mChannel->mMonitor->AssertNotCurrentThreadOwns();
+
+    MonitorAutoLock lock(*mChannel->mMonitor);
+
+    if (!isInList()) {
+        return NS_OK;
+    }
+    remove();
+
+    return NS_OK;
+}
+
+void
+MessageChannel::MessageTask::Post()
+{
+    MOZ_RELEASE_ASSERT(!mScheduled);
+    MOZ_RELEASE_ASSERT(isInList());
+
+    mScheduled = true;
+
+    RefPtr<MessageTask> self = this;
+    mChannel->mWorkerLoop->PostTask(self.forget());
+}
+
+void
+MessageChannel::MessageTask::Clear()
+{
+    mChannel->AssertWorkerThread();
+
+    mChannel = nullptr;
+}
+
+void
 MessageChannel::DispatchMessage(Message &&aMsg)
 {
+    AssertWorkerThread();
+    mMonitor->AssertCurrentThreadOwns();
+
     Maybe<AutoNoJSAPI> nojsapi;
     if (ScriptSettingsInitialized() && NS_IsMainThread())
         nojsapi.emplace();
 
     nsAutoPtr<Message> reply;
 
     IPC_LOG("DispatchMessage: seqno=%d, xid=%d", aMsg.seqno(), aMsg.transaction_id());
 
@@ -1775,17 +1833,19 @@ MessageChannel::MaybeUndeferIncall()
     Message call(Move(mDeferred.top()));
     mDeferred.pop();
 
     // fix up fudge factor we added to account for race
     IPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error");
     --mRemoteStackDepthGuess;
 
     MOZ_RELEASE_ASSERT(call.nested_level() == IPC::Message::NOT_NESTED);
-    mPending.push_back(Move(call));
+    RefPtr<MessageTask> task = new MessageTask(this, Move(call));
+    mPending.insertBack(task);
+    task->Post();
 }
 
 void
 MessageChannel::ExitedCxxStack()
 {
     mListener->OnExitedCxxStack();
     if (mSawInterruptOutMsg) {
         MonitorAutoLock lock(*mMonitor);
@@ -1798,28 +1858,20 @@ MessageChannel::ExitedCxxStack()
 void
 MessageChannel::EnqueuePendingMessages()
 {
     AssertWorkerThread();
     mMonitor->AssertCurrentThreadOwns();
 
     MaybeUndeferIncall();
 
-    for (size_t i = 0; i < mDeferred.size(); ++i) {
-        RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
-        mWorkerLoop->PostTask(task.forget());
-    }
-
     // XXX performance tuning knob: could process all or k pending
     // messages here, rather than enqueuing for later processing
 
-    for (size_t i = 0; i < mPending.size(); ++i) {
-        RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
-        mWorkerLoop->PostTask(task.forget());
-    }
+    RepostAllMessages();
 }
 
 static inline bool
 IsTimeoutExpired(PRIntervalTime aStart, PRIntervalTime aTimeout)
 {
     return (aTimeout != PR_INTERVAL_NO_TIMEOUT) &&
            (aTimeout <= (PR_IntervalNow() - aStart));
 }
@@ -2264,26 +2316,24 @@ MessageChannel::DebugAbort(const char* f
     // technically we need the mutex for this, but we're dying anyway
     DumpInterruptStack("  ");
     printf_stderr("  remote Interrupt stack guess: %" PRIuSIZE "\n",
                   mRemoteStackDepthGuess);
     printf_stderr("  deferred stack size: %" PRIuSIZE "\n",
                   mDeferred.size());
     printf_stderr("  out-of-turn Interrupt replies stack size: %" PRIuSIZE "\n",
                   mOutOfTurnReplies.size());
-    printf_stderr("  Pending queue size: %" PRIuSIZE ", front to back:\n",
-                  mPending.size());
 
     MessageQueue pending = Move(mPending);
-    while (!pending.empty()) {
+    while (!pending.isEmpty()) {
         printf_stderr("    [ %s%s ]\n",
-                      pending.front().is_interrupt() ? "intr" :
-                      (pending.front().is_sync() ? "sync" : "async"),
-                      pending.front().is_reply() ? "reply" : "");
-        pending.pop_front();
+                      pending.getFirst()->Msg().is_interrupt() ? "intr" :
+                      (pending.getFirst()->Msg().is_sync() ? "sync" : "async"),
+                      pending.getFirst()->Msg().is_reply() ? "reply" : "");
+        pending.popFirst();
     }
 
     NS_RUNTIMEABORT(why);
 }
 
 void
 MessageChannel::DumpInterruptStack(const char* const pfx) const
 {
@@ -2321,23 +2371,43 @@ void
 MessageChannel::EndTimeout()
 {
     mMonitor->AssertCurrentThreadOwns();
 
     IPC_LOG("Ending timeout of seqno=%d", mTimedOutMessageSeqno);
     mTimedOutMessageSeqno = 0;
     mTimedOutMessageNestedLevel = 0;
 
-    for (size_t i = 0; i < mPending.size(); i++) {
-        // There may be messages in the queue that we expected to process from
-        // OnMaybeDequeueOne. But during the timeout, that function will skip
-        // some messages. Now they're ready to be processed, so we enqueue more
-        // tasks.
-        RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
-        mWorkerLoop->PostTask(task.forget());
+    RepostAllMessages();
+}
+
+void
+MessageChannel::RepostAllMessages()
+{
+    bool needRepost = false;
+    for (RefPtr<MessageTask> task : mPending) {
+        if (!task->IsScheduled()) {
+            needRepost = true;
+        }
+    }
+    if (!needRepost) {
+        // If everything is already scheduled to run, do nothing.
+        return;
+    }
+
+    // In some cases we may have deferred dispatch of some messages in the
+    // queue. Now we want to run them again. However, we can't just re-post
+    // those messages since the messages after them in mPending would then be
+    // before them in the event queue. So instead we cancel everything and
+    // re-post all messages in the correct order.
+    MessageQueue queue = Move(mPending);
+    while (RefPtr<MessageTask> task = queue.popFirst()) {
+        RefPtr<MessageTask> newTask = new MessageTask(this, Move(task->Msg()));
+        mPending.insertBack(newTask);
+        newTask->Post();
     }
 }
 
 void
 MessageChannel::CancelTransaction(int transaction)
 {
     mMonitor->AssertCurrentThreadOwns();
 
@@ -2370,33 +2440,33 @@ MessageChannel::CancelTransaction(int tr
             mTransactionStack->Cancel();
         }
     } else {
         MOZ_RELEASE_ASSERT(mTransactionStack->TransactionID() == transaction);
         mTransactionStack->Cancel();
     }
 
     bool foundSync = false;
-    for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); ) {
-        Message &msg = *it;
+    for (RefPtr<MessageTask> p = mPending.getFirst(); p; ) {
+        Message &msg = p->Msg();
 
         // If there was a race between the parent and the child, then we may
         // have a queued sync message. We want to drop this message from the
         // queue since if will get cancelled along with the transaction being
         // cancelled. This happens if the message in the queue is NESTED_INSIDE_SYNC.
         if (msg.is_sync() && msg.nested_level() != IPC::Message::NOT_NESTED) {
             MOZ_RELEASE_ASSERT(!foundSync);
             MOZ_RELEASE_ASSERT(msg.transaction_id() != transaction);
             IPC_LOG("Removing msg from queue seqno=%d xid=%d", msg.seqno(), msg.transaction_id());
             foundSync = true;
-            it = mPending.erase(it);
+            p = p->removeAndGetNext();
             continue;
         }
 
-        it++;
+        p = p->getNext();
     }
 }
 
 bool
 MessageChannel::IsInTransaction() const
 {
     MonitorAutoLock lock(*mMonitor);
     return !!mTransactionStack;
--- a/ipc/glue/MessageChannel.h
+++ b/ipc/glue/MessageChannel.h
@@ -269,20 +269,16 @@ class MessageChannel : HasResultCodes
     bool HasPendingEvents();
 
     void ProcessPendingRequests(AutoEnterTransaction& aTransaction);
     bool ProcessPendingRequest(Message &&aUrgent);
 
     void MaybeUndeferIncall();
     void EnqueuePendingMessages();
 
-    // Executed on the worker thread. Dequeues one pending message.
-    bool OnMaybeDequeueOne();
-    bool DequeueOne(Message *recvd);
-
     // Dispatches an incoming message to its appropriate handler.
     void DispatchMessage(Message &&aMsg);
 
     // DispatchMessage will route to one of these functions depending on the
     // protocol type of the message.
     void DispatchSyncMessage(const Message &aMsg, Message*& aReply);
     void DispatchUrgentMessage(const Message &aMsg);
     void DispatchAsyncMessage(const Message &aMsg);
@@ -304,16 +300,18 @@ class MessageChannel : HasResultCodes
 
     bool WaitResponse(bool aWaitTimedOut);
 
     bool ShouldContinueFromTimeout();
 
     void EndTimeout();
     void CancelTransaction(int transaction);
 
+    void RepostAllMessages();
+
     // The "remote view of stack depth" can be different than the
     // actual stack depth when there are out-of-turn replies.  When we
     // receive one, our actual Interrupt stack depth doesn't decrease, but
     // the other side (that sent the reply) thinks it has.  So, the
     // "view" returned here is |stackDepth| minus the number of
     // out-of-turn replies.
     //
     // Only called from the worker thread.
@@ -449,147 +447,66 @@ class MessageChannel : HasResultCodes
     // NOT our worker thread.
     void AssertLinkThread() const
     {
         MOZ_RELEASE_ASSERT(mWorkerLoopID != MessageLoop::current()->id(),
                            "on worker thread but should not be!");
     }
 
   private:
-#if defined(MOZ_CRASHREPORTER) && defined(OS_WIN)
-    // TODO: Remove the condition OS_WIN above once we move to GCC 5 or higher,
-    // the code will be able to get compiled as std::deque will meet C++11
-    // allocator requirements.
-    template<class T>
-    struct AnnotateAllocator
+    class MessageTask :
+        public CancelableRunnable,
+        public LinkedListElement<RefPtr<MessageTask>>
     {
-      typedef T value_type;
-      AnnotateAllocator(MessageChannel& channel) : mChannel(channel) {}
-      template<class U> AnnotateAllocator(const AnnotateAllocator<U>& other) :
-        mChannel(other.mChannel) {}
-      template<class U> bool operator==(const AnnotateAllocator<U>&) { return true; }
-      template<class U> bool operator!=(const AnnotateAllocator<U>&) { return false; }
-      T* allocate(size_t n) {
-        void* p = ::operator new(n * sizeof(T), std::nothrow);
-        if (!p && n) {
-          // Sort the pending messages by its type, note the sorting algorithm
-          // has to be in-place to avoid memory allocation.
-          MessageQueue& q = mChannel.mPending;
-          std::sort(q.begin(), q.end(), [](const Message& a, const Message& b) {
-            return a.type() < b.type();
-          });
+    public:
+        explicit MessageTask(MessageChannel* aChannel, Message&& aMessage)
+          : mChannel(aChannel), mMessage(Move(aMessage)), mScheduled(false)
+        {}
+
+        NS_IMETHOD Run() override;
+        nsresult Cancel() override;
+        void Post();
+        void Clear();
+
+        bool IsScheduled() const { return mScheduled; }
 
-          // Iterate over the sorted queue to find the message that has the
-          // highest number of count.
-          const char* topName = nullptr;
-          const char* curName = nullptr;
-          msgid_t topType = 0, curType = 0;
-          uint32_t topCount = 0, curCount = 0;
-          for (MessageQueue::iterator it = q.begin(); it != q.end(); ++it) {
-            Message &msg = *it;
-            if (msg.type() == curType) {
-              ++curCount;
-            } else {
-              if (curCount > topCount) {
-                topName = curName;
-                topType = curType;
-                topCount = curCount;
-              }
-              curName = StringFromIPCMessageType(msg.type());
-              curType = msg.type();
-              curCount = 1;
-            }
-          }
-          // In case the last type is the top one.
-          if (curCount > topCount) {
-            topName = curName;
-            topType = curType;
-            topCount = curCount;
-          }
+        Message& Msg() { return mMessage; }
+        const Message& Msg() const { return mMessage; }
+
+    private:
+        MessageTask() = delete;
+        MessageTask(const MessageTask&) = delete;
 
-          CrashReporter::AnnotatePendingIPC(q.size(), topCount, topName, topType);
+        MessageChannel* mChannel;
+        Message mMessage;
+        bool mScheduled : 1;
+    };
 
-          mozalloc_handle_oom(n * sizeof(T));
-        }
-        return static_cast<T*>(p);
-      }
-      void deallocate(T* p, size_t n) {
-        ::operator delete(p);
-      }
-      MessageChannel& mChannel;
-    };
-    typedef std::deque<Message, AnnotateAllocator<Message>> MessageQueue;
-#else
-    typedef std::deque<Message> MessageQueue;
-#endif
+    bool ShouldRunMessage(const Message& aMsg);
+    void RunMessage(MessageTask& aTask);
+
+    typedef LinkedList<RefPtr<MessageTask>> MessageQueue;
     typedef std::map<size_t, Message> MessageMap;
     typedef IPC::Message::msgid_t msgid_t;
 
-    // XXXkhuey this can almost certainly die.
-    // All dequeuing tasks require a single point of cancellation,
-    // which is handled via a reference-counted task.
-    class RefCountedTask
-    {
-      public:
-        explicit RefCountedTask(already_AddRefed<CancelableRunnable> aTask)
-          : mTask(aTask)
-        { }
-      private:
-        ~RefCountedTask() { }
-      public:
-        void Run() { mTask->Run(); }
-        void Cancel() { mTask->Cancel(); }
-
-        NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedTask)
-
-      private:
-        RefPtr<CancelableRunnable> mTask;
-    };
-
-    // Wrap an existing task which can be cancelled at any time
-    // without the wrapper's knowledge.
-    class DequeueTask : public CancelableRunnable
-    {
-      public:
-        explicit DequeueTask(RefCountedTask* aTask)
-          : mTask(aTask)
-        { }
-        NS_IMETHOD Run() override {
-          if (mTask) {
-            mTask->Run();
-          }
-          return NS_OK;
-        }
-        nsresult Cancel() override {
-          mTask = nullptr;
-          return NS_OK;
-        }
-
-      private:
-        RefPtr<RefCountedTask> mTask;
-    };
-
   private:
     // Based on presumption the listener owns and overlives the channel,
     // this is never nullified.
     MessageListener* mListener;
     ChannelState mChannelState;
     RefPtr<RefCountedMonitor> mMonitor;
     Side mSide;
     MessageLink* mLink;
     MessageLoop* mWorkerLoop;           // thread where work is done
     RefPtr<CancelableRunnable> mChannelErrorTask;  // NotifyMaybeChannelError runnable
 
     // id() of mWorkerLoop.  This persists even after mWorkerLoop is cleared
     // during channel shutdown.
     int mWorkerLoopID;
 
-    // A task encapsulating dequeuing one pending message.
-    RefPtr<RefCountedTask> mDequeueOneTask;
-
     // Timeout periods are broken up in two to prevent system suspension from
     // triggering an abort. This method (called by WaitForEvent with a 'did
     // timeout' flag) decides if we should wait again for half of mTimeoutMs
     // or give up.
     int32_t mTimeoutMs;
     bool mInTimeoutSecondHalf;
 
     // Worker-thread only; sequence numbers for messages that require
@@ -666,46 +583,44 @@ class MessageChannel : HasResultCodes
     // error.
     //
     // A message is only timed out if it initiated a transaction. This avoids
     // hitting a lot of corner cases with message nesting that we don't really
     // care about.
     int32_t mTimedOutMessageSeqno;
     int mTimedOutMessageNestedLevel;
 
-    // Queue of all incoming messages, except for replies to sync and urgent
-    // messages, which are delivered directly to mRecvd, and any pending urgent
-    // incall, which is stored in mPendingUrgentRequest.
+    // Queue of all incoming messages.
     //
     // If both this side and the other side are functioning correctly, the queue
     // can only be in certain configurations.  Let
     //
     //   |A<| be an async in-message,
     //   |S<| be a sync in-message,
     //   |C<| be an Interrupt in-call,
     //   |R<| be an Interrupt reply.
     //
     // The queue can only match this configuration
     //
-    //  A<* (S< | C< | R< (?{mStack.size() == 1} A<* (S< | C<)))
+    //  A<* (S< | C< | R< (?{mInterruptStack.size() == 1} A<* (S< | C<)))
     //
     // The other side can send as many async messages |A<*| as it wants before
     // sending us a blocking message.
     //
     // The first case is |S<|, a sync in-msg.  The other side must be blocked,
     // and thus can't send us any more messages until we process the sync
     // in-msg.
     //
     // The second case is |C<|, an Interrupt in-call; the other side must be blocked.
     // (There's a subtlety here: this in-call might have raced with an
     // out-call, but we detect that with the mechanism below,
     // |mRemoteStackDepth|, and races don't matter to the queue.)
     //
     // Final case, the other side replied to our most recent out-call |R<|.
-    // If that was the *only* out-call on our stack, |?{mStack.size() == 1}|,
+    // If that was the *only* out-call on our stack, |?{mInterruptStack.size() == 1}|,
     // then other side "finished with us," and went back to its own business.
     // That business might have included sending any number of async message
     // |A<*| until sending a blocking message |(S< | C<)|.  If we had more than
     // one Interrupt call on our stack, the other side *better* not have sent us
     // another blocking message, because it's blocked on a reply from us.
     //
     MessageQueue mPending;
 
@@ -720,17 +635,17 @@ class MessageChannel : HasResultCodes
     // calls.  With each Interrupt out-call sent, we send along what *we* think the
     // stack depth of the remote side is *before* it will receive the Interrupt call.
     //
     // After sending the out-call, our stack depth is "incremented" by pushing
     // that pending message onto mPending.
     //
     // Then when processing an in-call |c|, it must be true that
     //
-    //   mStack.size() == c.remoteDepth
+    //   mInterruptStack.size() == c.remoteDepth
     //
     // I.e., my depth is actually the same as what the other side thought it
     // was when it sent in-call |c|.  If this fails to hold, we have detected
     // racy Interrupt calls.
     //
     // We then increment mRemoteStackDepth *just before* processing the
     // in-call, since we know the other side is waiting on it, and decrement
     // it *just after* finishing processing that in-call, since our response