ipc/glue/MessageChannel.cpp
author Sebastian Hengst <archaeopteryx@coole-files.de>
Sat, 29 Oct 2016 13:44:45 +0200
changeset 320128 c4f2be33044f293bba6518622bbf4043b48e57bb
parent 320127 a171aae3ae491dd52e184735c51924c7a467a0bf
child 320130 4c5b55684c21a261a1f4187b290d6ac7b4d5891c
permissions -rw-r--r--
Backed out changeset a171aae3ae49 (bug 1312960)

/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
 * vim: sw=4 ts=4 et :
 */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "mozilla/ipc/MessageChannel.h"
#include "mozilla/ipc/ProtocolUtils.h"

#include "mozilla/dom/ScriptSettings.h"

#include "mozilla/Assertions.h"
#include "mozilla/DebugOnly.h"
#include "mozilla/Move.h"
#include "mozilla/SizePrintfMacros.h"
#include "mozilla/Sprintf.h"
#include "mozilla/Telemetry.h"
#include "mozilla/Logging.h"
#include "nsAutoPtr.h"
#include "nsDebug.h"
#include "nsISupportsImpl.h"
#include "nsContentUtils.h"

using mozilla::Move;

// Undo the damage done by mozzconf.h
#undef compress

// Logging seems to be somewhat broken on b2g.
#ifdef MOZ_B2G
#define IPC_LOG(...)
#else
static mozilla::LazyLogModule sLogModule("ipc");
#define IPC_LOG(...) MOZ_LOG(sLogModule, LogLevel::Debug, (__VA_ARGS__))
#endif

/*
 * IPC design:
 *
 * There are three kinds of messages: async, sync, and intr. Sync and intr
 * messages are blocking.
 *
 * Terminology: To dispatch a message Foo is to run the RecvFoo code for
 * it. This is also called "handling" the message.
 *
 * Sync and async messages can sometimes "nest" inside other sync messages
 * (i.e., while waiting for the sync reply, we can dispatch the inner
 * message). Intr messages cannot nest.  The three possible nesting levels are
 * NOT_NESTED, NESTED_INSIDE_SYNC, and NESTED_INSIDE_CPOW.  The intended uses
 * are:
 *   NOT_NESTED - most messages.
 *   NESTED_INSIDE_SYNC - CPOW-related messages, which are always sync
 *                        and can go in either direction.
 *   NESTED_INSIDE_CPOW - messages where we don't want to dispatch
 *                        incoming CPOWs while waiting for the response.
 * These nesting levels are ordered: NOT_NESTED, NESTED_INSIDE_SYNC,
 * NESTED_INSIDE_CPOW.  Async messages cannot be NESTED_INSIDE_SYNC but they can
 * be NESTED_INSIDE_CPOW.
 *
 * To avoid jank, the parent process is not allowed to send NOT_NESTED sync messages.
 * When a process is waiting for a response to a sync message
 * M0, it will dispatch an incoming message M if:
 *   1. M has a higher nesting level than M0, or
 *   2. if M has the same nesting level as M0 and we're in the child, or
 *   3. if M has the same nesting level as M0 and it was sent by the other side
 *      while dispatching M0.
 * The idea is that messages with higher nesting should take precedence. The
 * purpose of rule 2 is to handle a race where both processes send to each other
 * simultaneously. In this case, we resolve the race in favor of the parent (so
 * the child dispatches first).
 *
 * Messages satisfy the following properties:
 *   A. When waiting for a response to a sync message, we won't dispatch any
 *      messages of a lower nesting level.
 *   B. Messages of the same nesting level will be dispatched roughly in the
 *      order they were sent. The exception is when the parent and child send
 *      sync messages to each other simultaneously. In this case, the parent's
 *      message is dispatched first. While it is dispatched, the child may send
 *      further nested messages, and these messages may be dispatched before the
 *      child's original message. We can consider ordering to be preserved here
 *      because we pretend that the child's original message wasn't sent until
 *      after the parent's message is finished being dispatched.
 *
 * When waiting for a sync message reply, we dispatch an async message only if
 * it is NESTED_INSIDE_CPOW. Normally NESTED_INSIDE_CPOW async
 * messages are sent only from the child. However, the parent can send
 * NESTED_INSIDE_CPOW async messages when it is creating a bridged protocol.
 *
 * Intr messages are blocking and can nest, but they don't participate in the
 * nesting levels. While waiting for an intr response, all incoming messages are
 * dispatched until a response is received. When two intr messages race with
 * each other, a similar scheme is used to ensure that one side wins. The
 * winning side is chosen based on the message type.
 *
 * Intr messages differ from sync messages in that, while sending an intr
 * message, we may dispatch an async message. This causes some additional
 * complexity. One issue is that replies can be received out of order. It's also
 * more difficult to determine whether one message is nested inside
 * another. Consequently, intr handling uses mOutOfTurnReplies and
 * mRemoteStackDepthGuess, which are not needed for sync messages.
 */

using namespace mozilla;
using namespace mozilla::ipc;
using namespace std;

using mozilla::dom::AutoNoJSAPI;
using mozilla::dom::ScriptSettingsInitialized;
using mozilla::MonitorAutoLock;
using mozilla::MonitorAutoUnlock;

// Evaluates |_cond| and, when it is false, calls DebugAbort() with the current
// file name, line number, the stringified condition and any extra arguments
// supplied after the condition.  The do/while(0) wrapper makes the macro
// usable as a single statement.
#define IPC_ASSERT(_cond, ...)                                      \
    do {                                                            \
        if (!(_cond))                                               \
            DebugAbort(__FILE__, __LINE__, #_cond,## __VA_ARGS__);  \
    } while (0)

// Reset to nullptr by MessageChannel::Clear() when it points at the channel
// being cleared.
// NOTE(review): presumably the channel currently blocking the parent process;
// confirm against the code that assigns it.
static MessageChannel* gParentProcessBlocker;

namespace mozilla {
namespace ipc {

// MessageChannel::Send() reports a message's size() to telemetry only when it
// is at least this large.
static const uint32_t kMinTelemetryMessageSize = 8192;

// Value the constructor stores in mTimeoutMs.
const int32_t MessageChannel::kNoTimeout = INT32_MIN;

// static
bool MessageChannel::sIsPumpingMessages = false;

// Direction of a message relative to this channel, as recorded in each
// InterruptFrame.
enum Direction
{
    IN_MESSAGE,   // described as "in" by InterruptFrame::Describe()
    OUT_MESSAGE   // described as "out" by InterruptFrame::Describe()
};

// One entry of the channel's stack of in-flight messages.  A frame remembers
// the message's name, routing id, semantics (intr / sync / async) and
// direction.  Frames are move-only; a moved-from frame has a null name and is
// flagged as moved so that its destructor assertion still holds.
class MessageChannel::InterruptFrame
{
private:
    enum Semantics
    {
        INTR_SEMS,
        SYNC_SEMS,
        ASYNC_SEMS
    };

    // Classifies |msg|.  The interrupt check takes priority over the sync
    // check; anything else is treated as async.
    static Semantics SemanticsOf(const Message* msg)
    {
        if (msg->is_interrupt()) {
            return INTR_SEMS;
        }
        if (msg->is_sync()) {
            return SYNC_SEMS;
        }
        return ASYNC_SEMS;
    }

public:
    // Builds a frame describing |msg| travelling in |direction|.  The message
    // must have a non-null name.
    InterruptFrame(Direction direction, const Message* msg)
      : mMessageName(msg->name()),
        mMessageRoutingId(msg->routing_id()),
        mMessageSemantics(SemanticsOf(msg)),
        mDirection(direction),
        mMoved(false)
    {
        MOZ_RELEASE_ASSERT(mMessageName);
    }

    // Move constructor: steals the name from |aOther| and marks |aOther| as
    // moved-from.  |aOther| must still own its name.
    InterruptFrame(InterruptFrame&& aOther)
    {
        MOZ_RELEASE_ASSERT(aOther.mMessageName);

        // Plain value members are simply copied over.
        mMessageRoutingId = aOther.mMessageRoutingId;
        mMessageSemantics = aOther.mMessageSemantics;
        mDirection = aOther.mDirection;

        // Inherit the source's moved flag before flagging the source itself.
        mMoved = aOther.mMoved;
        aOther.mMoved = true;

        // Take over the name, leaving the source without one.
        mMessageName = aOther.mMessageName;
        aOther.mMessageName = nullptr;
    }

    // A frame must either still carry its name or have been moved from.
    ~InterruptFrame()
    {
        MOZ_RELEASE_ASSERT(mMessageName || mMoved);
    }

    // Move assignment, implemented as destroy + move-construct in place.
    // Self-assignment is not permitted.
    InterruptFrame& operator=(InterruptFrame&& aOther)
    {
        MOZ_RELEASE_ASSERT(&aOther != this);
        this->~InterruptFrame();
        new (this) InterruptFrame(Move(aOther));
        return *this;
    }

    // True for an incoming intr message.
    bool IsInterruptIncall() const
    {
        if (mDirection != IN_MESSAGE) {
            return false;
        }
        return mMessageSemantics == INTR_SEMS;
    }

    // True for an outgoing intr message.
    bool IsInterruptOutcall() const
    {
        if (mDirection != OUT_MESSAGE) {
            return false;
        }
        return mMessageSemantics == INTR_SEMS;
    }

    // True for an outgoing message that is intr or sync (i.e. not async).
    bool IsOutgoingSync() const
    {
        if (mDirection != OUT_MESSAGE) {
            return false;
        }
        return mMessageSemantics != ASYNC_SEMS;
    }

    // Writes a description of this frame through the four out-parameters:
    // routing id, direction ("in"/"out"), semantics ("intr"/"sync"/"async")
    // and the message name.
    void Describe(int32_t* id, const char** dir, const char** sems,
                  const char** name) const
    {
        *id = mMessageRoutingId;

        if (IN_MESSAGE == mDirection) {
            *dir = "in";
        } else {
            *dir = "out";
        }

        switch (mMessageSemantics) {
          case INTR_SEMS:
            *sems = "intr";
            break;
          case SYNC_SEMS:
            *sems = "sync";
            break;
          default:
            *sems = "async";
            break;
        }

        *name = mMessageName;
    }

    int32_t GetRoutingId() const
    {
        return mMessageRoutingId;
    }

private:
    const char* mMessageName;
    int32_t mMessageRoutingId;
    Semantics mMessageSemantics;
    Direction mDirection;
    bool mMoved;

    // Copying is not supported; frames may only be moved.
    InterruptFrame(const InterruptFrame& aOther) = delete;
    InterruptFrame& operator=(const InterruptFrame&) = delete;
};

// Stack-allocated guard that pushes an InterruptFrame onto the channel's
// mCxxStackFrames for the lifetime of the guard and fires the channel's
// Entered*/Exited* notifications at the matching transitions.
class MOZ_STACK_CLASS MessageChannel::CxxStackFrame
{
public:
    // Pushes a frame for |msg| travelling in |direction| onto |that|.
    // Must run on the worker thread; crashes if the frame cannot be appended.
    CxxStackFrame(MessageChannel& that, Direction direction, const Message* msg)
      : mThat(that)
    {
        mThat.AssertWorkerThread();

        // The first frame on an empty stack marks entry into C++ code.
        const bool wasEmpty = mThat.mCxxStackFrames.empty();
        if (wasEmpty) {
            mThat.EnteredCxxStack();
        }

        const bool pushed =
            mThat.mCxxStackFrames.append(InterruptFrame(direction, msg));
        if (!pushed) {
            MOZ_CRASH();
        }

        const InterruptFrame& top = mThat.mCxxStackFrames.back();

        if (top.IsInterruptIncall()) {
            mThat.EnteredCall();
        }

        if (top.IsOutgoingSync()) {
            mThat.EnteredSyncSend();
        }

        // Remember that an outgoing intr message was seen; never cleared here.
        if (top.IsInterruptOutcall()) {
            mThat.mSawInterruptOutMsg = true;
        }
    }

    // Pops the frame pushed by the constructor and fires the matching exit
    // notifications.  Must run on the worker thread.
    ~CxxStackFrame()
    {
        mThat.AssertWorkerThread();

        MOZ_RELEASE_ASSERT(!mThat.mCxxStackFrames.empty());

        // Read what we need from the top frame before it is removed.
        const InterruptFrame& top = mThat.mCxxStackFrames.back();
        const bool leavingCall = top.IsInterruptIncall();
        const bool leavingSync = top.IsOutgoingSync();
        mThat.mCxxStackFrames.shrinkBy(1);

        const bool leavingStack = mThat.mCxxStackFrames.empty();

        // According how lifetime is declared, mListener on MessageChannel
        // lives longer than MessageChannel itself.  Hence is expected to
        // be alive.  There is nothing to even assert here, there is no place
        // we would be nullifying mListener on MessageChannel.

        if (leavingCall) {
            mThat.ExitedCall();
        }

        if (leavingSync) {
            mThat.ExitedSyncSend();
        }

        if (leavingStack) {
            mThat.ExitedCxxStack();
        }
    }

private:
    MessageChannel& mThat;

    // Neither default-constructible nor copyable.
    CxxStackFrame() = delete;
    CxxStackFrame(const CxxStackFrame&) = delete;
    CxxStackFrame& operator=(const CxxStackFrame&) = delete;
};

// RAII entry in the channel's transaction stack (mChan->mTransactionStack).
//
// An instance is created either for an outgoing message (first constructor)
// or for an incoming message that is being dispatched (second constructor).
// While active, the instance is linked into the stack through mNext; the
// destructor unlinks it.  The channel's monitor must be held when an instance
// is constructed and destroyed.
class AutoEnterTransaction
{
public:
    // Enters a transaction for an outgoing message with the given sequence
    // number, transaction id and nested level, and pushes it on the stack.
    explicit AutoEnterTransaction(MessageChannel *aChan,
                                  int32_t aMsgSeqno,
                                  int32_t aTransactionID,
                                  int aNestedLevel)
      : mChan(aChan),
        mActive(true),
        mOutgoing(true),
        mNestedLevel(aNestedLevel),
        mSeqno(aMsgSeqno),
        mTransaction(aTransactionID),
        mNext(mChan->mTransactionStack)
    {
        mChan->mMonitor->AssertCurrentThreadOwns();
        mChan->mTransactionStack = this;
    }

    // Enters a transaction for an incoming message.  Only sync messages are
    // pushed on the stack; for any other message the instance is created
    // inactive and the stack is left untouched.
    explicit AutoEnterTransaction(MessageChannel *aChan, const IPC::Message &aMessage)
      : mChan(aChan),
        mActive(true),
        mOutgoing(false),
        mNestedLevel(aMessage.nested_level()),
        mSeqno(aMessage.seqno()),
        mTransaction(aMessage.transaction_id()),
        mNext(mChan->mTransactionStack)
    {
        mChan->mMonitor->AssertCurrentThreadOwns();

        if (!aMessage.is_sync()) {
            mActive = false;
            return;
        }

        mChan->mTransactionStack = this;
    }

    // Pops this transaction off the stack if it is still active.
    ~AutoEnterTransaction() {
        mChan->mMonitor->AssertCurrentThreadOwns();
        if (mActive) {
            mChan->mTransactionStack = mNext;
        }
    }

    // Cancels this transaction, which must be the top of the stack, together
    // with every transaction below it up to (but not including) the first
    // one whose nested level is NOT_NESTED.  Cancelled transactions are
    // marked inactive and removed from the stack.
    void Cancel() {
        AutoEnterTransaction *cur = mChan->mTransactionStack;
        MOZ_RELEASE_ASSERT(cur == this);
        while (cur && cur->mNestedLevel != IPC::Message::NOT_NESTED) {
            // Note that, in the following situation, we will cancel multiple
            // transactions:
            // 1. Parent sends NESTED_INSIDE_SYNC message P1 to child.
            // 2. Child sends NESTED_INSIDE_SYNC message C1 to parent.
            // 3. Child dispatches P1, parent blocks.
            // 4. Child cancels.
            // In this case, both P1 and C1 are cancelled. The parent will
            // remove C1 from its queue when it gets the cancellation message.
            MOZ_RELEASE_ASSERT(cur->mActive);
            cur->mActive = false;
            cur = cur->mNext;
        }

        mChan->mTransactionStack = cur;

        MOZ_RELEASE_ASSERT(IsComplete());
    }

    // True if this transaction, or any transaction below it on the stack, is
    // for an outgoing message.
    bool AwaitingSyncReply() const {
        MOZ_RELEASE_ASSERT(mActive);
        if (mOutgoing) {
            return true;
        }
        return mNext ? mNext->AwaitingSyncReply() : false;
    }

    // Nested level of the nearest outgoing transaction at or below this one,
    // or 0 if there is none.
    int AwaitingSyncReplyNestedLevel() const {
        MOZ_RELEASE_ASSERT(mActive);
        if (mOutgoing) {
            return mNestedLevel;
        }
        return mNext ? mNext->AwaitingSyncReplyNestedLevel() : 0;
    }

    // True if this transaction, or any transaction below it on the stack, is
    // for an incoming message.
    bool DispatchingSyncMessage() const {
        MOZ_RELEASE_ASSERT(mActive);
        if (!mOutgoing) {
            return true;
        }
        return mNext ? mNext->DispatchingSyncMessage() : false;
    }

    // Nested level of the nearest incoming transaction at or below this one,
    // or 0 if there is none.
    int DispatchingSyncMessageNestedLevel() const {
        MOZ_RELEASE_ASSERT(mActive);
        if (!mOutgoing) {
            return mNestedLevel;
        }
        return mNext ? mNext->DispatchingSyncMessageNestedLevel() : 0;
    }

    // The accessors below may only be used on an active transaction.
    int NestedLevel() const {
        MOZ_RELEASE_ASSERT(mActive);
        return mNestedLevel;
    }

    int32_t SequenceNumber() const {
        MOZ_RELEASE_ASSERT(mActive);
        return mSeqno;
    }

    int32_t TransactionID() const {
        MOZ_RELEASE_ASSERT(mActive);
        return mTransaction;
    }

    // Stores |aMessage| as the reply for this transaction.  The message's
    // seqno and transaction id must match this transaction's, and no reply
    // may have been stored already.
    void ReceivedReply(IPC::Message&& aMessage) {
        MOZ_RELEASE_ASSERT(aMessage.seqno() == mSeqno);
        MOZ_RELEASE_ASSERT(aMessage.transaction_id() == mTransaction);
        MOZ_RELEASE_ASSERT(!mReply);
        IPC_LOG("Reply received on worker thread: seqno=%d", mSeqno);
        mReply = new IPC::Message(Move(aMessage));
        MOZ_RELEASE_ASSERT(IsComplete());
    }

    // Routes |aMessage| to the transaction on the stack whose seqno matches.
    // Must be called on the top of the stack.  A matching transaction is
    // required to exist: running off the end of the stack is a release
    // assertion failure.
    void HandleReply(IPC::Message&& aMessage) {
        AutoEnterTransaction *cur = mChan->mTransactionStack;
        MOZ_RELEASE_ASSERT(cur == this);
        while (cur) {
            MOZ_RELEASE_ASSERT(cur->mActive);
            if (aMessage.seqno() == cur->mSeqno) {
                cur->ReceivedReply(Move(aMessage));
                break;
            }
            cur = cur->mNext;
            MOZ_RELEASE_ASSERT(cur);
        }
    }

    // True once the transaction is inactive or a reply has been stored.
    bool IsComplete() const {
        return !mActive || mReply;
    }

    bool IsOutgoing() const {
        return mOutgoing;
    }

    // True when the transaction is not active.  Note this is also the case
    // for a transaction that was constructed inactive (non-sync incoming
    // message), not only for one that went through Cancel().
    bool IsCanceled() const {
        return !mActive;
    }

    // True if there is no transaction below this one.
    bool IsBottom() const {
        return !mNext;
    }

    // Whether the stored reply is an error reply.  A reply must have been
    // received.
    bool IsError() const {
        MOZ_RELEASE_ASSERT(mReply);
        return mReply->is_reply_error();
    }

    // Hands the stored reply (if any) over to the caller.
    nsAutoPtr<IPC::Message> GetReply() {
        return Move(mReply);
    }

private:
    MessageChannel *mChan;

    // Active is true if this transaction is on the mChan->mTransactionStack
    // stack. Generally we're not on the stack if the transaction was canceled
    // or if it was for a message that doesn't require transactions (an async
    // message).
    bool mActive;

    // Is this stack frame for an outgoing message?
    bool mOutgoing;

    // Properties of the message being sent/received.
    int mNestedLevel;
    int32_t mSeqno;
    int32_t mTransaction;

    // Next item in mChan->mTransactionStack.
    AutoEnterTransaction *mNext;

    // Pointer to the reply received for this message, if one was received.
    nsAutoPtr<IPC::Message> mReply;
};

// Constructs a closed channel for |aListener|.  The channel has no link, no
// worker loop and no known side until one of the Open() overloads is called.
MessageChannel::MessageChannel(MessageListener *aListener)
  : mListener(aListener),
    mChannelState(ChannelClosed),
    mSide(UnknownSide),
    mLink(nullptr),
    mWorkerLoop(nullptr),
    mChannelErrorTask(nullptr),
    mWorkerLoopID(-1),
    mTimeoutMs(kNoTimeout),
    mInTimeoutSecondHalf(false),
    mNextSeqno(0),
    mLastSendError(SyncSendError::SendSuccess),
    mDispatchingAsyncMessage(false),
    mDispatchingAsyncMessageNestedLevel(0),
    mTransactionStack(nullptr),
    mTimedOutMessageSeqno(0),
    mTimedOutMessageNestedLevel(0),
    mRemoteStackDepthGuess(0),
    mSawInterruptOutMsg(false),
    mIsWaitingForIncoming(false),
    mAbortOnError(false),
    mNotifiedChannelDone(false),
    mFlags(REQUIRE_DEFAULT),
    mPeerPidSet(false),
    mPeerPid(-1)
{
    MOZ_COUNT_CTOR(ipc::MessageChannel);

#ifdef OS_WIN
    mTopFrame = nullptr;
    mIsSyncWaitingOnNonMainThread = false;
#endif

    // Cancelable task bound to DispatchOnChannelConnected; it does not own
    // this channel.  Clear() cancels it.
    mOnChannelConnectedTask =
        NewNonOwningCancelableRunnableMethod(this, &MessageChannel::DispatchOnChannelConnected);

#ifdef OS_WIN
    // Unnamed, manual-reset event that starts out non-signaled.  The
    // destructor closes the handle.
    mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
    MOZ_RELEASE_ASSERT(mEvent, "CreateEvent failed! Nothing is going to work!");
#endif
}

// Destroys the channel.  All CxxStackFrame guards must already have been
// unwound.  On Windows the event handle created by the constructor is closed;
// a failure to close it (or a missing handle) is reported through
// gfxDevCrash, and a failed close additionally trips a release assertion.
// Finally Clear() releases the remaining channel state.
MessageChannel::~MessageChannel()
{
    MOZ_COUNT_DTOR(ipc::MessageChannel);
    IPC_ASSERT(mCxxStackFrames.empty(), "mismatched CxxStackFrame ctor/dtors");
#ifdef OS_WIN
    if (mEvent) {
        BOOL ok = CloseHandle(mEvent);

        // Read the thread's last-error code right away: any call made
        // between the failing CloseHandle() and GetLastError() (including the
        // construction of the crash-report stream below) may overwrite it.
        DWORD closeError = 0;
        if (!ok) {
            closeError = GetLastError();
        }

        mEvent = nullptr;

        if (!ok) {
            gfxDevCrash(mozilla::gfx::LogReason::MessageChannelCloseFailure) <<
                "MessageChannel failed to close. GetLastError: " <<
                closeError;
        }
        MOZ_RELEASE_ASSERT(ok);
    } else {
        gfxDevCrash(mozilla::gfx::LogReason::MessageChannelCloseFailure) <<
                "MessageChannel destructor ran without an mEvent Handle";
    }
#endif
    Clear();
}

// Returns the transaction ID of the transaction on top of the transaction
// stack, or 0 when the stack is empty.
//
// Because "the current transaction" is ill-defined when messages race and
// only some of them get canceled, callers may only use this when the current
// transaction is known to be for a NESTED_INSIDE_SYNC message; that is
// enforced with a release assertion.  The monitor must be held.
int32_t
MessageChannel::CurrentNestedInsideSyncTransaction() const
{
    mMonitor->AssertCurrentThreadOwns();

    if (mTransactionStack) {
        MOZ_RELEASE_ASSERT(mTransactionStack->NestedLevel() == IPC::Message::NESTED_INSIDE_SYNC);
        return mTransactionStack->TransactionID();
    }

    // No transaction in progress.
    return 0;
}

// Whether the transaction stack contains an outgoing transaction.  False when
// the stack is empty.  The monitor must be held.
bool
MessageChannel::AwaitingSyncReply() const
{
    mMonitor->AssertCurrentThreadOwns();
    if (!mTransactionStack) {
        return false;
    }
    return mTransactionStack->AwaitingSyncReply();
}

// Nested level of the nearest outgoing transaction on the transaction stack,
// or 0 when there is none.  The monitor must be held.
int
MessageChannel::AwaitingSyncReplyNestedLevel() const
{
    mMonitor->AssertCurrentThreadOwns();
    if (!mTransactionStack) {
        return 0;
    }
    return mTransactionStack->AwaitingSyncReplyNestedLevel();
}

// Whether the transaction stack contains an incoming (being-dispatched)
// transaction.  False when the stack is empty.  The monitor must be held.
bool
MessageChannel::DispatchingSyncMessage() const
{
    mMonitor->AssertCurrentThreadOwns();
    if (!mTransactionStack) {
        return false;
    }
    return mTransactionStack->DispatchingSyncMessage();
}

// Nested level of the nearest incoming transaction on the transaction stack,
// or 0 when there is none.  The monitor must be held.
int
MessageChannel::DispatchingSyncMessageNestedLevel() const
{
    mMonitor->AssertCurrentThreadOwns();
    if (!mTransactionStack) {
        return 0;
    }
    return mTransactionStack->DispatchingSyncMessageNestedLevel();
}

// Prints an error banner to stderr, tagged with the side ("Child", "Parent"
// or "Unknown") and the channel name.
static void
PrintErrorMessage(Side side, const char* channelName, const char* msg)
{
    const char* origin = "Unknown";
    if (side == ChildSide) {
        origin = "Child";
    } else if (side == ParentSide) {
        origin = "Parent";
    }

    printf_stderr("\n###!!! [%s][%s] Error: %s\n\n", origin, channelName, msg);
}

// Whether the channel is usable for sending.  The monitor must be held.
bool
MessageChannel::Connected() const
{
    mMonitor->AssertCurrentThreadOwns();

    // A channel that is still opening counts as connected: the transport
    // layer lets us send messages before the remote side's "connected" ack
    // has arrived.
    if (ChannelOpening == mChannelState) {
        return true;
    }
    return ChannelConnected == mChannelState;
}

// Lock-taking variant of Connected().  Returns false when the channel has no
// monitor; otherwise acquires the monitor and reports Connected().
bool
MessageChannel::CanSend() const
{
    if (mMonitor) {
        MonitorAutoLock lock(*mMonitor);
        return Connected();
    }
    return false;
}

void
MessageChannel::Clear()
{
    // Don't clear mWorkerLoopID; we use it in AssertLinkThread() and
    // AssertWorkerThread().
    //
    // Also don't clear mListener.  If we clear it, then sending a message
    // through this channel after it's Clear()'ed can cause this process to
    // crash.
    //
    // In practice, mListener owns the channel, so the channel gets deleted
    // before mListener.  But just to be safe, mListener is a weak pointer.

    if (gParentProcessBlocker == this) {
        gParentProcessBlocker = nullptr;
    }

    mWorkerLoop = nullptr;
    delete mLink;
    mLink = nullptr;

    mOnChannelConnectedTask->Cancel();

    if (mChannelErrorTask) {
        mChannelErrorTask->Cancel();
        mChannelErrorTask = nullptr;
    }

    // Free up any memory used by pending messages.
    for (RefPtr<MessageTask> task : mPending) {
        task->Clear();
    }
    mPending.clear();

    mOutOfTurnReplies.clear();
    while (!mDeferred.empty()) {
        mDeferred.pop();
    }
}

// Opens the channel over |aTransport| using a ProcessLink.  The calling
// thread's message loop becomes the worker loop.  Must not be called on a
// channel that already has a link.  Always returns true.
bool
MessageChannel::Open(Transport* aTransport, MessageLoop* aIOLoop, Side aSide)
{
    NS_PRECONDITION(!mLink, "Open() called > once");

    mMonitor = new RefCountedMonitor();

    MessageLoop* currentLoop = MessageLoop::current();
    mWorkerLoop = currentLoop;
    mWorkerLoopID = currentLoop->id();

    // The link is opened before it is published in mLink.
    ProcessLink* processLink = new ProcessLink(this);
    processLink->Open(aTransport, aIOLoop, aSide); // :TODO: n.b.: sets mChild
    mLink = processLink;

    return true;
}

// Opens a connection to another thread in the same process.
//
// Handshake, with A the initiating thread (this channel, PA) and B the other
// thread (|aTargetChan|, PB):
//  - A spawns the thread for B and obtains B's message loop (|aTargetLoop|).
//  - A creates the ProtocolChild and ProtocolParent instances.
//  - A calls PA->Open(PB, ...), i.e. this function, which:
//      - sets PA's state to ChannelOpening,
//      - posts a task to B's loop that runs PB->OnOpenAsSlave(PA, ...),
//      - waits on the monitor until PA's state leaves ChannelOpening.
//  - On B's loop, OnOpenAsSlave() sets both PA's and PB's state to
//    ChannelConnected and notifies the monitor.
//
// Returns whether the channel ended up connected (any other outcome trips a
// release assertion first).
bool
MessageChannel::Open(MessageChannel *aTargetChan, MessageLoop *aTargetLoop, Side aSide)
{
    NS_PRECONDITION(aTargetChan, "Need a target channel");
    NS_PRECONDITION(ChannelClosed == mChannelState, "Not currently closed");

    CommonThreadOpenInit(aTargetChan, aSide);

    // The target channel plays the opposite role; an unknown side stays
    // unknown.
    Side oppSide = UnknownSide;
    if (aSide == ChildSide) {
        oppSide = ParentSide;
    } else if (aSide == ParentSide) {
        oppSide = ChildSide;
    }

    mMonitor = new RefCountedMonitor();

    MonitorAutoLock lock(*mMonitor);
    mChannelState = ChannelOpening;
    aTargetLoop->PostTask(NewNonOwningRunnableMethod
                          <MessageChannel*, Side>(aTargetChan,
                                                  &MessageChannel::OnOpenAsSlave,
                                                  this, oppSide));

    // Block until the other thread moves us out of the opening state.
    while (ChannelOpening == mChannelState) {
        mMonitor->Wait();
    }

    MOZ_RELEASE_ASSERT(ChannelConnected == mChannelState, "not connected when awoken");
    return (ChannelConnected == mChannelState);
}

// Second half of the same-process open handshake, run when the other side
// (|aTargetChan|) has begun opening.  This channel must be closed and the
// target must be in the ChannelOpening state.  Shares the target's monitor,
// marks both channels connected and wakes the opener.
void
MessageChannel::OnOpenAsSlave(MessageChannel *aTargetChan, Side aSide)
{
    NS_PRECONDITION(ChannelClosed == mChannelState,
                    "Not currently closed");
    NS_PRECONDITION(ChannelOpening == aTargetChan->mChannelState,
                    "Target channel not in the process of opening");

    CommonThreadOpenInit(aTargetChan, aSide);

    // Both ends of the channel use the opener's monitor.
    mMonitor = aTargetChan->mMonitor;

    MonitorAutoLock lock(*mMonitor);

    // Re-check the target's state now that the monitor is held.
    MOZ_RELEASE_ASSERT(ChannelOpening == aTargetChan->mChannelState,
                          "Target channel not in the process of opening");

    mChannelState = ChannelConnected;
    aTargetChan->mChannelState = ChannelConnected;

    // Wake the thread waiting in Open().
    aTargetChan->mMonitor->Notify();
}

// Initialization shared by both halves of the same-process open: adopts the
// calling thread's message loop as the worker loop, creates a ThreadLink to
// |aTargetChan| and records which side this channel is.
void
MessageChannel::CommonThreadOpenInit(MessageChannel *aTargetChan, Side aSide)
{
    MessageLoop* currentLoop = MessageLoop::current();
    mWorkerLoop = currentLoop;
    mWorkerLoopID = currentLoop->id();

    mLink = new ThreadLink(this, aTargetChan);
    mSide = aSide;
}

// Hands |aMsg| to the link's EchoMessage().  Takes ownership of |aMsg|: it is
// either passed on to the link or freed on failure.  Must be called on the
// worker thread without the monitor held.
//
// Returns false (after reporting the error) if the message has no routing id
// or the channel is not connected; true otherwise.
bool
MessageChannel::Echo(Message* aMsg)
{
    nsAutoPtr<Message> owned(aMsg);
    AssertWorkerThread();
    mMonitor->AssertNotCurrentThreadOwns();

    if (MSG_ROUTING_NONE == owned->routing_id()) {
        ReportMessageRouteError("MessageChannel::Echo");
        return false;
    }

    MonitorAutoLock lock(*mMonitor);

    if (Connected()) {
        mLink->EchoMessage(owned.forget());
        return true;
    }

    ReportConnectionError("MessageChannel", owned);
    return false;
}

// Sends the non-sync message |aMsg| through the link.  Takes ownership of
// |aMsg|: it is either passed on to the link or freed on failure.  Must be
// called on the worker thread without the monitor held.
//
// Sync messages and NESTED_INSIDE_SYNC messages are rejected with a release
// assertion.  Returns false (after reporting the error) if the message has no
// routing id or the channel is not connected; true otherwise.
bool
MessageChannel::Send(Message* aMsg)
{
    // Record the size of large messages, keyed by message name.
    if (aMsg->size() >= kMinTelemetryMessageSize) {
        Telemetry::Accumulate(Telemetry::IPC_MESSAGE_SIZE,
                              nsDependentCString(aMsg->name()), aMsg->size());
    }

    MOZ_RELEASE_ASSERT(!aMsg->is_sync());
    MOZ_RELEASE_ASSERT(aMsg->nested_level() != IPC::Message::NESTED_INSIDE_SYNC);

    CxxStackFrame frame(*this, OUT_MESSAGE, aMsg);

    nsAutoPtr<Message> owned(aMsg);
    AssertWorkerThread();
    mMonitor->AssertNotCurrentThreadOwns();

    if (MSG_ROUTING_NONE == owned->routing_id()) {
        ReportMessageRouteError("MessageChannel::Send");
        return false;
    }

    MonitorAutoLock lock(*mMonitor);

    if (Connected()) {
        mLink->SendMessage(owned.forget());
        return true;
    }

    ReportConnectionError("MessageChannel", owned);
    return false;
}

// Special unrouted message (MSG_ROUTING_NONE, CANCEL_MESSAGE_TYPE) carrying
// the id of the transaction to cancel.
class CancelMessage : public IPC::Message
{
public:
    explicit CancelMessage(int transaction)
      : IPC::Message(MSG_ROUTING_NONE, CANCEL_MESSAGE_TYPE)
    {
        this->set_transaction_id(transaction);
    }

    // Nothing to read from the message; always reports success.
    static bool Read(const Message* msg)
    {
        return true;
    }

    // Writes a fixed description to |aOutf|; |aPrefix| is not used.
    void Log(const std::string& aPrefix, FILE* aOutf) const
    {
        fputs("(special `Cancel' message)", aOutf);
    }
};

// Handles the unrouted control messages on the link thread, with the monitor
// held.  Returns true if |aMsg| was one of them (and has been consumed),
// false if it is an ordinary message that still needs processing.
//
//  - Goodbye: moves the channel to ChannelClosing.
//  - Cancel:  cancels the transaction named by the message and wakes the
//             worker thread.
bool
MessageChannel::MaybeInterceptSpecialIOMessage(const Message& aMsg)
{
    AssertLinkThread();
    mMonitor->AssertCurrentThreadOwns();

    // Only unrouted messages can be special.
    if (MSG_ROUTING_NONE != aMsg.routing_id()) {
        return false;
    }

    if (GOODBYE_MESSAGE_TYPE == aMsg.type()) {
        // :TODO: Sort out Close() on this side racing with Close() on the
        // other side
        mChannelState = ChannelClosing;
        if (LoggingEnabled()) {
            const char* sideName = (mSide == ChildSide) ? "child" : "parent";
            printf("NOTE: %s process received `Goodbye', closing down\n",
                   sideName);
        }
        return true;
    }

    if (CANCEL_MESSAGE_TYPE == aMsg.type()) {
        IPC_LOG("Cancel from message");
        CancelTransaction(aMsg.transaction_id());
        NotifyWorkerThread();
        return true;
    }

    return false;
}

bool
MessageChannel::ShouldDeferMessage(const Message& aMsg)
{
    // Never defer messages that have the highest nested level, even async
    // ones. This is safe because only the child can send these messages, so
    // they can never nest.
    if (aMsg.nested_level() == IPC::Message::NESTED_INSIDE_CPOW)
        return false;

    // Unless they're NESTED_INSIDE_CPOW, we always defer async messages.
    // Note that we never send an async NESTED_INSIDE_SYNC message.
    if (!aMsg.is_sync()) {
        MOZ_RELEASE_ASSERT(aMsg.nested_level() == IPC::Message::NOT_NESTED);
        return true;
    }

    int msgNestedLevel = aMsg.nested_level();
    int waitingNestedLevel = AwaitingSyncReplyNestedLevel();

    // Always defer if the nested level of the incoming message is less than the
    // nested level of the message we're awaiting.
    if (msgNestedLevel < waitingNestedLevel)
        return true;

    // Never defer if the message has strictly greater nested level.
    if (msgNestedLevel > waitingNestedLevel)
        return false;

    // When both sides send sync messages of the same nested level, we resolve the
    // race by dispatching in the child and deferring the incoming message in
    // the parent. However, the parent still needs to dispatch nested sync
    // messages.
    //
    // Deferring in the parent only sort of breaks message ordering. When the
    // child's message comes in, we can pretend the child hasn't quite
    // finished sending it yet. Since the message is sync, we know that the
    // child hasn't moved on yet.
    return mSide == ParentSide && aMsg.transaction_id() != CurrentNestedInsideSyncTransaction();
}

// Predicate matching messages that have a given type and routing id, i.e. the
// messages that should be consolidated if 'compress' is set.
class MatchingKinds {
    using Message = IPC::Message;

    Message::msgid_t mType;
    int32_t mRoutingId;

public:
    MatchingKinds(Message::msgid_t aType, int32_t aRoutingId)
      : mType(aType),
        mRoutingId(aRoutingId)
    {}

    // True when |msg| has both the stored type and the stored routing id.
    bool operator()(const Message &msg) {
        if (msg.type() != mType) {
            return false;
        }
        return msg.routing_id() == mRoutingId;
    }
};

// Link-thread entry point for a message that just arrived from the other
// side. Must be called with the monitor held.
//
// Special I/O messages are consumed by MaybeInterceptSpecialIOMessage. Sync
// replies are handed straight to the transaction stack. Everything else is
// (optionally after compressing away an older queued message of the same
// type and routing id) appended to mPending; the worker thread is then woken
// up and/or a task is posted to the worker loop, depending on what the worker
// thread is currently waiting for.
void
MessageChannel::OnMessageReceivedFromLink(Message&& aMsg)
{
    AssertLinkThread();
    mMonitor->AssertCurrentThreadOwns();

    if (MaybeInterceptSpecialIOMessage(aMsg))
        return;

    // Regardless of the Interrupt stack, if we're awaiting a sync reply,
    // we know that it needs to be immediately handled to unblock us.
    if (aMsg.is_sync() && aMsg.is_reply()) {
        IPC_LOG("Received reply seqno=%d xid=%d", aMsg.seqno(), aMsg.transaction_id());

        if (aMsg.seqno() == mTimedOutMessageSeqno) {
            // Drop the message, but allow future sync messages to be sent.
            IPC_LOG("Received reply to timedout message; ignoring; xid=%d", mTimedOutMessageSeqno);
            EndTimeout();
            return;
        }

        MOZ_RELEASE_ASSERT(AwaitingSyncReply());
        MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno);

        mTransactionStack->HandleReply(Move(aMsg));
        NotifyWorkerThread();
        return;
    }

    // Nested messages cannot be compressed.
    MOZ_RELEASE_ASSERT(aMsg.compress_type() == IPC::Message::COMPRESSION_NONE ||
                       aMsg.nested_level() == IPC::Message::NOT_NESTED);

    // NOTE: `compress` is computed below but not read again in this function.
    bool compress = false;
    if (aMsg.compress_type() == IPC::Message::COMPRESSION_ENABLED) {
        // COMPRESSION_ENABLED only ever looks at the most recently queued
        // message.
        compress = (!mPending.isEmpty() &&
                    mPending.getLast()->Msg().type() == aMsg.type() &&
                    mPending.getLast()->Msg().routing_id() == aMsg.routing_id());
        if (compress) {
            // This message type has compression enabled, and the back of the
            // queue was the same message type and routed to the same destination.
            // Replace it with the newer message.
            MOZ_RELEASE_ASSERT(mPending.getLast()->Msg().compress_type() ==
                               IPC::Message::COMPRESSION_ENABLED);
            mPending.popLast();
        }
    } else if (aMsg.compress_type() == IPC::Message::COMPRESSION_ALL && !mPending.isEmpty()) {
        // COMPRESSION_ALL scans the whole queue from newest to oldest and
        // removes the first (i.e. newest) match it finds.
        for (RefPtr<MessageTask> p = mPending.getLast(); p; p = p->getPrevious()) {
            if (p->Msg().type() == aMsg.type() &&
                p->Msg().routing_id() == aMsg.routing_id())
            {
                compress = true;
                MOZ_RELEASE_ASSERT(p->Msg().compress_type() == IPC::Message::COMPRESSION_ALL);
                p->remove();
                break;
            }
        }
    }

    bool wakeUpSyncSend = AwaitingSyncReply() && !ShouldDeferMessage(aMsg);

    bool shouldWakeUp = AwaitingInterruptReply() ||
                        wakeUpSyncSend ||
                        AwaitingIncomingMessage();

    // Although we usually don't need to post a message task if
    // shouldWakeUp is true, it's easier to post anyway than to have to
    // guarantee that every Send call processes everything it's supposed to
    // before returning.
    bool shouldPostTask = !shouldWakeUp || wakeUpSyncSend;

    IPC_LOG("Receive on link thread; seqno=%d, xid=%d, shouldWakeUp=%d",
            aMsg.seqno(), aMsg.transaction_id(), shouldWakeUp);

    // There are three cases we're concerned about, relating to the state of the
    // main thread:
    //
    // (1) We are waiting on a sync reply - main thread is blocked on the
    //     IPC monitor.
    //   - If the message is NESTED_INSIDE_SYNC, we wake up the main thread to
    //     deliver the message depending on ShouldDeferMessage. Otherwise, we
    //     leave it in the mPending queue, posting a task to the main event
    //     loop, where it will be processed once the synchronous reply has been
    //     received.
    //
    // (2) We are waiting on an Interrupt reply - main thread is blocked on the
    //     IPC monitor.
    //   - Always notify and wake up the main thread.
    //
    // (3) We are not waiting on a reply.
    //   - We post a task to the main event loop.
    //
    // Note that, we may notify the main thread even though the monitor is not
    // blocked. This is okay, since we always check for pending events before
    // blocking again.

    // aMsg is moved into the task here; it must not be read after this point.
    RefPtr<MessageTask> task = new MessageTask(this, Move(aMsg));
    mPending.insertBack(task);

    if (shouldWakeUp) {
        NotifyWorkerThread();
    }

    if (shouldPostTask) {
        task->Post();
    }
}

// Invokes aInvoke on each message in mPending, in queue order, while holding
// the monitor. Iteration ends at the first message for which aInvoke returns
// false.
void
MessageChannel::PeekMessages(mozilla::function<bool(const Message& aMsg)> aInvoke)
{
    // FIXME: We shouldn't be holding the lock for aInvoke!
    MonitorAutoLock lock(*mMonitor);

    for (RefPtr<MessageTask> task : mPending) {
        const bool keepGoing = aInvoke(task->Msg());
        if (!keepGoing) {
            return;
        }
    }
}

// Dispatches every queued message that ShouldDeferMessage does not hold
// back, repeating until a pass over mPending finds nothing to run or the
// given transaction has been canceled. Requires the monitor to be held.
void
MessageChannel::ProcessPendingRequests(AutoEnterTransaction& aTransaction)
{
    mMonitor->AssertCurrentThreadOwns();

    IPC_LOG("ProcessPendingRequests for seqno=%d, xid=%d",
            aTransaction.SequenceNumber(), aTransaction.TransactionID());

    // A cancellation during ProcessPendingRequest forces an immediate exit:
    // ShouldDeferMessage would otherwise run against weird state (as if no
    // Send were in progress). That could let even NOT_NESTED sync messages
    // through (but not NOT_NESTED async ones) and break message ordering.
    while (!aTransaction.IsCanceled()) {
        mozilla::Vector<Message> ready;

        // Pull every message that may run now out of the pending list.
        RefPtr<MessageTask> task = mPending.getFirst();
        while (task) {
            Message &candidate = task->Msg();

            MOZ_RELEASE_ASSERT(!aTransaction.IsCanceled(),
                               "Calling ShouldDeferMessage when cancelled");
            bool defer = ShouldDeferMessage(candidate);

            // Only log the interesting messages.
            if (candidate.is_sync() ||
                candidate.nested_level() == IPC::Message::NESTED_INSIDE_CPOW)
            {
                IPC_LOG("ShouldDeferMessage(seqno=%d) = %d", candidate.seqno(), defer);
            }

            if (defer) {
                task = task->getNext();
                continue;
            }

            if (!ready.append(Move(candidate))) {
                MOZ_CRASH();
            }
            task = task->removeAndGetNext();
        }

        // Nothing runnable is left, so there are no more nested messages.
        if (ready.empty()) {
            return;
        }

        // Dispatching may enqueue further messages; the outer loop comes
        // back around to look for them.
        for (Message &msg : ready) {
            ProcessPendingRequest(Move(msg));
        }
    }
}

// Sends the sync message aMsg and blocks until its reply arrives, the
// transaction is canceled, the channel disconnects, or the send times out.
//
// aMsg:   the message to send. It is wrapped in an nsAutoPtr immediately, so
//         this function takes ownership of it on every path.
// aReply: on success, receives the reply (moved into *aReply).
//
// Returns true only when a non-error reply was received. On each failure path
// that happens after the monitor is taken, mLastSendError records the reason
// before false is returned.
//
// Must be called on the worker thread without the monitor held; the monitor
// is acquired here for the rest of the call.
bool
MessageChannel::Send(Message* aMsg, Message* aReply)
{
    // Only messages at or above the telemetry size threshold are recorded.
    if (aMsg->size() >= kMinTelemetryMessageSize) {
        Telemetry::Accumulate(Telemetry::IPC_MESSAGE_SIZE,
                              nsDependentCString(aMsg->name()), aMsg->size());
    }

    nsAutoPtr<Message> msg(aMsg);

    // Sanity checks.
    AssertWorkerThread();
    mMonitor->AssertNotCurrentThreadOwns();

#ifdef OS_WIN
    SyncStackFrame frame(this, false);
    NeuteredWindowRegion neuteredRgn(mFlags & REQUIRE_DEFERRED_MESSAGE_PROTECTION);
#endif

    CxxStackFrame f(*this, OUT_MESSAGE, msg);

    MonitorAutoLock lock(*mMonitor);

    if (mTimedOutMessageSeqno) {
        // Don't bother sending another sync message if a previous one timed out
        // and we haven't received a reply for it. Once the original timed-out
        // message receives a reply, we'll be able to send more sync messages
        // again.
        IPC_LOG("Send() failed due to previous timeout");
        mLastSendError = SyncSendError::PreviousTimeout;
        return false;
    }

    if (DispatchingSyncMessageNestedLevel() == IPC::Message::NOT_NESTED &&
        msg->nested_level() > IPC::Message::NOT_NESTED)
    {
        // Don't allow sending CPOWs while we're dispatching a sync message.
        // If you want to do that, use sendRpcMessage instead.
        IPC_LOG("Nested level forbids send");
        mLastSendError = SyncSendError::SendingCPOWWhileDispatchingSync;
        return false;
    }

    if (DispatchingSyncMessageNestedLevel() == IPC::Message::NESTED_INSIDE_CPOW ||
        DispatchingAsyncMessageNestedLevel() == IPC::Message::NESTED_INSIDE_CPOW)
    {
        // Generally only the parent dispatches urgent messages. And the only
        // sync messages it can send are NESTED_INSIDE_SYNC. Mainly we want to ensure
        // here that we don't return false for non-CPOW messages.
        MOZ_RELEASE_ASSERT(msg->nested_level() == IPC::Message::NESTED_INSIDE_SYNC);
        IPC_LOG("Sending while dispatching urgent message");
        mLastSendError = SyncSendError::SendingCPOWWhileDispatchingUrgent;
        return false;
    }

    // Sending at a lower nested level than what is being dispatched or
    // awaited cancels the current transaction, both locally and (via the
    // CancelMessage) on the other side, before the new message goes out.
    if (msg->nested_level() < DispatchingSyncMessageNestedLevel() ||
        msg->nested_level() < AwaitingSyncReplyNestedLevel())
    {
        MOZ_RELEASE_ASSERT(DispatchingSyncMessage() || DispatchingAsyncMessage());
        IPC_LOG("Cancel from Send");
        CancelMessage *cancel = new CancelMessage(CurrentNestedInsideSyncTransaction());
        CancelTransaction(CurrentNestedInsideSyncTransaction());
        mLink->SendMessage(cancel);
    }

    IPC_ASSERT(msg->is_sync(), "can only Send() sync messages here");

    IPC_ASSERT(msg->nested_level() >= DispatchingSyncMessageNestedLevel(),
               "can't send sync message of a lesser nested level than what's being dispatched");
    IPC_ASSERT(AwaitingSyncReplyNestedLevel() <= msg->nested_level(),
               "nested sync message sends must be of increasing nested level");
    IPC_ASSERT(DispatchingSyncMessageNestedLevel() != IPC::Message::NESTED_INSIDE_CPOW,
               "not allowed to send messages while dispatching urgent messages");

    IPC_ASSERT(DispatchingAsyncMessageNestedLevel() != IPC::Message::NESTED_INSIDE_CPOW,
               "not allowed to send messages while dispatching urgent messages");

    if (!Connected()) {
        ReportConnectionError("MessageChannel::SendAndWait", msg);
        mLastSendError = SyncSendError::NotConnectedBeforeSend;
        return false;
    }

    msg->set_seqno(NextSeqno());

    // Snapshot what we need from the message now: it is handed to the link
    // below and must not be touched afterwards.
    int32_t seqno = msg->seqno();
    int nestedLevel = msg->nested_level();
    // The reply is expected to carry the request's type plus one; this is
    // checked against the received reply further down.
    msgid_t replyType = msg->type() + 1;

    AutoEnterTransaction *stackTop = mTransactionStack;

    // If the most recent message on the stack is NESTED_INSIDE_SYNC, then our
    // message should nest inside that and we use the same transaction
    // ID. Otherwise we need a new transaction ID (so we use the seqno of the
    // message we're sending).
    bool nest = stackTop && stackTop->NestedLevel() == IPC::Message::NESTED_INSIDE_SYNC;
    int32_t transaction = nest ? stackTop->TransactionID() : seqno;
    msg->set_transaction_id(transaction);

    // aMsg and msg refer to the same object here; msg still owns it.
    bool handleWindowsMessages = mListener->HandleWindowsMessages(*aMsg);
    AutoEnterTransaction transact(this, seqno, transaction, nestedLevel);

    IPC_LOG("Send seqno=%d, xid=%d", seqno, transaction);

    // msg will be destroyed soon, but name() is not owned by msg.
    const char* msgName = msg->name();

    mLink->SendMessage(msg.forget());

    // Wait loop: dispatch whatever may run while we are blocked, then wait
    // for a notification, until the transaction completes or is canceled.
    while (true) {
        MOZ_RELEASE_ASSERT(!transact.IsCanceled());
        ProcessPendingRequests(transact);
        if (transact.IsComplete()) {
            break;
        }
        if (!Connected()) {
            ReportConnectionError("MessageChannel::Send");
            mLastSendError = SyncSendError::DisconnectedDuringSend;
            return false;
        }

        MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno);
        MOZ_RELEASE_ASSERT(!transact.IsComplete());
        MOZ_RELEASE_ASSERT(mTransactionStack == &transact);

        bool maybeTimedOut = !WaitForSyncNotify(handleWindowsMessages);

        if (mListener->NeedArtificialSleep()) {
            // The monitor is released for the duration of the sleep.
            MonitorAutoUnlock unlock(*mMonitor);
            mListener->ArtificialSleep();
        }

        if (!Connected()) {
            ReportConnectionError("MessageChannel::SendAndWait");
            mLastSendError = SyncSendError::DisconnectedDuringSend;
            return false;
        }

        if (transact.IsCanceled()) {
            break;
        }

        MOZ_RELEASE_ASSERT(mTransactionStack == &transact);

        // We only time out a message if it initiated a new transaction (i.e.,
        // if neither side has any other message Sends on the stack).
        bool canTimeOut = transact.IsBottom();
        if (maybeTimedOut && canTimeOut && !ShouldContinueFromTimeout()) {
            // Since ShouldContinueFromTimeout drops the lock, we need to
            // re-check all our conditions here. We shouldn't time out if any of
            // these things happen because there won't be a reply to the timed
            // out message in these cases.
            if (transact.IsComplete()) {
                break;
            }

            IPC_LOG("Timing out Send: xid=%d", transaction);

            // Remember which message timed out; Send() refuses further sync
            // sends (see the check at the top) while this is set.
            mTimedOutMessageSeqno = seqno;
            mTimedOutMessageNestedLevel = nestedLevel;
            mLastSendError = SyncSendError::TimedOut;
            return false;
        }

        if (transact.IsCanceled()) {
            break;
        }
    }

    if (transact.IsCanceled()) {
        IPC_LOG("Other side canceled seqno=%d, xid=%d", seqno, transaction);
        mLastSendError = SyncSendError::CancelledAfterSend;
        return false;
    }

    if (transact.IsError()) {
        IPC_LOG("Error: seqno=%d, xid=%d", seqno, transaction);
        mLastSendError = SyncSendError::ReplyError;
        return false;
    }

    IPC_LOG("Got reply: seqno=%d, xid=%d", seqno, transaction);

    nsAutoPtr<Message> reply = transact.GetReply();

    // Validate that the reply really is the answer to the message we sent.
    MOZ_RELEASE_ASSERT(reply);
    MOZ_RELEASE_ASSERT(reply->is_reply(), "expected reply");
    MOZ_RELEASE_ASSERT(!reply->is_reply_error());
    MOZ_RELEASE_ASSERT(reply->seqno() == seqno);
    MOZ_RELEASE_ASSERT(reply->type() == replyType, "wrong reply type");
    MOZ_RELEASE_ASSERT(reply->is_sync());

    *aReply = Move(*reply);
    if (aReply->size() >= kMinTelemetryMessageSize) {
        Telemetry::Accumulate(Telemetry::IPC_REPLY_SIZE,
                              nsDependentCString(msgName), aReply->size());
    }
    return true;
}

// Sends the Interrupt message aMsg and blocks until the reply to it has been
// received, dispatching other incoming messages in the meantime.
//
// aMsg:   the Interrupt message to send. It is wrapped in an nsAutoPtr
//         immediately, so this function takes ownership of it.
// aReply: receives the reply (moved into *aReply) when the reply is not an
//         error reply.
//
// Returns true when a non-error reply to this call arrived. Returns false
// when the channel is not (or no longer) connected, when a wait timed out and
// ShouldContinueFromTimeout() declined to continue, or when the reply is an
// error reply.
//
// Must be called on the worker thread without the monitor held.
bool
MessageChannel::Call(Message* aMsg, Message* aReply)
{
    nsAutoPtr<Message> msg(aMsg);
    AssertWorkerThread();
    mMonitor->AssertNotCurrentThreadOwns();

#ifdef OS_WIN
    SyncStackFrame frame(this, true);
#endif

    // This must come before MonitorAutoLock, as its destructor acquires the
    // monitor lock.
    CxxStackFrame cxxframe(*this, OUT_MESSAGE, msg);

    MonitorAutoLock lock(*mMonitor);
    if (!Connected()) {
        ReportConnectionError("MessageChannel::Call", msg);
        return false;
    }

    // Sanity checks.
    IPC_ASSERT(!AwaitingSyncReply(),
               "cannot issue Interrupt call while blocked on sync request");
    IPC_ASSERT(!DispatchingSyncMessage(),
               "violation of sync handler invariant");
    IPC_ASSERT(msg->is_interrupt(), "can only Call() Interrupt messages here");

    // Stamp the message with our sequence number and stack-depth information,
    // record it on the Interrupt stack, then hand it to the link.
    msg->set_seqno(NextSeqno());
    msg->set_interrupt_remote_stack_depth_guess(mRemoteStackDepthGuess);
    msg->set_interrupt_local_stack_depth(1 + InterruptStackDepth());
    mInterruptStack.push(MessageInfo(*msg));
    mLink->SendMessage(msg.forget());

    while (true) {
        // if a handler invoked by *Dispatch*() spun a nested event
        // loop, and the connection was broken during that loop, we
        // might have already processed the OnError event. if so,
        // trying another loop iteration will be futile because
        // channel state will have been cleared
        if (!Connected()) {
            ReportConnectionError("MessageChannel::Call");
            return false;
        }

#ifdef OS_WIN
        // We need to limit the scope of neuteredRgn to this spot in the code.
        // Window neutering can't be enabled during some plugin calls because
        // we then risk the neutered window procedure being subclassed by a
        // plugin.
        {
            NeuteredWindowRegion neuteredRgn(mFlags & REQUIRE_DEFERRED_MESSAGE_PROTECTION);
            /* We should pump messages at this point to ensure that the IPC peer
               does not become deadlocked on a pending inter-thread SendMessage() */
            neuteredRgn.PumpOnce();
        }
#endif

        // Now might be the time to process a message deferred because of race
        // resolution.
        MaybeUndeferIncall();

        // Wait for an event to occur.
        while (!InterruptEventOccurred()) {
            bool maybeTimedOut = !WaitForInterruptNotify();

            // We might have received a "subtly deferred" message in a nested
            // loop that it's now time to process.
            if (InterruptEventOccurred() ||
                (!maybeTimedOut && (!mDeferred.empty() || !mOutOfTurnReplies.empty())))
            {
                break;
            }

            if (maybeTimedOut && !ShouldContinueFromTimeout())
                return false;
        }

        Message recvd;
        MessageMap::iterator it;

        // Pick the next message to look at: an already-received reply for the
        // call on top of the Interrupt stack takes precedence over the front
        // of the pending queue.
        if ((it = mOutOfTurnReplies.find(mInterruptStack.top().seqno()))
            != mOutOfTurnReplies.end())
        {
            recvd = Move(it->second);
            mOutOfTurnReplies.erase(it);
        } else if (!mPending.isEmpty()) {
            RefPtr<MessageTask> task = mPending.popFirst();
            recvd = Move(task->Msg());
        } else {
            // because of subtleties with nested event loops, it's possible
            // that we got here and nothing happened.  or, we might have a
            // deferred in-call that needs to be processed.  either way, we
            // won't break the inner while loop again until something new
            // happens.
            continue;
        }

        // If the message is not Interrupt, we can dispatch it as normal.
        if (!recvd.is_interrupt()) {
            DispatchMessage(Move(recvd));
            if (!Connected()) {
                ReportConnectionError("MessageChannel::DispatchMessage");
                return false;
            }
            continue;
        }

        // If the message is an Interrupt reply, either process it as a reply to our
        // call, or add it to the list of out-of-turn replies we've received.
        if (recvd.is_reply()) {
            IPC_ASSERT(!mInterruptStack.empty(), "invalid Interrupt stack");

            // If this is not a reply to the call we've initiated, add it to our
            // out-of-turn replies and keep polling for events.
            {
                const MessageInfo &outcall = mInterruptStack.top();

                // Note, In the parent, sequence numbers increase from 0, and
                // in the child, they decrease from 0.
                if ((mSide == ChildSide && recvd.seqno() > outcall.seqno()) ||
                    (mSide != ChildSide && recvd.seqno() < outcall.seqno()))
                {
                    mOutOfTurnReplies[recvd.seqno()] = Move(recvd);
                    continue;
                }

                IPC_ASSERT(recvd.is_reply_error() ||
                           (recvd.type() == (outcall.type() + 1) &&
                            recvd.seqno() == outcall.seqno()),
                           "somebody's misbehavin'", true);
            }

            // We received a reply to our most recent outstanding call. Pop
            // this frame and return the reply.
            mInterruptStack.pop();

            // Read the error flag before recvd is (possibly) moved from.
            bool is_reply_error = recvd.is_reply_error();
            if (!is_reply_error) {
                *aReply = Move(recvd);
            }

            // If we have no more pending out calls waiting on replies, then
            // the reply queue should be empty.
            IPC_ASSERT(!mInterruptStack.empty() || mOutOfTurnReplies.empty(),
                       "still have pending replies with no pending out-calls",
                       true);

            return !is_reply_error;
        }

        // Dispatch an Interrupt in-call. Snapshot the current stack depth while we
        // own the monitor.
        size_t stackDepth = InterruptStackDepth();
        {
            // DispatchInterruptMessage asserts that the monitor is not held,
            // so release it for the duration of the dispatch.
            MonitorAutoUnlock unlock(*mMonitor);

            CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
            DispatchInterruptMessage(Move(recvd), stackDepth);
        }
        if (!Connected()) {
            ReportConnectionError("MessageChannel::DispatchInterruptMessage");
            return false;
        }
    }

    return true;
}

bool
MessageChannel::WaitForIncomingMessage()
{
#ifdef OS_WIN
    SyncStackFrame frame(this, true);
    NeuteredWindowRegion neuteredRgn(mFlags & REQUIRE_DEFERRED_MESSAGE_PROTECTION);
#endif

    MonitorAutoLock lock(*mMonitor);
    AutoEnterWaitForIncoming waitingForIncoming(*this);
    if (mChannelState != ChannelConnected) {
        return false;
    }
    if (!HasPendingEvents()) {
        return WaitForInterruptNotify();
    }

    MOZ_RELEASE_ASSERT(!mPending.isEmpty());
    RefPtr<MessageTask> task = mPending.getFirst();
    RunMessage(*task);
    return true;
}

// True when the channel is connected and at least one message is queued in
// mPending. Worker thread only; the monitor must be held.
bool
MessageChannel::HasPendingEvents()
{
    AssertWorkerThread();
    mMonitor->AssertCurrentThreadOwns();

    if (!Connected()) {
        return false;
    }
    return !mPending.isEmpty();
}

// Reports whether there is something for an Interrupt wait loop to act on:
// the channel is no longer connected, a message is pending, or a reply for
// the call on top of the Interrupt stack is already stored in
// mOutOfTurnReplies. Worker thread only; the monitor must be held.
bool
MessageChannel::InterruptEventOccurred()
{
    AssertWorkerThread();
    mMonitor->AssertCurrentThreadOwns();
    IPC_ASSERT(InterruptStackDepth() > 0, "not in wait loop");

    if (!Connected() || !mPending.isEmpty()) {
        return true;
    }

    // Only consult the Interrupt stack when there are stored replies at all.
    if (mOutOfTurnReplies.empty()) {
        return false;
    }

    return mOutOfTurnReplies.find(mInterruptStack.top().seqno()) !=
           mOutOfTurnReplies.end();
}

// Dispatches one message and reports whether the channel is still connected
// afterwards (a connection error is reported when it is not). Worker thread
// only; the monitor must be held.
bool
MessageChannel::ProcessPendingRequest(Message &&aUrgent)
{
    AssertWorkerThread();
    mMonitor->AssertCurrentThreadOwns();

    IPC_LOG("Process pending: seqno=%d, xid=%d", aUrgent.seqno(), aUrgent.transaction_id());

    DispatchMessage(Move(aUrgent));

    if (Connected()) {
        return true;
    }

    ReportConnectionError("MessageChannel::ProcessPendingRequest");
    return false;
}

// Decides whether aMsg may be run now. Without an outstanding timed-out
// message everything may run. Otherwise only messages the other side might
// need answered in order to finish the timed-out message are allowed.
bool
MessageChannel::ShouldRunMessage(const Message& aMsg)
{
    if (!mTimedOutMessageSeqno) {
        return true;
    }

    // While the reply to a timed-out message is still outstanding, we have to
    // be careful about what we process. The failure being prevented:
    // 1. child sends a NOT_NESTED sync message S
    // 2. parent sends a NESTED_INSIDE_SYNC sync message H at the same time
    // 3. parent times out H
    // 4. child starts processing H and sends a NESTED_INSIDE_SYNC message H'
    //    nested within the same transaction
    // 5. parent dispatches S and sends reply
    // 6. child asserts because it instead expected a reply to H'.
    //
    // So the parent refuses to process S until the reply to H arrives. In
    // general, with M the timed-out message, a message is only processed if
    // the child would need its response in order to process M: it has a
    // higher nested level than M, or it belongs to M's transaction.
    const int level = aMsg.nested_level();

    if (level != mTimedOutMessageNestedLevel) {
        // Strictly higher levels run; strictly lower ones wait.
        return level > mTimedOutMessageNestedLevel;
    }

    // Same level: only messages of the timed-out transaction run.
    return aMsg.transaction_id() == mTimedOutMessageSeqno;
}

// Runs the message held by aTask, if it is currently allowed to run: the
// task is unlinked from its list and its message is either dispatched or,
// for an Interrupt reply arriving while on the C++ stack, stored in
// mOutOfTurnReplies. Worker thread only; the monitor must be held.
void
MessageChannel::RunMessage(MessageTask& aTask)
{
    AssertWorkerThread();
    mMonitor->AssertCurrentThreadOwns();

    Message& message = aTask.Msg();

    if (!Connected()) {
        ReportConnectionError("OnMaybeDequeueOne");
        return;
    }

#ifdef DEBUG
    // Debug-only check that no message queued ahead of aTask would have been
    // valid to run instead.
    for (RefPtr<MessageTask> task : mPending) {
        if (task == &aTask) {
            break;
        }
        MOZ_ASSERT(!ShouldRunMessage(task->Msg()));
    }
#endif

    if (!mDeferred.empty()) {
        MaybeUndeferIncall();
    }

    // Not runnable right now: leave the task where it is.
    if (!ShouldRunMessage(message)) {
        return;
    }

    MOZ_RELEASE_ASSERT(aTask.isInList());
    aTask.remove();

    const bool nestedInterruptReply =
        IsOnCxxStack() && message.is_interrupt() && message.is_reply();

    if (!nestedInterruptReply) {
        DispatchMessage(Move(message));
        return;
    }

    // We probably just received a reply in a nested loop for an Interrupt
    // call sent before entering that loop; keep it for that call.
    mOutOfTurnReplies[message.seqno()] = Move(message);
}

// Runnable entry point: if this task still has a channel and is still linked
// into a list, hands itself to MessageChannel::RunMessage under the
// channel's monitor. Always returns NS_OK.
nsresult
MessageChannel::MessageTask::Run()
{
    // Without a channel there is nothing to do.
    if (!mChannel) {
        return NS_OK;
    }

    mChannel->AssertWorkerThread();
    mChannel->mMonitor->AssertNotCurrentThreadOwns();

    MonitorAutoLock lock(*mChannel->mMonitor);

    // Reset the flag first: if the message ends up not being run, the task
    // may need to be posted again.
    mScheduled = false;

    if (isInList()) {
        mChannel->RunMessage(*this);
    }
    return NS_OK;
}

// Unlinks this task from the list it is in, if any, under the channel's
// monitor. Does nothing when the task has no channel. Always returns NS_OK.
nsresult
MessageChannel::MessageTask::Cancel()
{
    if (!mChannel) {
        return NS_OK;
    }

    mChannel->AssertWorkerThread();
    mChannel->mMonitor->AssertNotCurrentThreadOwns();

    MonitorAutoLock lock(*mChannel->mMonitor);

    if (isInList()) {
        remove();
    }
    return NS_OK;
}

void
MessageChannel::MessageTask::Post()
{
    MOZ_RELEASE_ASSERT(!mScheduled);
    MOZ_RELEASE_ASSERT(isInList());

    RefPtr<MessageTask> self = this;
    mChannel->mWorkerLoop->PostTask(self.forget());
    mScheduled = true;
}

void
MessageChannel::MessageTask::Clear()
{
    mChannel->AssertWorkerThread();

    mChannel = nullptr;
}

// Dispatches aMsg to the matching sync / Interrupt / async handler and, when
// a sync handler produced a reply, sends that reply over the link.
//
// Called on the worker thread with the monitor held. The monitor is released
// while the handler runs and is held again afterwards. A reply is dropped
// when the transaction was canceled during dispatch, and is only sent when
// the channel is still in the connected state.
void
MessageChannel::DispatchMessage(Message &&aMsg)
{
    AssertWorkerThread();
    mMonitor->AssertCurrentThreadOwns();

    // Only set up the AutoNoJSAPI guard on the main thread, and only once
    // script settings have been initialized.
    Maybe<AutoNoJSAPI> nojsapi;
    if (ScriptSettingsInitialized() && NS_IsMainThread())
        nojsapi.emplace();

    nsAutoPtr<Message> reply;

    IPC_LOG("DispatchMessage: seqno=%d, xid=%d", aMsg.seqno(), aMsg.transaction_id());

    {
        // The transaction guard lives only for this scope; the reply is sent
        // after it has been destroyed.
        AutoEnterTransaction transaction(this, aMsg);

        // Captured up front because aMsg may be moved from in the Interrupt
        // branch below.
        int id = aMsg.transaction_id();
        MOZ_RELEASE_ASSERT(!aMsg.is_sync() || id == transaction.TransactionID());

        {
            // Handlers run without the monitor.
            MonitorAutoUnlock unlock(*mMonitor);
            CxxStackFrame frame(*this, IN_MESSAGE, &aMsg);

            mListener->ArtificialSleep();

            if (aMsg.is_sync())
                DispatchSyncMessage(aMsg, *getter_Transfers(reply));
            else if (aMsg.is_interrupt())
                DispatchInterruptMessage(Move(aMsg), 0);
            else
                DispatchAsyncMessage(aMsg);

            mListener->ArtificialSleep();
        }

        if (reply && transaction.IsCanceled()) {
            // The transaction has been canceled. Don't send a reply.
            IPC_LOG("Nulling out reply due to cancellation, seqno=%d, xid=%d", aMsg.seqno(), id);
            reply = nullptr;
        }
    }

    if (reply && ChannelConnected == mChannelState) {
        IPC_LOG("Sending reply seqno=%d, xid=%d", aMsg.seqno(), aMsg.transaction_id());
        mLink->SendMessage(reply.forget());
    }
}

// Hands the sync message aMsg to the listener and prepares the reply.
//
// aMsg:   the sync message to handle.
// aReply: out-parameter for the reply. When the listener's result is
//         rejected by MaybeHandleError, a fresh error reply (sync, reply and
//         reply-error flags set, same nested level as aMsg) is created here
//         instead. In every case the reply is stamped with aMsg's seqno and
//         transaction id before returning.
//
// Worker thread only. Messages above NOT_NESTED must be handled on the main
// thread.
void
MessageChannel::DispatchSyncMessage(const Message& aMsg, Message*& aReply)
{
    AssertWorkerThread();

    int nestedLevel = aMsg.nested_level();

    MOZ_RELEASE_ASSERT(nestedLevel == IPC::Message::NOT_NESTED || NS_IsMainThread());

    // gParentProcessBlocker is only set for a child-side channel on the main
    // thread; in every other case the guard below operates on this local
    // placeholder instead. The placeholder is initialized so that the guard
    // never observes an indeterminate pointer value.
    // NOTE(review): AutoSetValue is defined elsewhere; it presumably saves
    // the variable's current value in order to restore it - confirm.
    MessageChannel* dummy = nullptr;
    MessageChannel*& blockingVar = mSide == ChildSide && NS_IsMainThread() ? gParentProcessBlocker : dummy;

    Result rv;
    {
        // The guard is scoped to the listener call only.
        AutoSetValue<MessageChannel*> blocked(blockingVar, this);
        rv = mListener->OnMessageReceived(aMsg, aReply);
    }

    if (!MaybeHandleError(rv, aMsg, "DispatchSyncMessage")) {
        // Handling failed: answer with an explicit error reply.
        aReply = new Message();
        aReply->set_sync();
        aReply->set_nested_level(aMsg.nested_level());
        aReply->set_reply();
        aReply->set_reply_error();
    }
    aReply->set_seqno(aMsg.seqno());
    aReply->set_transaction_id(aMsg.transaction_id());
}

// Hands the async message aMsg to the listener. While the listener runs,
// mDispatchingAsyncMessage is set to true and
// mDispatchingAsyncMessageNestedLevel holds the message's nested level. The
// listener's result is passed to MaybeHandleError. Worker thread only.
void
MessageChannel::DispatchAsyncMessage(const Message& aMsg)
{
    AssertWorkerThread();
    MOZ_RELEASE_ASSERT(!aMsg.is_interrupt() && !aMsg.is_sync());

    // Messages without a routing id are special messages that must never
    // reach this point.
    if (MSG_ROUTING_NONE == aMsg.routing_id()) {
        NS_RUNTIMEABORT("unhandled special message!");
    }

    const int level = aMsg.nested_level();

    Result rv;
    {
        // Both guards are scoped to the listener call only.
        AutoSetValue<bool> async(mDispatchingAsyncMessage, true);
        AutoSetValue<int> nestedLevelSet(mDispatchingAsyncMessageNestedLevel, level);
        rv = mListener->OnMessageReceived(aMsg);
    }

    MaybeHandleError(rv, aMsg, "DispatchAsyncMessage");
}

// Handles an incoming Interrupt in-call (never a reply).
//
// aMsg:       the Interrupt call to handle.
// stackDepth: the caller's snapshot of the Interrupt stack depth, used for
//             race detection.
//
// If the other side's guess of our stack depth does not match, the two sides
// have raced; the listener mediates, and the call is either pushed onto
// mDeferred (and this function returns without replying) or processed
// anyway. Otherwise the call goes to the listener and the reply - or an
// error reply when MaybeHandleError rejects the result - is sent if the
// channel is still connected.
//
// Worker thread only; must be called WITHOUT the monitor held. The monitor
// is taken at the end to send the reply.
void
MessageChannel::DispatchInterruptMessage(Message&& aMsg, size_t stackDepth)
{
    AssertWorkerThread();
    mMonitor->AssertNotCurrentThreadOwns();

    IPC_ASSERT(aMsg.is_interrupt() && !aMsg.is_reply(), "wrong message type");

    // Race detection: see the long comment near mRemoteStackDepthGuess in
    // MessageChannel.h. "Remote" stack depth means our side, and "local" means
    // the other side.
    if (aMsg.interrupt_remote_stack_depth_guess() != RemoteViewOfStackDepth(stackDepth)) {
        // Interrupt in-calls have raced. The winner, if there is one, gets to defer
        // processing of the other side's in-call.
        bool defer;
        const char* winner;
        // Whichever side we are, our own racing call is the one on top of our
        // Interrupt stack and the other side's call is aMsg.
        const MessageInfo parentMsgInfo =
          (mSide == ChildSide) ? MessageInfo(aMsg) : mInterruptStack.top();
        const MessageInfo childMsgInfo =
          (mSide == ChildSide) ? mInterruptStack.top() : MessageInfo(aMsg);
        switch (mListener->MediateInterruptRace(parentMsgInfo, childMsgInfo))
        {
          case RIPChildWins:
            winner = "child";
            defer = (mSide == ChildSide);
            break;
          case RIPParentWins:
            winner = "parent";
            defer = (mSide != ChildSide);
            break;
          case RIPError:
            NS_RUNTIMEABORT("NYI: 'Error' Interrupt race policy");
            return;
          default:
            NS_RUNTIMEABORT("not reached");
            return;
        }

        if (LoggingEnabled()) {
            printf_stderr("  (%s: %s won, so we're%sdeferring)\n",
                          (mSide == ChildSide) ? "child" : "parent",
                          winner,
                          defer ? " " : " not ");
        }

        if (defer) {
            // We now know the other side's stack has one more frame
            // than we thought.
            ++mRemoteStackDepthGuess; // decremented in MaybeUndeferIncall()
            mDeferred.push(Move(aMsg));
            return;
        }

        // We "lost" and need to process the other side's in-call. Don't need
        // to fix up the mRemoteStackDepthGuess here, because we're just about
        // to increment it below (around the OnCallReceived call), which will
        // make it correct again.
    }

#ifdef OS_WIN
    SyncStackFrame frame(this, true);
#endif

    nsAutoPtr<Message> reply;

    // The guess is bumped only for the duration of the listener call.
    ++mRemoteStackDepthGuess;
    Result rv = mListener->OnCallReceived(aMsg, *getter_Transfers(reply));
    --mRemoteStackDepthGuess;

    if (!MaybeHandleError(rv, aMsg, "DispatchInterruptMessage")) {
        // Handling failed: answer with an explicit Interrupt error reply.
        reply = new Message();
        reply->set_interrupt();
        reply->set_reply();
        reply->set_reply_error();
    }
    reply->set_seqno(aMsg.seqno());

    MonitorAutoLock lock(*mMonitor);
    if (ChannelConnected == mChannelState) {
        mLink->SendMessage(reply.forget());
    }
}

// Takes the top entry off the mDeferred stack (if there is one) and re-queues
// it on mPending as a freshly posted MessageTask.
//
// Must be called on the worker thread with mMonitor held (both asserted
// below). Does nothing when mDeferred is empty.
void
MessageChannel::MaybeUndeferIncall()
{
    AssertWorkerThread();
    mMonitor->AssertCurrentThreadOwns();

    if (mDeferred.empty())
        return;

    size_t stackDepth = InterruptStackDepth();

    // the other side can only *under*-estimate our actual stack depth
    IPC_ASSERT(mDeferred.top().interrupt_remote_stack_depth_guess() <= stackDepth,
               "fatal logic error");

    // maybe time to process this message
    // Move the message out of the stack before popping so we keep its
    // contents.
    Message call(Move(mDeferred.top()));
    mDeferred.pop();

    // fix up fudge factor we added to account for race
    // (the guess is bumped when an in-call is pushed onto mDeferred).
    IPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error");
    --mRemoteStackDepthGuess;

    // Only non-nested messages are expected on the deferred stack.
    MOZ_RELEASE_ASSERT(call.nested_level() == IPC::Message::NOT_NESTED);
    RefPtr<MessageTask> task = new MessageTask(this, Move(call));
    mPending.insertBack(task);
    task->Post();
}

// Tells the listener that the C++ stack has been exited and, if an outgoing
// interrupt message was seen in the meantime, re-enqueues pending messages
// (under mMonitor) and clears the mSawInterruptOutMsg flag.
void
MessageChannel::ExitedCxxStack()
{
    mListener->OnExitedCxxStack();

    if (!mSawInterruptOutMsg) {
        // Nothing was held back; no need to take the monitor.
        return;
    }

    // see long comment in OnMaybeDequeueOne()
    MonitorAutoLock lock(*mMonitor);
    EnqueuePendingMessages();
    mSawInterruptOutMsg = false;
}

// Makes held-back work runnable again: first un-defers the top deferred
// in-call (if any) via MaybeUndeferIncall(), then lets RepostAllMessages()
// re-post the pending queue where needed.
//
// Must be called on the worker thread with mMonitor held (asserted below).
void
MessageChannel::EnqueuePendingMessages()
{
    AssertWorkerThread();
    mMonitor->AssertCurrentThreadOwns();

    MaybeUndeferIncall();

    // XXX performance tuning knob: could process all or k pending
    // messages here, rather than enqueuing for later processing

    RepostAllMessages();
}

// Returns true when |aTimeout| is a finite timeout and at least that much
// time has passed since |aStart|. PR_INTERVAL_NO_TIMEOUT never expires, and
// the clock is not read in that case.
static inline bool
IsTimeoutExpired(PRIntervalTime aStart, PRIntervalTime aTimeout)
{
    if (aTimeout == PR_INTERVAL_NO_TIMEOUT) {
        return false;
    }
    return aTimeout <= (PR_IntervalNow() - aStart);
}

// Two-step timeout bookkeeping for a wait that has just finished.
//
// |aWaitTimedOut| says whether the wait ended because its timeout elapsed.
// Returns false only on the second consecutive timed-out wait; the first
// timed-out wait merely sets mInTimeoutSecondHalf and returns true so the
// caller can wait once more. A wait that did not time out resets the flag.
bool
MessageChannel::WaitResponse(bool aWaitTimedOut)
{
    if (!aWaitTimedOut) {
        mInTimeoutSecondHalf = false;
        return true;
    }

    if (mInTimeoutSecondHalf) {
        // We've really timed out this time.
        return false;
    }

    // Try a second time.
    mInTimeoutSecondHalf = true;
    return true;
}

#ifndef OS_WIN
// Non-Windows implementation: waits on mMonitor for up to mTimeoutMs
// (or indefinitely when mTimeoutMs is kNoTimeout) and reports whether the
// caller should keep going.
//
// The parameter is unused here. Returns false if, in DEBUG builds, the
// listener requests an artificial timeout; otherwise returns whatever
// WaitResponse() decides for this wait.
// NOTE(review): presumably the caller already holds mMonitor, since
// Wait() is invoked on it directly -- confirm against callers.
bool
MessageChannel::WaitForSyncNotify(bool /* aHandleWindowsMessages */)
{
#ifdef DEBUG
    // WARNING: We don't release the lock here. We can't because the link thread
    // could signal at this time and we would miss it. Instead we require
    // ArtificialTimeout() to be extremely simple.
    if (mListener->ArtificialTimeout()) {
        return false;
    }
#endif

    // Convert the configured millisecond timeout into NSPR interval units.
    PRIntervalTime timeout = (kNoTimeout == mTimeoutMs) ?
                             PR_INTERVAL_NO_TIMEOUT :
                             PR_MillisecondsToInterval(mTimeoutMs);
    // XXX could optimize away this syscall for "no timeout" case if desired
    PRIntervalTime waitStart = PR_IntervalNow();

    mMonitor->Wait(timeout);

    // If the timeout didn't expire, we know we received an event. The
    // converse is not true.
    return WaitResponse(IsTimeoutExpired(waitStart, timeout));
}

// Non-Windows implementation: simply forwards to WaitForSyncNotify(), which
// ignores its argument in this build configuration.
bool
MessageChannel::WaitForInterruptNotify()
{
    return WaitForSyncNotify(true);
}

// Non-Windows implementation: signals mMonitor so that a thread blocked in
// mMonitor->Wait() can wake up.
void
MessageChannel::NotifyWorkerThread()
{
    mMonitor->Notify();
}
#endif

// Decides whether to keep waiting after a reply timeout.
//
// Must be called on the worker thread with mMonitor held (asserted below);
// the monitor is released while the listener is consulted. Returns true
// unconditionally when the MOZ_DEBUG_CHILD_PROCESS environment variable is
// set, otherwise returns the listener's OnReplyTimeout() answer.
bool
MessageChannel::ShouldContinueFromTimeout()
{
    AssertWorkerThread();
    mMonitor->AssertCurrentThreadOwns();

    bool listenerWantsToContinue;
    {
        // Call out to the listener without holding the monitor.
        MonitorAutoUnlock unlock(*mMonitor);
        listenerWantsToContinue = mListener->OnReplyTimeout();
        mListener->ArtificialSleep();
    }

    // The environment is inspected once and the answer is cached.
    static enum { UNKNOWN, NOT_DEBUGGING, DEBUGGING } sDebuggingChildren = UNKNOWN;

    if (sDebuggingChildren == UNKNOWN) {
        if (getenv("MOZ_DEBUG_CHILD_PROCESS")) {
            sDebuggingChildren = DEBUGGING;
        } else {
            sDebuggingChildren = NOT_DEBUGGING;
        }
    }

    return (sDebuggingChildren == DEBUGGING) || listenerWantsToContinue;
}

// Sets the channel's reply timeout. Must be called on the worker thread.
//
// A non-positive |aTimeoutMs| disables the timeout (kNoTimeout). Otherwise
// the stored value is half of |aTimeoutMs|, rounded up, because the timeout
// is broken up into two periods; the minimum effective timeout is therefore
// 2ms.
void
MessageChannel::SetReplyTimeoutMs(int32_t aTimeoutMs)
{
    AssertWorkerThread();

    if (aTimeoutMs <= 0) {
        mTimeoutMs = kNoTimeout;
    } else {
        const double halfPeriod = static_cast<double>(aTimeoutMs) / 2.0;
        mTimeoutMs = static_cast<int32_t>(ceil(halfPeriod));
    }
}

// Records |peer_id| in mPeerPid and posts mOnChannelConnectedTask to the
// worker loop. Release-asserts that the peer pid has not been set before.
void
MessageChannel::OnChannelConnected(int32_t peer_id)
{
    MOZ_RELEASE_ASSERT(!mPeerPidSet);
    mPeerPidSet = true;
    mPeerPid = peer_id;
    // Post a separate reference; mOnChannelConnectedTask itself stays set.
    RefPtr<CancelableRunnable> task = mOnChannelConnectedTask;
    mWorkerLoop->PostTask(task.forget());
}

// Worker-thread half of the connection notification: forwards the stored
// peer pid to the listener. Release-asserts that the pid has been set.
void
MessageChannel::DispatchOnChannelConnected()
{
    AssertWorkerThread();
    MOZ_RELEASE_ASSERT(mPeerPidSet);
    mListener->OnChannelConnected(mPeerPid);
}

// Prints a "Need a route" error for |channelName| and reports a
// MsgRouteError processing error to the listener.
void
MessageChannel::ReportMessageRouteError(const char* channelName) const
{
    PrintErrorMessage(mSide, channelName, "Need a route");
    mListener->OnProcessingError(MsgRouteError, "MsgRouteError");
}

// Reports that a message could not be sent/received because of the current
// channel state.
//
// |aChannelName| labels the printed error. |aMsg| is optional; when given,
// its type and name are included in the printed text. The listener is then
// told about a MsgDropped error with the plain state description.
//
// Must be called on the worker thread with mMonitor held (asserted below);
// the monitor is released while the listener is notified.
void
MessageChannel::ReportConnectionError(const char* aChannelName, Message* aMsg) const
{
    AssertWorkerThread();
    mMonitor->AssertCurrentThreadOwns();

    // Human-readable description of why the channel is unusable.
    const char* stateDescription = nullptr;
    switch (mChannelState) {
      case ChannelClosed:
        stateDescription = "Closed channel: cannot send/recv";
        break;
      case ChannelOpening:
        stateDescription = "Opening channel: not yet ready for send/recv";
        break;
      case ChannelTimeout:
        stateDescription = "Channel timeout: cannot send/recv";
        break;
      case ChannelClosing:
        stateDescription = "Channel closing: too late to send/recv, messages will be lost";
        break;
      case ChannelError:
        stateDescription = "Channel error: cannot send/recv";
        break;

      default:
        NS_RUNTIMEABORT("unreached");
    }

    if (!aMsg) {
        PrintErrorMessage(mSide, aChannelName, stateDescription);
    } else {
        // Prefix the description with the offending message's identity.
        char annotated[512];
        SprintfLiteral(annotated,"(msgtype=0x%X,name=%s) %s",
                       aMsg->type(), aMsg->name(), stateDescription);

        PrintErrorMessage(mSide, aChannelName, annotated);
    }

    // Don't hold the monitor while calling out to the listener.
    MonitorAutoUnlock unlock(*mMonitor);
    mListener->OnProcessingError(MsgDropped, stateDescription);
}

// Turns a message-processing result into an error report when needed.
//
// Returns true when |code| is MsgProcessed and does nothing else. For any
// other known result code it prints an error for |channelName| that
// identifies |aMsg|, forwards the same text to the listener via
// OnProcessingError(), and returns false. Unknown codes abort.
bool
MessageChannel::MaybeHandleError(Result code, const Message& aMsg, const char* channelName)
{
    if (code == MsgProcessed) {
        return true;
    }

    const char* description = nullptr;
    switch (code) {
      case MsgNotKnown:
        description = "Unknown message: not processed";
        break;
      case MsgNotAllowed:
        description = "Message not allowed: cannot be sent/recvd in this state";
        break;
      case MsgPayloadError:
        description = "Payload error: message could not be deserialized";
        break;
      case MsgProcessingError:
        description = "Processing error: message was deserialized, but the handler returned false (indicating failure)";
        break;
      case MsgRouteError:
        description = "Route error: message sent to unknown actor ID";
        break;
      case MsgValueError:
        description = "Value error: message was deserialized, but contained an illegal value";
        break;

      default:
        NS_RUNTIMEABORT("unknown Result code");
        return false;
    }

    // A type name starting with '?' is treated as "no usable name", in which
    // case the numeric message type is printed instead.
    char report[512];
    const char* typeName = StringFromIPCMessageType(aMsg.type());
    const bool hasTypeName = (typeName[0] != '?');
    if (hasTypeName) {
        SprintfLiteral(report,"%s %s", typeName, description);
    } else {
        SprintfLiteral(report,"(msgtype=0x%X) %s", aMsg.type(), description);
    }

    PrintErrorMessage(mSide, channelName, report);
    mListener->OnProcessingError(code, report);

    return false;
}

// Handles a channel error reported by the link.
//
// Must be called on the link thread with mMonitor held (asserted below).
// Wakes the worker thread if it may be blocked, moves the channel to
// ChannelError unless it is already ChannelClosing (aborting instead when
// mAbortOnError is set), and posts the error-notification task.
void
MessageChannel::OnChannelErrorFromLink()
{
    AssertLinkThread();
    mMonitor->AssertCurrentThreadOwns();

    IPC_LOG("OnChannelErrorFromLink");

    // Wake the worker if it is inside an interrupt call...
    if (InterruptStackDepth() > 0)
        NotifyWorkerThread();

    // ...or waiting for a sync reply / an incoming message.
    if (AwaitingSyncReply() || AwaitingIncomingMessage())
        NotifyWorkerThread();

    if (ChannelClosing != mChannelState) {
        if (mAbortOnError) {
            NS_RUNTIMEABORT("Aborting on channel error.");
        }
        mChannelState = ChannelError;
        mMonitor->Notify();
    }

    PostErrorNotifyTask();
}

// Finishes channel teardown after the link went away and tells the listener
// what happened.
//
// Must be called WITHOUT mMonitor held (asserted below). If the channel was
// in ChannelClosing, this is treated as a normal close and handed off to
// NotifyChannelClosed(). Otherwise the channel is cleared, marked
// ChannelError, and the listener's OnChannelError() is invoked at most once
// (guarded by mNotifiedChannelDone).
void
MessageChannel::NotifyMaybeChannelError()
{
    mMonitor->AssertNotCurrentThreadOwns();

    // TODO sort out Close() on this side racing with Close() on the other side
    if (ChannelClosing == mChannelState) {
        // the channel closed, but we received a "Goodbye" message warning us
        // about it. no worries
        mChannelState = ChannelClosed;
        NotifyChannelClosed();
        return;
    }

    Clear();

    // Oops, error!  Let the listener know about it.
    mChannelState = ChannelError;

    // IPDL assumes these notifications do not fire twice, so we do not let
    // that happen.
    if (mNotifiedChannelDone) {
      return;
    }
    mNotifiedChannelDone = true;

    // After this, the channel may be deleted.  Based on the premise that
    // mListener owns this channel, any calls back to this class that may
    // work with mListener should still work on living objects.
    mListener->OnChannelError();
}

// Body of the error-notification task posted by PostErrorNotifyTask().
//
// Runs on the worker thread without mMonitor held (asserted below). Clears
// mChannelErrorTask, waits for the poster to release the monitor, and then
// either calls NotifyMaybeChannelError() or, if IPC code is still on the C++
// stack, re-posts itself with a short delay.
void
MessageChannel::OnNotifyMaybeChannelError()
{
    AssertWorkerThread();
    mMonitor->AssertNotCurrentThreadOwns();

    mChannelErrorTask = nullptr;

    // OnChannelError holds mMonitor when it posts this task and this
    // task cannot be allowed to run until OnChannelError has
    // exited. We enforce that order by grabbing the mutex here which
    // should only continue once OnChannelError has completed.
    {
        MonitorAutoLock lock(*mMonitor);
        // nothing to do here
    }

    // Defer the notification while IsOnCxxStack() is true; try again later.
    if (IsOnCxxStack()) {
        mChannelErrorTask =
            NewNonOwningCancelableRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError);
        RefPtr<Runnable> task = mChannelErrorTask;
        // 10 ms delay is completely arbitrary
        mWorkerLoop->PostDelayedTask(task.forget(), 10);
        return;
    }

    NotifyMaybeChannelError();
}

// Schedules OnNotifyMaybeChannelError() on the worker loop unless such a
// task is already outstanding (mChannelErrorTask non-null).
//
// Must be called with mMonitor held (asserted below).
void
MessageChannel::PostErrorNotifyTask()
{
    mMonitor->AssertCurrentThreadOwns();

    // A notification is already queued; don't post a second one.
    if (mChannelErrorTask)
        return;

    // This must be the last code that runs on this thread!
    mChannelErrorTask =
        NewNonOwningCancelableRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError);
    RefPtr<Runnable> task = mChannelErrorTask;
    mWorkerLoop->PostTask(task.forget());
}

// Special async message.
// It is built with routing id MSG_ROUTING_NONE and type GOODBYE_MESSAGE_TYPE,
// and Close() sends it to warn the other side that this side is about to
// close.
class GoodbyeMessage : public IPC::Message
{
public:
    GoodbyeMessage() :
        IPC::Message(MSG_ROUTING_NONE, GOODBYE_MESSAGE_TYPE)
    {
    }
    // Nothing to deserialize: always reports success.
    static bool Read(const Message* msg) {
        return true;
    }
    // Writes a fixed description to |aOutf|; |aPrefix| is not used.
    void Log(const std::string& aPrefix, FILE* aOutf) const {
        fputs("(special `Goodbye' message)", aOutf);
    }
};

// Asks the link to close and blocks on mMonitor until the channel state
// becomes ChannelClosed.
//
// Must be called on the worker thread with mMonitor held (asserted below).
void
MessageChannel::SynchronouslyClose()
{
    AssertWorkerThread();
    mMonitor->AssertCurrentThreadOwns();
    mLink->SendClose();
    // Re-check the state after every wake-up; Wait() may return for other
    // notifications on the same monitor.
    while (ChannelClosed != mChannelState)
        mMonitor->Wait();
}

// Closes a connected channel and records the outcome as an error.
//
// Worker thread only. Does nothing unless the channel is currently
// ChannelConnected; otherwise closes synchronously, sets the state to
// ChannelError, and posts the error-notification task.
void
MessageChannel::CloseWithError()
{
    AssertWorkerThread();

    MonitorAutoLock lock(*mMonitor);
    if (ChannelConnected == mChannelState) {
        SynchronouslyClose();
        mChannelState = ChannelError;
        PostErrorNotifyTask();
    }
}

// Closes a connected channel and records the outcome as a timeout.
//
// Worker thread only. Does nothing unless the channel is currently
// ChannelConnected; otherwise closes synchronously and sets the state to
// ChannelTimeout. No notification task is posted here.
void
MessageChannel::CloseWithTimeout()
{
    AssertWorkerThread();

    MonitorAutoLock lock(*mMonitor);
    if (ChannelConnected == mChannelState) {
        SynchronouslyClose();
        mChannelState = ChannelTimeout;
    }
}

// Closes the channel from the worker thread and notifies the listener.
//
// Behavior by current channel state:
//   - ChannelError / ChannelTimeout: deliver the (possibly still pending)
//     error notification directly, if there is a listener, and return.
//   - ChannelOpening: close synchronously, mark the channel as errored and
//     deliver the error notification.
//   - ChannelConnected: send a Goodbye message, close synchronously, then
//     deliver the normal "closed" notification.
//   - anything else: abort.
//
// Every listener notification is made with mMonitor released, because
// NotifyMaybeChannelError() and NotifyChannelClosed() both assert that the
// calling thread does not own the monitor.
void
MessageChannel::Close()
{
    AssertWorkerThread();

    {
        MonitorAutoLock lock(*mMonitor);

        if (ChannelError == mChannelState || ChannelTimeout == mChannelState) {
            // See bug 538586: if the listener gets deleted while the
            // IO thread's NotifyChannelError event is still enqueued
            // and subsequently deletes us, then the error event will
            // also be deleted and the listener will never be notified
            // of the channel error.
            if (mListener) {
                MonitorAutoUnlock unlock(*mMonitor);
                NotifyMaybeChannelError();
            }
            return;
        }

        if (ChannelOpening == mChannelState) {
            // SynchronouslyClose() waits for an ack from the other side, so
            // the opening sequence should complete before this returns.
            SynchronouslyClose();
            mChannelState = ChannelError;
            // NotifyMaybeChannelError() must run without the monitor held
            // (it asserts this and calls out to the listener), so release
            // the lock around it exactly as the error/timeout path does.
            MonitorAutoUnlock unlock(*mMonitor);
            NotifyMaybeChannelError();
            return;
        }

        if (ChannelConnected != mChannelState) {
            // XXX be strict about this until there's a compelling reason
            // to relax
            NS_RUNTIMEABORT("Close() called on closed channel!");
        }

        // notify the other side that we're about to close our socket
        mLink->SendMessage(new GoodbyeMessage());
        SynchronouslyClose();
    }

    // The monitor has been released by the end of the scope above.
    NotifyChannelClosed();
}

// Finishes a normal close: clears the channel and tells the listener, at
// most once, that the channel has closed.
//
// Must be called WITHOUT mMonitor held (asserted below) and only once the
// state is ChannelClosed; any other state aborts.
void
MessageChannel::NotifyChannelClosed()
{
    mMonitor->AssertNotCurrentThreadOwns();

    if (ChannelClosed != mChannelState)
        NS_RUNTIMEABORT("channel should have been closed!");

    Clear();

    // IPDL assumes these notifications do not fire twice, so we do not let
    // that happen.
    if (mNotifiedChannelDone) {
      return;
    }
    mNotifiedChannelDone = true;

    // OK, the IO thread just closed the channel normally.  Let the
    // listener know about it. After this point the channel may be
    // deleted.
    mListener->OnChannelClose();
}

// Prints diagnostic state for a failed assertion to stderr and then aborts.
//
// |file|, |line| and |cond| identify the failed assertion, |why| is the
// explanation (also used as the abort message), and |reply| adds a
// "(reply)" marker to the header line. The dump covers the interrupt stack,
// the remote stack depth guess, the sizes of the deferred and out-of-turn
// reply stacks, and one line per pending message.
void
MessageChannel::DebugAbort(const char* file, int line, const char* cond,
                           const char* why,
                           bool reply)
{
    printf_stderr("###!!! [MessageChannel][%s][%s:%d] "
                  "Assertion (%s) failed.  %s %s\n",
                  mSide == ChildSide ? "Child" : "Parent",
                  file, line, cond,
                  why,
                  reply ? "(reply)" : "");
    // technically we need the mutex for this, but we're dying anyway
    DumpInterruptStack("  ");
    printf_stderr("  remote Interrupt stack guess: %" PRIuSIZE "\n",
                  mRemoteStackDepthGuess);
    printf_stderr("  deferred stack size: %" PRIuSIZE "\n",
                  mDeferred.size());
    printf_stderr("  out-of-turn Interrupt replies stack size: %" PRIuSIZE "\n",
                  mOutOfTurnReplies.size());

    // Drain a moved-out copy of the pending queue while printing it; mPending
    // is left moved-from, which is acceptable because we abort right after.
    MessageQueue pending = Move(mPending);
    while (!pending.isEmpty()) {
        printf_stderr("    [ %s%s ]\n",
                      pending.getFirst()->Msg().is_interrupt() ? "intr" :
                      (pending.getFirst()->Msg().is_sync() ? "sync" : "async"),
                      pending.getFirst()->Msg().is_reply() ? "reply" : "");
        pending.popFirst();
    }

    NS_RUNTIMEABORT(why);
}

// Prints the recorded C++ stack frames (mCxxStackFrames) to stderr, one per
// line, each line prefixed with |pfx|.
//
// A warning is emitted unless the current message loop differs from
// mWorkerLoop, because no lock is taken while reading the frames.
void
MessageChannel::DumpInterruptStack(const char* const pfx) const
{
    NS_WARNING_ASSERTION(
      MessageLoop::current() != mWorkerLoop,
      "The worker thread had better be paused in a debugger!");

    printf_stderr("%sMessageChannel 'backtrace':\n", pfx);

    // print a python-style backtrace, first frame to last
    for (uint32_t i = 0; i < mCxxStackFrames.length(); ++i) {
        // Filled in by Describe() for the frame at index i.
        int32_t id;
        const char* dir;
        const char* sems;
        const char* name;
        mCxxStackFrames[i].Describe(&id, &dir, &sems, &name);

        printf_stderr("%s[(%u) %s %s %s(actor=%d) ]\n", pfx,
                      i, dir, sems, name, id);
    }
}

int32_t
MessageChannel::GetTopmostMessageRoutingId() const
{
    MOZ_RELEASE_ASSERT(MessageLoop::current() == mWorkerLoop);
    if (mCxxStackFrames.empty()) {
        return MSG_ROUTING_NONE;
    }
    const InterruptFrame& frame = mCxxStackFrames.back();
    return frame.GetRoutingId();
}

// Leaves the "timed out" state: forgets the timed-out message's seqno and
// nested level, then re-posts pending messages via RepostAllMessages().
//
// Must be called with mMonitor held (asserted below).
void
MessageChannel::EndTimeout()
{
    mMonitor->AssertCurrentThreadOwns();

    IPC_LOG("Ending timeout of seqno=%d", mTimedOutMessageSeqno);
    mTimedOutMessageSeqno = 0;
    mTimedOutMessageNestedLevel = 0;

    RepostAllMessages();
}

// Ensures every task in mPending is scheduled, preserving queue order.
//
// If at least one pending task reports !IsScheduled(), the whole queue is
// rebuilt: each message is moved into a new MessageTask, appended to
// mPending and posted, in the original order. If all tasks are already
// scheduled, nothing happens.
void
MessageChannel::RepostAllMessages()
{
    // First pass: find out whether anything needs re-posting at all.
    bool needRepost = false;
    for (RefPtr<MessageTask> task : mPending) {
        if (!task->IsScheduled()) {
            needRepost = true;
        }
    }
    if (!needRepost) {
        // If everything is already scheduled to run, do nothing.
        return;
    }

    // In some cases we may have deferred dispatch of some messages in the
    // queue. Now we want to run them again. However, we can't just re-post
    // those messages since the messages after them in mPending would then be
    // before them in the event queue. So instead we cancel everything and
    // re-post all messages in the correct order.
    MessageQueue queue = Move(mPending);
    while (RefPtr<MessageTask> task = queue.popFirst()) {
        // The old task gives up its message; the new task takes its place.
        RefPtr<MessageTask> newTask = new MessageTask(this, Move(task->Msg()));
        mPending.insertBack(newTask);
        newTask->Post();
    }
}

// Cancels the transaction identified by |transaction| on this side.
//
// Must be called with mMonitor held (asserted below). If |transaction| is
// the currently timed-out message, the timeout state is ended first. The
// transaction stack (when present) is cancelled, and at most one queued
// nested sync message is removed from mPending.
void
MessageChannel::CancelTransaction(int transaction)
{
    mMonitor->AssertCurrentThreadOwns();

    // When we cancel a transaction, we need to behave as if there's no longer
    // any IPC on the stack. Anything we were dispatching or sending will get
    // canceled. Consequently, we have to update the state variables below.
    //
    // We also need to ensure that when any IPC functions on the stack return,
    // they don't reset these values using an RAII class like AutoSetValue. To
    // avoid that, these RAII classes check if the variable they set has been
    // tampered with (by us). If so, they don't reset the variable to the old
    // value.

    IPC_LOG("CancelTransaction: xid=%d", transaction);

    // An unusual case: We timed out a transaction which the other side then
    // cancelled. In this case we just leave the timedout state and try to
    // forget this ever happened.
    if (transaction == mTimedOutMessageSeqno) {
        IPC_LOG("Cancelled timed out message %d", mTimedOutMessageSeqno);
        EndTimeout();

        // Normally mCurrentTransaction == 0 here. But it can be non-zero if:
        // 1. Parent sends NESTED_INSIDE_SYNC message H.
        // 2. Parent times out H.
        // 3. Child dispatches H and sends nested message H' (same transaction).
        // 4. Parent dispatches H' and cancels.
        MOZ_RELEASE_ASSERT(!mTransactionStack || mTransactionStack->TransactionID() == transaction);
        if (mTransactionStack) {
            mTransactionStack->Cancel();
        }
    } else {
        // Not the timed-out message: the transaction being cancelled must be
        // the one on top of the transaction stack.
        MOZ_RELEASE_ASSERT(mTransactionStack->TransactionID() == transaction);
        mTransactionStack->Cancel();
    }

    // Walk the pending queue by hand because entries may be removed while
    // iterating.
    bool foundSync = false;
    for (RefPtr<MessageTask> p = mPending.getFirst(); p; ) {
        Message &msg = p->Msg();

        // If there was a race between the parent and the child, then we may
        // have a queued sync message. We want to drop this message from the
        // queue since it will get cancelled along with the transaction being
        // cancelled. This happens if the message in the queue is NESTED_INSIDE_SYNC.
        if (msg.is_sync() && msg.nested_level() != IPC::Message::NOT_NESTED) {
            // At most one such message is expected, and it must belong to a
            // different transaction than the one being cancelled.
            MOZ_RELEASE_ASSERT(!foundSync);
            MOZ_RELEASE_ASSERT(msg.transaction_id() != transaction);
            IPC_LOG("Removing msg from queue seqno=%d xid=%d", msg.seqno(), msg.transaction_id());
            foundSync = true;
            p = p->removeAndGetNext();
            continue;
        }

        p = p->getNext();
    }
}

// Returns true when the transaction stack is non-empty, i.e. a transaction
// is in progress. Takes mMonitor for the duration of the check.
bool
MessageChannel::IsInTransaction() const
{
    MonitorAutoLock lock(*mMonitor);
    return !!mTransactionStack;
}

// Cancels the transaction currently being dispatched, if it is nested at
// NESTED_INSIDE_SYNC level or higher, and tells the other side by sending
// a CancelMessage. Takes mMonitor for the whole operation.
//
// If either the sync or the async message being dispatched is at
// NESTED_INSIDE_CPOW level, the listener's IntentionalCrash() is invoked
// first.
void
MessageChannel::CancelCurrentTransaction()
{
    MonitorAutoLock lock(*mMonitor);
    if (DispatchingSyncMessageNestedLevel() >= IPC::Message::NESTED_INSIDE_SYNC) {
        if (DispatchingSyncMessageNestedLevel() == IPC::Message::NESTED_INSIDE_CPOW ||
            DispatchingAsyncMessageNestedLevel() == IPC::Message::NESTED_INSIDE_CPOW)
        {
            mListener->IntentionalCrash();
        }

        IPC_LOG("Cancel requested: current xid=%d", CurrentNestedInsideSyncTransaction());
        MOZ_RELEASE_ASSERT(DispatchingSyncMessage());
        // The cancel message is built before the local cancel.
        // NOTE(review): presumably because CancelTransaction() changes what
        // CurrentNestedInsideSyncTransaction() returns -- confirm.
        CancelMessage *cancel = new CancelMessage(CurrentNestedInsideSyncTransaction());
        CancelTransaction(CurrentNestedInsideSyncTransaction());
        mLink->SendMessage(cancel);
    }
}

// If gParentProcessBlocker is set, records an IPC_TRANSACTION_CANCEL
// telemetry sample and asks that channel to cancel its current transaction.
// Does nothing otherwise.
void
CancelCPOWs()
{
    if (gParentProcessBlocker) {
        mozilla::Telemetry::Accumulate(mozilla::Telemetry::IPC_TRANSACTION_CANCEL, true);
        gParentProcessBlocker->CancelCurrentTransaction();
    }
}

} // namespace ipc
} // namespace mozilla