Backed out changeset b119fb8bc703 (bug 1310547)
authorSebastian Hengst <archaeopteryx@coole-files.de>
Sat, 29 Oct 2016 13:45:10 +0200
changeset 320034 c5a67edeb6239cc576be71fc8ec4955e544a4ca4
parent 320033 e419e663860780657e7825bd7a0108e414c1a134
child 320035 be5f087a2c4e3fcfd64a56921429f2a5b76dc02f
push id30882
push userryanvm@gmail.com
push dateSat, 29 Oct 2016 13:12:06 +0000
treeherdermozilla-central@16cdd6273c48 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
bugs1310547
milestone52.0a1
backs outb119fb8bc703861588bfefcd70f6c42ef9d391f2
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Backed out changeset b119fb8bc703 (bug 1310547)
ipc/glue/MessageChannel.cpp
ipc/glue/MessageChannel.h
--- a/ipc/glue/MessageChannel.cpp
+++ b/ipc/glue/MessageChannel.cpp
@@ -484,39 +484,32 @@ MessageChannel::MessageChannel(MessageLi
     mInTimeoutSecondHalf(false),
     mNextSeqno(0),
     mLastSendError(SyncSendError::SendSuccess),
     mDispatchingAsyncMessage(false),
     mDispatchingAsyncMessageNestedLevel(0),
     mTransactionStack(nullptr),
     mTimedOutMessageSeqno(0),
     mTimedOutMessageNestedLevel(0),
-#if defined(MOZ_CRASHREPORTER) && defined(OS_WIN)
-    mPending(AnnotateAllocator<Message>(*this)),
-#endif
-    mRemoteStackDepthGuess(false),
+    mRemoteStackDepthGuess(0),
     mSawInterruptOutMsg(false),
     mIsWaitingForIncoming(false),
     mAbortOnError(false),
     mNotifiedChannelDone(false),
     mFlags(REQUIRE_DEFAULT),
     mPeerPidSet(false),
     mPeerPid(-1)
 {
     MOZ_COUNT_CTOR(ipc::MessageChannel);
 
 #ifdef OS_WIN
     mTopFrame = nullptr;
     mIsSyncWaitingOnNonMainThread = false;
 #endif
 
-    RefPtr<CancelableRunnable> runnable =
-        NewNonOwningCancelableRunnableMethod(this, &MessageChannel::OnMaybeDequeueOne);
-    mDequeueOneTask = new RefCountedTask(runnable.forget());
-
     mOnChannelConnectedTask =
         NewNonOwningCancelableRunnableMethod(this, &MessageChannel::DispatchOnChannelConnected);
 
 #ifdef OS_WIN
     mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
     MOZ_RELEASE_ASSERT(mEvent, "CreateEvent failed! Nothing is going to work!");
 #endif
 }
@@ -630,31 +623,33 @@ MessageChannel::Clear()
     //
     // In practice, mListener owns the channel, so the channel gets deleted
     // before mListener.  But just to be safe, mListener is a weak pointer.
 
     if (gParentProcessBlocker == this) {
         gParentProcessBlocker = nullptr;
     }
 
-    mDequeueOneTask->Cancel();
-
     mWorkerLoop = nullptr;
     delete mLink;
     mLink = nullptr;
 
     mOnChannelConnectedTask->Cancel();
 
     if (mChannelErrorTask) {
         mChannelErrorTask->Cancel();
         mChannelErrorTask = nullptr;
     }
 
     // Free up any memory used by pending messages.
+    for (RefPtr<MessageTask> task : mPending) {
+        task->Clear();
+    }
     mPending.clear();
+
     mOutOfTurnReplies.clear();
     while (!mDeferred.empty()) {
         mDeferred.pop();
     }
 }
 
 bool
 MessageChannel::Open(Transport* aTransport, MessageLoop* aIOLoop, Side aSide)
@@ -922,49 +917,47 @@ MessageChannel::OnMessageReceivedFromLin
     }
 
     // Nested messages cannot be compressed.
     MOZ_RELEASE_ASSERT(aMsg.compress_type() == IPC::Message::COMPRESSION_NONE ||
                        aMsg.nested_level() == IPC::Message::NOT_NESTED);
 
     bool compress = false;
     if (aMsg.compress_type() == IPC::Message::COMPRESSION_ENABLED) {
-        compress = (!mPending.empty() &&
-                    mPending.back().type() == aMsg.type() &&
-                    mPending.back().routing_id() == aMsg.routing_id());
+        compress = (!mPending.isEmpty() &&
+                    mPending.getLast()->Msg().type() == aMsg.type() &&
+                    mPending.getLast()->Msg().routing_id() == aMsg.routing_id());
         if (compress) {
             // This message type has compression enabled, and the back of the
             // queue was the same message type and routed to the same destination.
             // Replace it with the newer message.
-            MOZ_RELEASE_ASSERT(mPending.back().compress_type() ==
-                                  IPC::Message::COMPRESSION_ENABLED);
-            mPending.pop_back();
+            MOZ_RELEASE_ASSERT(mPending.getLast()->Msg().compress_type() ==
+                               IPC::Message::COMPRESSION_ENABLED);
+            mPending.popLast();
         }
-    } else if (aMsg.compress_type() == IPC::Message::COMPRESSION_ALL) {
-        // Check the message queue for another message with this type/destination.
-        auto it = std::find_if(mPending.rbegin(), mPending.rend(),
-                               MatchingKinds(aMsg.type(), aMsg.routing_id()));
-        if (it != mPending.rend()) {
-            // This message type has compression enabled, and the queue holds
-            // a message with the same message type and routed to the same destination.
-            // Erase it.  Note that, since we always compress these redundancies, There Can
-            // Be Only One.
-            compress = true;
-            MOZ_RELEASE_ASSERT((*it).compress_type() == IPC::Message::COMPRESSION_ALL);
-            mPending.erase((++it).base());
+    } else if (aMsg.compress_type() == IPC::Message::COMPRESSION_ALL && !mPending.isEmpty()) {
+        for (RefPtr<MessageTask> p = mPending.getLast(); p; p = p->getPrevious()) {
+            if (p->Msg().type() == aMsg.type() &&
+                p->Msg().routing_id() == aMsg.routing_id())
+            {
+                compress = true;
+                MOZ_RELEASE_ASSERT(p->Msg().compress_type() == IPC::Message::COMPRESSION_ALL);
+                p->remove();
+                break;
+            }
         }
     }
 
     bool wakeUpSyncSend = AwaitingSyncReply() && !ShouldDeferMessage(aMsg);
 
     bool shouldWakeUp = AwaitingInterruptReply() ||
                         wakeUpSyncSend ||
                         AwaitingIncomingMessage();
 
-    // Although we usually don't need to post an OnMaybeDequeueOne task if
+    // Although we usually don't need to post a message task if
     // shouldWakeUp is true, it's easier to post anyway than to have to
     // guarantee that every Send call processes everything it's supposed to
     // before returning.
     bool shouldPostTask = !shouldWakeUp || wakeUpSyncSend;
 
     IPC_LOG("Receive on link thread; seqno=%d, xid=%d, shouldWakeUp=%d",
             aMsg.seqno(), aMsg.transaction_id(), shouldWakeUp);
 
@@ -985,83 +978,83 @@ MessageChannel::OnMessageReceivedFromLin
     //
     // (3) We are not waiting on a reply.
     //   - We post a task to the main event loop.
     //
     // Note that, we may notify the main thread even though the monitor is not
     // blocked. This is okay, since we always check for pending events before
     // blocking again.
 
-    mPending.push_back(Move(aMsg));
+    RefPtr<MessageTask> task = new MessageTask(this, Move(aMsg));
+    mPending.insertBack(task);
 
     if (shouldWakeUp) {
         NotifyWorkerThread();
     }
 
     if (shouldPostTask) {
-        if (!compress) {
-            // If we compressed away the previous message, we'll re-use
-            // its pending task.
-            RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
-            mWorkerLoop->PostTask(task.forget());
-        }
+        task->Post();
     }
 }
 
 void
 MessageChannel::PeekMessages(mozilla::function<bool(const Message& aMsg)> aInvoke)
 {
+    // FIXME: We shouldn't be holding the lock for aInvoke!
     MonitorAutoLock lock(*mMonitor);
 
-    for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); it++) {
-        Message &msg = *it;
+    for (RefPtr<MessageTask> it : mPending) {
+        const Message &msg = it->Msg();
         if (!aInvoke(msg)) {
             break;
         }
     }
 }
 
 void
 MessageChannel::ProcessPendingRequests(AutoEnterTransaction& aTransaction)
 {
+    mMonitor->AssertCurrentThreadOwns();
+
     IPC_LOG("ProcessPendingRequests for seqno=%d, xid=%d",
             aTransaction.SequenceNumber(), aTransaction.TransactionID());
 
     // Loop until there aren't any more nested messages to process.
     for (;;) {
         // If we canceled during ProcessPendingRequest, then we need to leave
         // immediately because the results of ShouldDeferMessage will be
         // operating with weird state (as if no Send is in progress). That could
         // cause even NOT_NESTED sync messages to be processed (but not
         // NOT_NESTED async messages), which would break message ordering.
         if (aTransaction.IsCanceled()) {
             return;
         }
 
         mozilla::Vector<Message> toProcess;
 
-        for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); ) {
-            Message &msg = *it;
+        for (RefPtr<MessageTask> p = mPending.getFirst(); p; ) {
+            Message &msg = p->Msg();
 
             MOZ_RELEASE_ASSERT(!aTransaction.IsCanceled(),
                                "Calling ShouldDeferMessage when cancelled");
             bool defer = ShouldDeferMessage(msg);
 
             // Only log the interesting messages.
             if (msg.is_sync() || msg.nested_level() == IPC::Message::NESTED_INSIDE_CPOW) {
                 IPC_LOG("ShouldDeferMessage(seqno=%d) = %d", msg.seqno(), defer);
             }
 
             if (!defer) {
                 if (!toProcess.append(Move(msg)))
                     MOZ_CRASH();
-                it = mPending.erase(it);
+
+                p = p->removeAndGetNext();
                 continue;
             }
-            it++;
+            p = p->getNext();
         }
 
         if (toProcess.empty()) {
             break;
         }
 
         // Processing these messages could result in more messages, so we
         // loop around to check for more afterwards.
@@ -1353,19 +1346,19 @@ MessageChannel::Call(Message* aMsg, Mess
         Message recvd;
         MessageMap::iterator it;
 
         if ((it = mOutOfTurnReplies.find(mInterruptStack.top().seqno()))
             != mOutOfTurnReplies.end())
         {
             recvd = Move(it->second);
             mOutOfTurnReplies.erase(it);
-        } else if (!mPending.empty()) {
-            recvd = Move(mPending.front());
-            mPending.pop_front();
+        } else if (!mPending.isEmpty()) {
+            RefPtr<MessageTask> task = mPending.popFirst();
+            recvd = Move(task->Msg());
         } else {
             // because of subtleties with nested event loops, it's possible
             // that we got here and nothing happened.  or, we might have a
             // deferred in-call that needs to be processed.  either way, we
             // won't break the inner while loop again until something new
             // happens.
             continue;
         }
@@ -1444,47 +1437,48 @@ MessageChannel::Call(Message* aMsg, Mess
 bool
 MessageChannel::WaitForIncomingMessage()
 {
 #ifdef OS_WIN
     SyncStackFrame frame(this, true);
     NeuteredWindowRegion neuteredRgn(mFlags & REQUIRE_DEFERRED_MESSAGE_PROTECTION);
 #endif
 
-    { // Scope for lock
-        MonitorAutoLock lock(*mMonitor);
-        AutoEnterWaitForIncoming waitingForIncoming(*this);
-        if (mChannelState != ChannelConnected) {
-            return false;
-        }
-        if (!HasPendingEvents()) {
-            return WaitForInterruptNotify();
-        }
+    MonitorAutoLock lock(*mMonitor);
+    AutoEnterWaitForIncoming waitingForIncoming(*this);
+    if (mChannelState != ChannelConnected) {
+        return false;
     }
-
-    return OnMaybeDequeueOne();
+    if (!HasPendingEvents()) {
+        return WaitForInterruptNotify();
+    }
+
+    MOZ_RELEASE_ASSERT(!mPending.isEmpty());
+    RefPtr<MessageTask> task = mPending.getFirst();
+    RunMessage(*task);
+    return true;
 }
 
 bool
 MessageChannel::HasPendingEvents()
 {
     AssertWorkerThread();
     mMonitor->AssertCurrentThreadOwns();
-    return Connected() && !mPending.empty();
+    return Connected() && !mPending.isEmpty();
 }
 
 bool
 MessageChannel::InterruptEventOccurred()
 {
     AssertWorkerThread();
     mMonitor->AssertCurrentThreadOwns();
     IPC_ASSERT(InterruptStackDepth() > 0, "not in wait loop");
 
     return (!Connected() ||
-            !mPending.empty() ||
+            !mPending.isEmpty() ||
             (!mOutOfTurnReplies.empty() &&
              mOutOfTurnReplies.find(mInterruptStack.top().seqno()) !=
              mOutOfTurnReplies.end()));
 }
 
 bool
 MessageChannel::ProcessPendingRequest(Message &&aUrgent)
 {
@@ -1498,95 +1492,161 @@ MessageChannel::ProcessPendingRequest(Me
         ReportConnectionError("MessageChannel::ProcessPendingRequest");
         return false;
     }
 
     return true;
 }
 
 bool
-MessageChannel::DequeueOne(Message *recvd)
+MessageChannel::ShouldRunMessage(const Message& aMsg)
 {
-    AssertWorkerThread();
-    mMonitor->AssertCurrentThreadOwns();
-
-    if (!Connected()) {
-        ReportConnectionError("OnMaybeDequeueOne");
-        return false;
+    if (!mTimedOutMessageSeqno) {
+        return true;
     }
 
-    if (!mDeferred.empty())
-        MaybeUndeferIncall();
-
     // If we've timed out a message and we're awaiting the reply to the timed
     // out message, we have to be careful what messages we process. Here's what
     // can go wrong:
     // 1. child sends a NOT_NESTED sync message S
     // 2. parent sends a NESTED_INSIDE_SYNC sync message H at the same time
     // 3. parent times out H
     // 4. child starts processing H and sends a NESTED_INSIDE_SYNC message H' nested
     //    within the same transaction
     // 5. parent dispatches S and sends reply
     // 6. child asserts because it instead expected a reply to H'.
     //
     // To solve this, we refuse to process S in the parent until we get a reply
     // to H. More generally, let the timed out message be M. We don't process a
     // message unless the child would need the response to that message in order
     // to process M. Those messages are the ones that have a higher nested level
     // than M or that are part of the same transaction as M.
-    if (mTimedOutMessageSeqno) {
-        for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); it++) {
-            Message &msg = *it;
-            if (msg.nested_level() > mTimedOutMessageNestedLevel ||
-                (msg.nested_level() == mTimedOutMessageNestedLevel
-                 && msg.transaction_id() == mTimedOutMessageSeqno))
-            {
-                *recvd = Move(msg);
-                mPending.erase(it);
-                return true;
-            }
-        }
+    if (aMsg.nested_level() < mTimedOutMessageNestedLevel ||
+        (aMsg.nested_level() == mTimedOutMessageNestedLevel
+         && aMsg.transaction_id() != mTimedOutMessageSeqno))
+    {
         return false;
     }
 
-    if (mPending.empty())
-        return false;
-
-    *recvd = Move(mPending.front());
-    mPending.pop_front();
-    return true;
-}
-
-bool
-MessageChannel::OnMaybeDequeueOne()
-{
-    AssertWorkerThread();
-    mMonitor->AssertNotCurrentThreadOwns();
-
-    Message recvd;
-
-    MonitorAutoLock lock(*mMonitor);
-    if (!DequeueOne(&recvd))
-        return false;
-
-    if (IsOnCxxStack() && recvd.is_interrupt() && recvd.is_reply()) {
-        // We probably just received a reply in a nested loop for an
-        // Interrupt call sent before entering that loop.
-        mOutOfTurnReplies[recvd.seqno()] = Move(recvd);
-        return false;
-    }
-
-    DispatchMessage(Move(recvd));
-
     return true;
 }
 
 void
+MessageChannel::RunMessage(MessageTask& aTask)
+{
+    AssertWorkerThread();
+    mMonitor->AssertCurrentThreadOwns();
+
+    Message& msg = aTask.Msg();
+
+    if (!Connected()) {
+        ReportConnectionError("OnMaybeDequeueOne");
+        return;
+    }
+
+    // Check that we're going to run the first message that's valid to run.
+#ifdef DEBUG
+    for (RefPtr<MessageTask> task : mPending) {
+        if (task == &aTask) {
+            break;
+        }
+        MOZ_ASSERT(!ShouldRunMessage(task->Msg()));
+    }
+#endif
+
+    if (!mDeferred.empty()) {
+        MaybeUndeferIncall();
+    }
+
+    if (!ShouldRunMessage(msg)) {
+        return;
+    }
+
+    MOZ_RELEASE_ASSERT(aTask.isInList());
+    aTask.remove();
+
+    if (IsOnCxxStack() && msg.is_interrupt() && msg.is_reply()) {
+        // We probably just received a reply in a nested loop for an
+        // Interrupt call sent before entering that loop.
+        mOutOfTurnReplies[msg.seqno()] = Move(msg);
+        return;
+    }
+
+    DispatchMessage(Move(msg));
+}
+
+nsresult
+MessageChannel::MessageTask::Run()
+{
+    if (!mChannel) {
+        return NS_OK;
+    }
+
+    mChannel->AssertWorkerThread();
+    mChannel->mMonitor->AssertNotCurrentThreadOwns();
+
+    MonitorAutoLock lock(*mChannel->mMonitor);
+
+    // In case we choose not to run this message, we may need to be able to Post
+    // it again.
+    mScheduled = false;
+
+    if (!isInList()) {
+        return NS_OK;
+    }
+
+    mChannel->RunMessage(*this);
+    return NS_OK;
+}
+
+nsresult
+MessageChannel::MessageTask::Cancel()
+{
+    if (!mChannel) {
+        return NS_OK;
+    }
+
+    mChannel->AssertWorkerThread();
+    mChannel->mMonitor->AssertNotCurrentThreadOwns();
+
+    MonitorAutoLock lock(*mChannel->mMonitor);
+
+    if (!isInList()) {
+        return NS_OK;
+    }
+    remove();
+
+    return NS_OK;
+}
+
+void
+MessageChannel::MessageTask::Post()
+{
+    MOZ_RELEASE_ASSERT(!mScheduled);
+    MOZ_RELEASE_ASSERT(isInList());
+
+    RefPtr<MessageTask> self = this;
+    mChannel->mWorkerLoop->PostTask(self.forget());
+    mScheduled = true;
+}
+
+void
+MessageChannel::MessageTask::Clear()
+{
+    mChannel->AssertWorkerThread();
+
+    mChannel = nullptr;
+}
+
+void
 MessageChannel::DispatchMessage(Message &&aMsg)
 {
+    AssertWorkerThread();
+    mMonitor->AssertCurrentThreadOwns();
+
     Maybe<AutoNoJSAPI> nojsapi;
     if (ScriptSettingsInitialized() && NS_IsMainThread())
         nojsapi.emplace();
 
     nsAutoPtr<Message> reply;
 
     IPC_LOG("DispatchMessage: seqno=%d, xid=%d", aMsg.seqno(), aMsg.transaction_id());
 
@@ -1775,17 +1835,19 @@ MessageChannel::MaybeUndeferIncall()
     Message call(Move(mDeferred.top()));
     mDeferred.pop();
 
     // fix up fudge factor we added to account for race
     IPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error");
     --mRemoteStackDepthGuess;
 
     MOZ_RELEASE_ASSERT(call.nested_level() == IPC::Message::NOT_NESTED);
-    mPending.push_back(Move(call));
+    RefPtr<MessageTask> task = new MessageTask(this, Move(call));
+    mPending.insertBack(task);
+    task->Post();
 }
 
 void
 MessageChannel::ExitedCxxStack()
 {
     mListener->OnExitedCxxStack();
     if (mSawInterruptOutMsg) {
         MonitorAutoLock lock(*mMonitor);
@@ -1798,28 +1860,20 @@ MessageChannel::ExitedCxxStack()
 void
 MessageChannel::EnqueuePendingMessages()
 {
     AssertWorkerThread();
     mMonitor->AssertCurrentThreadOwns();
 
     MaybeUndeferIncall();
 
-    for (size_t i = 0; i < mDeferred.size(); ++i) {
-        RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
-        mWorkerLoop->PostTask(task.forget());
-    }
-
     // XXX performance tuning knob: could process all or k pending
     // messages here, rather than enqueuing for later processing
 
-    for (size_t i = 0; i < mPending.size(); ++i) {
-        RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
-        mWorkerLoop->PostTask(task.forget());
-    }
+    RepostAllMessages();
 }
 
 static inline bool
 IsTimeoutExpired(PRIntervalTime aStart, PRIntervalTime aTimeout)
 {
     return (aTimeout != PR_INTERVAL_NO_TIMEOUT) &&
            (aTimeout <= (PR_IntervalNow() - aStart));
 }
@@ -2264,26 +2318,24 @@ MessageChannel::DebugAbort(const char* f
     // technically we need the mutex for this, but we're dying anyway
     DumpInterruptStack("  ");
     printf_stderr("  remote Interrupt stack guess: %" PRIuSIZE "\n",
                   mRemoteStackDepthGuess);
     printf_stderr("  deferred stack size: %" PRIuSIZE "\n",
                   mDeferred.size());
     printf_stderr("  out-of-turn Interrupt replies stack size: %" PRIuSIZE "\n",
                   mOutOfTurnReplies.size());
-    printf_stderr("  Pending queue size: %" PRIuSIZE ", front to back:\n",
-                  mPending.size());
 
     MessageQueue pending = Move(mPending);
-    while (!pending.empty()) {
+    while (!pending.isEmpty()) {
         printf_stderr("    [ %s%s ]\n",
-                      pending.front().is_interrupt() ? "intr" :
-                      (pending.front().is_sync() ? "sync" : "async"),
-                      pending.front().is_reply() ? "reply" : "");
-        pending.pop_front();
+                      pending.getFirst()->Msg().is_interrupt() ? "intr" :
+                      (pending.getFirst()->Msg().is_sync() ? "sync" : "async"),
+                      pending.getFirst()->Msg().is_reply() ? "reply" : "");
+        pending.popFirst();
     }
 
     NS_RUNTIMEABORT(why);
 }
 
 void
 MessageChannel::DumpInterruptStack(const char* const pfx) const
 {
@@ -2321,23 +2373,43 @@ void
 MessageChannel::EndTimeout()
 {
     mMonitor->AssertCurrentThreadOwns();
 
     IPC_LOG("Ending timeout of seqno=%d", mTimedOutMessageSeqno);
     mTimedOutMessageSeqno = 0;
     mTimedOutMessageNestedLevel = 0;
 
-    for (size_t i = 0; i < mPending.size(); i++) {
-        // There may be messages in the queue that we expected to process from
-        // OnMaybeDequeueOne. But during the timeout, that function will skip
-        // some messages. Now they're ready to be processed, so we enqueue more
-        // tasks.
-        RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
-        mWorkerLoop->PostTask(task.forget());
+    RepostAllMessages();
+}
+
+void
+MessageChannel::RepostAllMessages()
+{
+    bool needRepost = false;
+    for (RefPtr<MessageTask> task : mPending) {
+        if (!task->IsScheduled()) {
+            needRepost = true;
+        }
+    }
+    if (!needRepost) {
+        // If everything is already scheduled to run, do nothing.
+        return;
+    }
+
+    // In some cases we may have deferred dispatch of some messages in the
+    // queue. Now we want to run them again. However, we can't just re-post
+    // those messages since the messages after them in mPending would then be
+    // before them in the event queue. So instead we cancel everything and
+    // re-post all messages in the correct order.
+    MessageQueue queue = Move(mPending);
+    while (RefPtr<MessageTask> task = queue.popFirst()) {
+        RefPtr<MessageTask> newTask = new MessageTask(this, Move(task->Msg()));
+        mPending.insertBack(newTask);
+        newTask->Post();
     }
 }
 
 void
 MessageChannel::CancelTransaction(int transaction)
 {
     mMonitor->AssertCurrentThreadOwns();
 
@@ -2370,33 +2442,33 @@ MessageChannel::CancelTransaction(int tr
             mTransactionStack->Cancel();
         }
     } else {
         MOZ_RELEASE_ASSERT(mTransactionStack->TransactionID() == transaction);
         mTransactionStack->Cancel();
     }
 
     bool foundSync = false;
-    for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); ) {
-        Message &msg = *it;
+    for (RefPtr<MessageTask> p = mPending.getFirst(); p; ) {
+        Message &msg = p->Msg();
 
         // If there was a race between the parent and the child, then we may
         // have a queued sync message. We want to drop this message from the
         // queue since if will get cancelled along with the transaction being
         // cancelled. This happens if the message in the queue is NESTED_INSIDE_SYNC.
         if (msg.is_sync() && msg.nested_level() != IPC::Message::NOT_NESTED) {
             MOZ_RELEASE_ASSERT(!foundSync);
             MOZ_RELEASE_ASSERT(msg.transaction_id() != transaction);
             IPC_LOG("Removing msg from queue seqno=%d xid=%d", msg.seqno(), msg.transaction_id());
             foundSync = true;
-            it = mPending.erase(it);
+            p = p->removeAndGetNext();
             continue;
         }
 
-        it++;
+        p = p->getNext();
     }
 }
 
 bool
 MessageChannel::IsInTransaction() const
 {
     MonitorAutoLock lock(*mMonitor);
     return !!mTransactionStack;
--- a/ipc/glue/MessageChannel.h
+++ b/ipc/glue/MessageChannel.h
@@ -269,20 +269,16 @@ class MessageChannel : HasResultCodes
     bool HasPendingEvents();
 
     void ProcessPendingRequests(AutoEnterTransaction& aTransaction);
     bool ProcessPendingRequest(Message &&aUrgent);
 
     void MaybeUndeferIncall();
     void EnqueuePendingMessages();
 
-    // Executed on the worker thread. Dequeues one pending message.
-    bool OnMaybeDequeueOne();
-    bool DequeueOne(Message *recvd);
-
     // Dispatches an incoming message to its appropriate handler.
     void DispatchMessage(Message &&aMsg);
 
     // DispatchMessage will route to one of these functions depending on the
     // protocol type of the message.
     void DispatchSyncMessage(const Message &aMsg, Message*& aReply);
     void DispatchUrgentMessage(const Message &aMsg);
     void DispatchAsyncMessage(const Message &aMsg);
@@ -304,16 +300,18 @@ class MessageChannel : HasResultCodes
 
     bool WaitResponse(bool aWaitTimedOut);
 
     bool ShouldContinueFromTimeout();
 
     void EndTimeout();
     void CancelTransaction(int transaction);
 
+    void RepostAllMessages();
+
     // The "remote view of stack depth" can be different than the
     // actual stack depth when there are out-of-turn replies.  When we
     // receive one, our actual Interrupt stack depth doesn't decrease, but
     // the other side (that sent the reply) thinks it has.  So, the
     // "view" returned here is |stackDepth| minus the number of
     // out-of-turn replies.
     //
     // Only called from the worker thread.
@@ -449,147 +447,66 @@ class MessageChannel : HasResultCodes
     // NOT our worker thread.
     void AssertLinkThread() const
     {
         MOZ_RELEASE_ASSERT(mWorkerLoopID != MessageLoop::current()->id(),
                            "on worker thread but should not be!");
     }
 
   private:
-#if defined(MOZ_CRASHREPORTER) && defined(OS_WIN)
-    // TODO: Remove the condition OS_WIN above once we move to GCC 5 or higher,
-    // the code will be able to get compiled as std::deque will meet C++11
-    // allocator requirements.
-    template<class T>
-    struct AnnotateAllocator
+    class MessageTask :
+        public CancelableRunnable,
+        public LinkedListElement<RefPtr<MessageTask>>
     {
-      typedef T value_type;
-      AnnotateAllocator(MessageChannel& channel) : mChannel(channel) {}
-      template<class U> AnnotateAllocator(const AnnotateAllocator<U>& other) :
-        mChannel(other.mChannel) {}
-      template<class U> bool operator==(const AnnotateAllocator<U>&) { return true; }
-      template<class U> bool operator!=(const AnnotateAllocator<U>&) { return false; }
-      T* allocate(size_t n) {
-        void* p = ::operator new(n * sizeof(T), std::nothrow);
-        if (!p && n) {
-          // Sort the pending messages by its type, note the sorting algorithm
-          // has to be in-place to avoid memory allocation.
-          MessageQueue& q = mChannel.mPending;
-          std::sort(q.begin(), q.end(), [](const Message& a, const Message& b) {
-            return a.type() < b.type();
-          });
+    public:
+        explicit MessageTask(MessageChannel* aChannel, Message&& aMessage)
+          : mChannel(aChannel), mMessage(Move(aMessage)), mScheduled(false)
+        {}
+
+        NS_IMETHOD Run() override;
+        NS_IMETHOD Cancel() override;
+        void Post();
+        void Clear();
+
+        bool IsScheduled() const { return mScheduled; }
 
-          // Iterate over the sorted queue to find the message that has the
-          // highest number of count.
-          const char* topName = nullptr;
-          const char* curName = nullptr;
-          msgid_t topType = 0, curType = 0;
-          uint32_t topCount = 0, curCount = 0;
-          for (MessageQueue::iterator it = q.begin(); it != q.end(); ++it) {
-            Message &msg = *it;
-            if (msg.type() == curType) {
-              ++curCount;
-            } else {
-              if (curCount > topCount) {
-                topName = curName;
-                topType = curType;
-                topCount = curCount;
-              }
-              curName = StringFromIPCMessageType(msg.type());
-              curType = msg.type();
-              curCount = 1;
-            }
-          }
-          // In case the last type is the top one.
-          if (curCount > topCount) {
-            topName = curName;
-            topType = curType;
-            topCount = curCount;
-          }
+        Message& Msg() { return mMessage; }
+        const Message& Msg() const { return mMessage; }
+
+    private:
+        MessageTask() = delete;
+        MessageTask(const MessageTask&) = delete;
 
-          CrashReporter::AnnotatePendingIPC(q.size(), topCount, topName, topType);
+        MessageChannel* mChannel;
+        Message mMessage;
+        bool mScheduled : 1;
+    };
 
-          mozalloc_handle_oom(n * sizeof(T));
-        }
-        return static_cast<T*>(p);
-      }
-      void deallocate(T* p, size_t n) {
-        ::operator delete(p);
-      }
-      MessageChannel& mChannel;
-    };
-    typedef std::deque<Message, AnnotateAllocator<Message>> MessageQueue;
-#else
-    typedef std::deque<Message> MessageQueue;
-#endif
+    bool ShouldRunMessage(const Message& aMsg);
+    void RunMessage(MessageTask& aTask);
+
+    typedef LinkedList<RefPtr<MessageTask>> MessageQueue;
     typedef std::map<size_t, Message> MessageMap;
     typedef IPC::Message::msgid_t msgid_t;
 
-    // XXXkhuey this can almost certainly die.
-    // All dequeuing tasks require a single point of cancellation,
-    // which is handled via a reference-counted task.
-    class RefCountedTask
-    {
-      public:
-        explicit RefCountedTask(already_AddRefed<CancelableRunnable> aTask)
-          : mTask(aTask)
-        { }
-      private:
-        ~RefCountedTask() { }
-      public:
-        void Run() { mTask->Run(); }
-        void Cancel() { mTask->Cancel(); }
-
-        NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedTask)
-
-      private:
-        RefPtr<CancelableRunnable> mTask;
-    };
-
-    // Wrap an existing task which can be cancelled at any time
-    // without the wrapper's knowledge.
-    class DequeueTask : public CancelableRunnable
-    {
-      public:
-        explicit DequeueTask(RefCountedTask* aTask)
-          : mTask(aTask)
-        { }
-        NS_IMETHOD Run() override {
-          if (mTask) {
-            mTask->Run();
-          }
-          return NS_OK;
-        }
-        nsresult Cancel() override {
-          mTask = nullptr;
-          return NS_OK;
-        }
-
-      private:
-        RefPtr<RefCountedTask> mTask;
-    };
-
   private:
     // Based on presumption the listener owns and overlives the channel,
     // this is never nullified.
     MessageListener* mListener;
     ChannelState mChannelState;
     RefPtr<RefCountedMonitor> mMonitor;
     Side mSide;
     MessageLink* mLink;
     MessageLoop* mWorkerLoop;           // thread where work is done
     RefPtr<CancelableRunnable> mChannelErrorTask;  // NotifyMaybeChannelError runnable
 
     // id() of mWorkerLoop.  This persists even after mWorkerLoop is cleared
     // during channel shutdown.
     int mWorkerLoopID;
 
-    // A task encapsulating dequeuing one pending message.
-    RefPtr<RefCountedTask> mDequeueOneTask;
-
     // Timeout periods are broken up in two to prevent system suspension from
     // triggering an abort. This method (called by WaitForEvent with a 'did
     // timeout' flag) decides if we should wait again for half of mTimeoutMs
     // or give up.
     int32_t mTimeoutMs;
     bool mInTimeoutSecondHalf;
 
     // Worker-thread only; sequence numbers for messages that require
@@ -666,46 +583,44 @@ class MessageChannel : HasResultCodes
     // error.
     //
     // A message is only timed out if it initiated a transaction. This avoids
     // hitting a lot of corner cases with message nesting that we don't really
     // care about.
     int32_t mTimedOutMessageSeqno;
     int mTimedOutMessageNestedLevel;
 
-    // Queue of all incoming messages, except for replies to sync and urgent
-    // messages, which are delivered directly to mRecvd, and any pending urgent
-    // incall, which is stored in mPendingUrgentRequest.
+    // Queue of all incoming messages.
     //
     // If both this side and the other side are functioning correctly, the queue
     // can only be in certain configurations.  Let
     //
     //   |A<| be an async in-message,
     //   |S<| be a sync in-message,
     //   |C<| be an Interrupt in-call,
     //   |R<| be an Interrupt reply.
     //
     // The queue can only match this configuration
     //
-    //  A<* (S< | C< | R< (?{mStack.size() == 1} A<* (S< | C<)))
+    //  A<* (S< | C< | R< (?{mInterruptStack.size() == 1} A<* (S< | C<)))
     //
     // The other side can send as many async messages |A<*| as it wants before
     // sending us a blocking message.
     //
     // The first case is |S<|, a sync in-msg.  The other side must be blocked,
     // and thus can't send us any more messages until we process the sync
     // in-msg.
     //
     // The second case is |C<|, an Interrupt in-call; the other side must be blocked.
     // (There's a subtlety here: this in-call might have raced with an
     // out-call, but we detect that with the mechanism below,
     // |mRemoteStackDepth|, and races don't matter to the queue.)
     //
     // Final case, the other side replied to our most recent out-call |R<|.
-    // If that was the *only* out-call on our stack, |?{mStack.size() == 1}|,
+    // If that was the *only* out-call on our stack, |?{mInterruptStack.size() == 1}|,
     // then other side "finished with us," and went back to its own business.
     // That business might have included sending any number of async message
     // |A<*| until sending a blocking message |(S< | C<)|.  If we had more than
     // one Interrupt call on our stack, the other side *better* not have sent us
     // another blocking message, because it's blocked on a reply from us.
     //
     MessageQueue mPending;
 
@@ -720,17 +635,17 @@ class MessageChannel : HasResultCodes
     // calls.  With each Interrupt out-call sent, we send along what *we* think the
     // stack depth of the remote side is *before* it will receive the Interrupt call.
     //
     // After sending the out-call, our stack depth is "incremented" by pushing
     // that pending message onto mPending.
     //
     // Then when processing an in-call |c|, it must be true that
     //
-    //   mStack.size() == c.remoteDepth
+    //   mInterruptStack.size() == c.remoteDepth
     //
     // I.e., my depth is actually the same as what the other side thought it
     // was when it sent in-call |c|.  If this fails to hold, we have detected
     // racy Interrupt calls.
     //
     // We then increment mRemoteStackDepth *just before* processing the
     // in-call, since we know the other side is waiting on it, and decrement
     // it *just after* finishing processing that in-call, since our response