Bug 699319 - Part 1: Abstract out the mTransport and I/O thread into the Link abstraction. r=cjones
authorNiko Matsakis <nmatsakis@mozilla.com>
Wed, 30 Nov 2011 08:24:46 -0800
changeset 83116 d5b16492bc5798b7c97c16edb4df19af5ec35c1d
parent 83115 adc488386b171100530e87b68c752b8bc8cd951d
child 83117 fb54dde96ed2dcbaa0e36a5c1204a37e529c00cd
push id519
push userakeybl@mozilla.com
push dateWed, 01 Feb 2012 00:38:35 +0000
treeherdermozilla-beta@788ea1ef610b [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerscjones
bugs699319
milestone11.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 699319 - Part 1: Abstract out the mTransport and I/O thread into the Link abstraction. r=cjones
ipc/glue/AsyncChannel.cpp
ipc/glue/AsyncChannel.h
ipc/glue/RPCChannel.cpp
ipc/glue/RPCChannel.h
ipc/glue/SyncChannel.cpp
ipc/glue/SyncChannel.h
ipc/glue/WindowsMessageLoop.cpp
--- a/ipc/glue/AsyncChannel.cpp
+++ b/ipc/glue/AsyncChannel.cpp
@@ -49,16 +49,23 @@ using mozilla::MonitorAutoLock;
 
 template<>
 struct RunnableMethodTraits<mozilla::ipc::AsyncChannel>
 {
     static void RetainCallee(mozilla::ipc::AsyncChannel* obj) { }
     static void ReleaseCallee(mozilla::ipc::AsyncChannel* obj) { }
 };
 
+template<>
+struct RunnableMethodTraits<mozilla::ipc::AsyncChannel::ProcessLink>
+{
+    static void RetainCallee(mozilla::ipc::AsyncChannel::ProcessLink* obj) { }
+    static void ReleaseCallee(mozilla::ipc::AsyncChannel::ProcessLink* obj) { }
+};
+
 // We rely on invariants about the lifetime of the transport:
 //
 //  - outlives this AsyncChannel
 //  - deleted on the IO thread
 //
 // These invariants allow us to send messages directly through the
 // transport without having to worry about orphaned Send() tasks on
 // the IO thread touching AsyncChannel memory after it's been deleted
@@ -97,106 +104,178 @@ public:
     }
 };
 
 } // namespace <anon>
 
 namespace mozilla {
 namespace ipc {
 
-AsyncChannel::AsyncChannel(AsyncListener* aListener)
-  : mTransport(0),
-    mListener(aListener),
-    mChannelState(ChannelClosed),
-    mMonitor("mozilla.ipc.AsyncChannel.mMonitor"),
-    mIOLoop(),
-    mWorkerLoop(),
-    mChild(false),
-    mChannelErrorTask(NULL),
-    mExistingListener(NULL)
+AsyncChannel::Link::Link(AsyncChannel *aChan)
+    : mChan(aChan)
+{
+}
+
+AsyncChannel::Link::~Link()
 {
-    MOZ_COUNT_CTOR(AsyncChannel);
+    mChan = 0;
+}
+
+AsyncChannel::ProcessLink::ProcessLink(AsyncChannel *aChan)
+    : Link(aChan)
+    , mExistingListener(NULL)
+{
 }
 
-AsyncChannel::~AsyncChannel()
+AsyncChannel::ProcessLink::~ProcessLink()
 {
-    MOZ_COUNT_DTOR(AsyncChannel);
-    Clear();
+    mIOLoop = 0;
+    if (mTransport) {
+        mTransport->set_listener(0);
+        
+        // we only hold a weak ref to the transport, which is "owned"
+        // by GeckoChildProcess/GeckoThread
+        mTransport = 0;
+    }
 }
 
-bool
-AsyncChannel::Open(Transport* aTransport, MessageLoop* aIOLoop, Side aSide)
+void 
+AsyncChannel::ProcessLink::Open(mozilla::ipc::Transport* aTransport,
+                                MessageLoop *aIOLoop,
+                                Side aSide)
 {
-    NS_PRECONDITION(!mTransport, "Open() called > once");
     NS_PRECONDITION(aTransport, "need transport layer");
 
     // FIXME need to check for valid channel
 
     mTransport = aTransport;
     mExistingListener = mTransport->set_listener(this);
 
     // FIXME figure out whether we're in parent or child, grab IO loop
     // appropriately
     bool needOpen = true;
     if(aIOLoop) {
         // We're a child or using the new arguments.  Either way, we
         // need an open.
         needOpen = true;
-        mChild = (aSide == Unknown) || (aSide == Child);
+        mChan->mChild = (aSide == AsyncChannel::Unknown) || (aSide == AsyncChannel::Child);
     } else {
         NS_PRECONDITION(aSide == Unknown, "expected default side arg");
 
         // parent
-        mChild = false;
+        mChan->mChild = false;
         needOpen = false;
         aIOLoop = XRE_GetIOMessageLoop();
         // FIXME assuming that the parent waits for the OnConnected event.
         // FIXME see GeckoChildProcessHost.cpp.  bad assumption!
-        mChannelState = ChannelConnected;
+        mChan->mChannelState = ChannelConnected;
     }
 
     mIOLoop = aIOLoop;
-    mWorkerLoop = MessageLoop::current();
 
     NS_ASSERTION(mIOLoop, "need an IO loop");
-    NS_ASSERTION(mWorkerLoop, "need a worker loop");
+    NS_ASSERTION(mChan->mWorkerLoop, "need a worker loop");
 
     if (needOpen) {             // child process
-        MonitorAutoLock lock(mMonitor);
+        MonitorAutoLock lock(*mChan->mMonitor);
 
         mIOLoop->PostTask(FROM_HERE, 
-                          NewRunnableMethod(this,
-                                            &AsyncChannel::OnChannelOpened));
+                          NewRunnableMethod(this, &ProcessLink::OnChannelOpened));
 
         // FIXME/cjones: handle errors
-        while (mChannelState != ChannelConnected) {
-            mMonitor.Wait();
+        while (mChan->mChannelState != ChannelConnected) {
+            mChan->mMonitor->Wait();
         }
     }
+}
 
+void
+AsyncChannel::ProcessLink::EchoMessage(Message *msg)
+{
+    // NB: Go through this OnMessageReceived indirection so that
+    // echoing this message does the right thing for SyncChannel
+    // and RPCChannel too
+    mIOLoop->PostTask(
+        FROM_HERE,
+        NewRunnableMethod(this, &ProcessLink::OnEchoMessage, msg));
+    // OnEchoMessage takes ownership of |msg|
+}
+
+void
+AsyncChannel::ProcessLink::SendMessage(Message *msg)
+{
+    mChan->AssertWorkerThread();
+    mIOLoop->PostTask(
+        FROM_HERE,
+        NewRunnableMethod(mTransport, &Transport::Send, msg));
+}
+
+void
+AsyncChannel::ProcessLink::SendClose()
+{
+    mChan->AssertWorkerThread();
+    mChan->mMonitor->AssertCurrentThreadOwns();
+
+    mIOLoop->PostTask(
+        FROM_HERE, NewRunnableMethod(this, &ProcessLink::OnCloseChannel));
+}
+
+AsyncChannel::AsyncChannel(AsyncListener* aListener)
+  : mListener(aListener),
+    mChannelState(ChannelClosed),
+    mWorkerLoop(),
+    mChild(false),
+    mChannelErrorTask(NULL),
+    mLink(NULL)
+{
+    MOZ_COUNT_CTOR(AsyncChannel);
+}
+
+AsyncChannel::~AsyncChannel()
+{
+    MOZ_COUNT_DTOR(AsyncChannel);
+    Clear();
+}
+
+bool
+AsyncChannel::Open(Transport* aTransport,
+                   MessageLoop* aIOLoop,
+                   AsyncChannel::Side aSide)
+{
+    ProcessLink *link;
+    NS_PRECONDITION(!mLink, "Open() called > once");
+    mMonitor = new RefCountedMonitor();
+    mWorkerLoop = MessageLoop::current();
+    mLink = link = new ProcessLink(this);
+    link->Open(aTransport, aIOLoop, aSide); // n.b.: sets mChild
     return true;
 }
 
 void
 AsyncChannel::Close()
 {
     AssertWorkerThread();
 
     {
-        MonitorAutoLock lock(mMonitor);
+        // n.b.: We increase the ref count of monitor temporarily
+        //       for the duration of this block.  Otherwise, the
+        //       function NotifyMaybeChannelError() will call
+        //       ::Clear() which can free the monitor.
+        nsRefPtr<RefCountedMonitor> monitor(mMonitor);
+        MonitorAutoLock lock(*monitor);
 
         if (ChannelError == mChannelState ||
             ChannelTimeout == mChannelState) {
             // See bug 538586: if the listener gets deleted while the
             // IO thread's NotifyChannelError event is still enqueued
             // and subsequently deletes us, then the error event will
             // also be deleted and the listener will never be notified
             // of the channel error.
             if (mListener) {
-                MonitorAutoUnlock unlock(mMonitor);
+                MonitorAutoUnlock unlock(*monitor);
                 NotifyMaybeChannelError();
             }
             return;
         }
 
         if (ChannelConnected != mChannelState)
             // XXX be strict about this until there's a compelling reason
             // to relax
@@ -212,70 +291,61 @@ AsyncChannel::Close()
 
     NotifyChannelClosed();
 }
 
 void 
 AsyncChannel::SynchronouslyClose()
 {
     AssertWorkerThread();
-    mMonitor.AssertCurrentThreadOwns();
-
-    mIOLoop->PostTask(
-        FROM_HERE, NewRunnableMethod(this, &AsyncChannel::OnCloseChannel));
-
+    mMonitor->AssertCurrentThreadOwns();
+    mLink->SendClose();
     while (ChannelClosed != mChannelState)
-        mMonitor.Wait();
+        mMonitor->Wait();
 }
 
 bool
 AsyncChannel::Send(Message* _msg)
 {
     nsAutoPtr<Message> msg(_msg);
     AssertWorkerThread();
-    mMonitor.AssertNotCurrentThreadOwns();
+    mMonitor->AssertNotCurrentThreadOwns();
     NS_ABORT_IF_FALSE(MSG_ROUTING_NONE != msg->routing_id(), "need a route");
 
     {
-        MonitorAutoLock lock(mMonitor);
+        MonitorAutoLock lock(*mMonitor);
 
         if (!Connected()) {
             ReportConnectionError("AsyncChannel");
             return false;
         }
 
-        SendThroughTransport(msg.forget());
+        mLink->SendMessage(msg.forget());
     }
 
     return true;
 }
 
 bool
 AsyncChannel::Echo(Message* _msg)
 {
     nsAutoPtr<Message> msg(_msg);
     AssertWorkerThread();
-    mMonitor.AssertNotCurrentThreadOwns();
+    mMonitor->AssertNotCurrentThreadOwns();
     NS_ABORT_IF_FALSE(MSG_ROUTING_NONE != msg->routing_id(), "need a route");
 
     {
-        MonitorAutoLock lock(mMonitor);
+        MonitorAutoLock lock(*mMonitor);
 
         if (!Connected()) {
             ReportConnectionError("AsyncChannel");
             return false;
         }
 
-        // NB: Go through this OnMessageReceived indirection so that
-        // echoing this message does the right thing for SyncChannel
-        // and RPCChannel too
-        mIOLoop->PostTask(
-            FROM_HERE,
-            NewRunnableMethod(this, &AsyncChannel::OnEchoMessage, msg.forget()));
-        // OnEchoMessage takes ownership of |msg|
+        mLink->EchoMessage(msg.forget());
     }
 
     return true;
 }
 
 void
 AsyncChannel::OnDispatchMessage(const Message& msg)
 {
@@ -301,41 +371,31 @@ AsyncChannel::OnSpecialMessage(uint16 id
 {
     return false;
 }
 
 void
 AsyncChannel::SendSpecialMessage(Message* msg) const
 {
     AssertWorkerThread();
-    SendThroughTransport(msg);
-}
-
-void
-AsyncChannel::SendThroughTransport(Message* msg) const
-{
-    AssertWorkerThread();
-
-    mIOLoop->PostTask(
-        FROM_HERE,
-        NewRunnableMethod(mTransport, &Transport::Send, msg));
+    mLink->SendMessage(msg);
 }
 
 void
 AsyncChannel::OnNotifyMaybeChannelError()
 {
     AssertWorkerThread();
-    mMonitor.AssertNotCurrentThreadOwns();
+    mMonitor->AssertNotCurrentThreadOwns();
 
     // OnChannelError holds mMonitor when it posts this task and this
     // task cannot be allowed to run until OnChannelError has
     // exited. We enforce that order by grabbing the mutex here which
     // should only continue once OnChannelError has completed.
     {
-        MonitorAutoLock lock(mMonitor);
+        MonitorAutoLock lock(*mMonitor);
         // nothing to do here
     }
 
     if (ShouldDeferNotifyMaybeError()) {
         mChannelErrorTask =
             NewRunnableMethod(this, &AsyncChannel::OnNotifyMaybeChannelError);
         // 10 ms delay is completely arbitrary
         mWorkerLoop->PostDelayedTask(FROM_HERE, mChannelErrorTask, 10);
@@ -343,32 +403,32 @@ AsyncChannel::OnNotifyMaybeChannelError(
     }
 
     NotifyMaybeChannelError();
 }
 
 void
 AsyncChannel::NotifyChannelClosed()
 {
-    mMonitor.AssertNotCurrentThreadOwns();
+    mMonitor->AssertNotCurrentThreadOwns();
 
     if (ChannelClosed != mChannelState)
         NS_RUNTIMEABORT("channel should have been closed!");
 
     // OK, the IO thread just closed the channel normally.  Let the
     // listener know about it.
     mListener->OnChannelClose();
 
     Clear();
 }
 
 void
 AsyncChannel::NotifyMaybeChannelError()
 {
-    mMonitor.AssertNotCurrentThreadOwns();
+    mMonitor->AssertNotCurrentThreadOwns();
 
     // TODO sort out Close() on this side racing with Close() on the
     // other side
     if (ChannelClosing == mChannelState) {
         // the channel closed, but we received a "Goodbye" message
         // warning us about it. no worries
         mChannelState = ChannelClosed;
         NotifyChannelClosed();
@@ -381,26 +441,22 @@ AsyncChannel::NotifyMaybeChannelError()
 
     Clear();
 }
 
 void
 AsyncChannel::Clear()
 {
     mListener = 0;
-    mIOLoop = 0;
     mWorkerLoop = 0;
 
-    if (mTransport) {
-        mTransport->set_listener(0);
+    delete mLink;
+    mLink = 0;
+    mMonitor = 0;
 
-        // we only hold a weak ref to the transport, which is "owned"
-        // by GeckoChildProcess/GeckoThread
-        mTransport = 0;
-    }
     if (mChannelErrorTask) {
         mChannelErrorTask->Cancel();
         mChannelErrorTask = NULL;
     }
 }
 
 static void
 PrintErrorMessage(bool isChild, const char* channelName, const char* msg)
@@ -475,139 +531,159 @@ AsyncChannel::ReportConnectionError(cons
         NS_RUNTIMEABORT("unreached");
     }
 
     PrintErrorMessage(mChild, channelName, errorMsg);
 
     mListener->OnProcessingError(MsgDropped);
 }
 
+void
+AsyncChannel::DispatchOnChannelConnected(int32 peer_pid)
+{
+    AssertWorkerThread();
+    if (mListener)
+        mListener->OnChannelConnected(peer_pid);
+}
+
 //
 // The methods below run in the context of the IO thread
 //
 
 void
-AsyncChannel::OnMessageReceived(const Message& msg)
+AsyncChannel::ProcessLink::OnMessageReceived(const Message& msg)
+{
+    AssertIOThread();
+    NS_ASSERTION(mChan->mChannelState != ChannelError, "Shouldn't get here!");
+    MonitorAutoLock lock(*mChan->mMonitor);
+    mChan->OnMessageReceivedFromLink(msg);
+}
+
+void
+AsyncChannel::ProcessLink::OnEchoMessage(Message* msg)
+{
+    AssertIOThread();
+    OnMessageReceived(*msg);
+    delete msg;
+}
+
+void
+AsyncChannel::ProcessLink::OnChannelOpened()
+{
+    mChan->AssertLinkThread();
+    {
+        MonitorAutoLock lock(*mChan->mMonitor);
+        mChan->mChannelState = ChannelOpening;
+    }
+    /*assert*/mTransport->Connect();
+}
+
+void
+AsyncChannel::ProcessLink::OnChannelConnected(int32 peer_pid)
 {
     AssertIOThread();
-    NS_ASSERTION(mChannelState != ChannelError, "Shouldn't get here!");
+
+    {
+        MonitorAutoLock lock(*mChan->mMonitor);
+        mChan->mChannelState = ChannelConnected;
+        mChan->mMonitor->Notify();
+    }
+
+    if(mExistingListener)
+        mExistingListener->OnChannelConnected(peer_pid);
+
+    mChan->mWorkerLoop->PostTask(
+        FROM_HERE, 
+        NewRunnableMethod(mChan, 
+                          &AsyncChannel::DispatchOnChannelConnected, 
+                          peer_pid));
+}
 
-    MonitorAutoLock lock(mMonitor);
+void
+AsyncChannel::ProcessLink::OnChannelError()
+{
+    AssertIOThread();
+    MonitorAutoLock lock(*mChan->mMonitor);
+    mChan->OnChannelErrorFromLink();
+}
+
+void
+AsyncChannel::ProcessLink::OnCloseChannel()
+{
+    AssertIOThread();
+
+    mTransport->Close();
+
+    MonitorAutoLock lock(*mChan->mMonitor);
+    mChan->mChannelState = ChannelClosed;
+    mChan->mMonitor->Notify();
+}
+
+//
+// The methods below run in the context of the link thread
+//
+
+void
+AsyncChannel::OnMessageReceivedFromLink(const Message& msg)
+{
+    AssertLinkThread();
+    mMonitor->AssertCurrentThreadOwns();
 
     if (!MaybeInterceptSpecialIOMessage(msg))
         // wake up the worker, there's work to do
         mWorkerLoop->PostTask(
             FROM_HERE,
             NewRunnableMethod(this, &AsyncChannel::OnDispatchMessage, msg));
 }
 
 void
-AsyncChannel::OnEchoMessage(Message* msg)
-{
-    AssertIOThread();
-    OnMessageReceived(*msg);
-    delete msg;
-}
-
-void
-AsyncChannel::OnChannelOpened()
-{
-    AssertIOThread();
-    {
-        MonitorAutoLock lock(mMonitor);
-        mChannelState = ChannelOpening;
-    }
-    /*assert*/mTransport->Connect();
-}
-
-void
-AsyncChannel::DispatchOnChannelConnected(int32 peer_pid)
+AsyncChannel::OnChannelErrorFromLink()
 {
-    AssertWorkerThread();
-    if (mListener)
-        mListener->OnChannelConnected(peer_pid);
-}
-
-void
-AsyncChannel::OnChannelConnected(int32 peer_pid)
-{
-    AssertIOThread();
-
-    {
-        MonitorAutoLock lock(mMonitor);
-        mChannelState = ChannelConnected;
-        mMonitor.Notify();
-    }
-
-    if(mExistingListener)
-        mExistingListener->OnChannelConnected(peer_pid);
-
-    mWorkerLoop->PostTask(FROM_HERE, NewRunnableMethod(this, 
-                                                       &AsyncChannel::DispatchOnChannelConnected, 
-                                                       peer_pid));
-}
-
-void
-AsyncChannel::OnChannelError()
-{
-    AssertIOThread();
-
-    MonitorAutoLock lock(mMonitor);
+    AssertLinkThread();
+    mMonitor->AssertCurrentThreadOwns();
 
     if (ChannelClosing != mChannelState)
         mChannelState = ChannelError;
 
     PostErrorNotifyTask();
 }
 
 void
 AsyncChannel::PostErrorNotifyTask()
 {
-    AssertIOThread();
-    mMonitor.AssertCurrentThreadOwns();
+    AssertLinkThread();
+    mMonitor->AssertCurrentThreadOwns();
 
     NS_ASSERTION(!mChannelErrorTask, "OnChannelError called twice?");
 
     // This must be the last code that runs on this thread!
     mChannelErrorTask =
         NewRunnableMethod(this, &AsyncChannel::OnNotifyMaybeChannelError);
     mWorkerLoop->PostTask(FROM_HERE, mChannelErrorTask);
 }
 
-void
-AsyncChannel::OnCloseChannel()
-{
-    AssertIOThread();
-
-    mTransport->Close();
-
-    MonitorAutoLock lock(mMonitor);
-    mChannelState = ChannelClosed;
-    mMonitor.Notify();
-}
-
 bool
 AsyncChannel::MaybeInterceptSpecialIOMessage(const Message& msg)
 {
-    AssertIOThread();
-    mMonitor.AssertCurrentThreadOwns();
+    AssertLinkThread();
+    mMonitor->AssertCurrentThreadOwns();
 
     if (MSG_ROUTING_NONE == msg.routing_id()
         && GOODBYE_MESSAGE_TYPE == msg.type()) {
         ProcessGoodbyeMessage();
         return true;
     }
     return false;
 }
 
 void
 AsyncChannel::ProcessGoodbyeMessage()
 {
-    AssertIOThread();
-    mMonitor.AssertCurrentThreadOwns();
+    AssertLinkThread();
+    mMonitor->AssertCurrentThreadOwns();
 
     // TODO sort out Close() on this side racing with Close() on the
     // other side
     mChannelState = ChannelClosing;
 
     printf("NOTE: %s process received `Goodbye', closing down\n",
            mChild ? "child" : "parent");
 }
--- a/ipc/glue/AsyncChannel.h
+++ b/ipc/glue/AsyncChannel.h
@@ -60,17 +60,28 @@ struct HasResultCodes
         MsgNotAllowed,
         MsgPayloadError,
         MsgProcessingError,
         MsgRouteError,
         MsgValueError,
     };
 };
 
-class AsyncChannel : public Transport::Listener, protected HasResultCodes
+
+class RefCountedMonitor : public Monitor
+{
+public:
+    RefCountedMonitor() 
+        : Monitor("mozilla.ipc.AsyncChannel.mMonitor")
+    {}
+
+    NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedMonitor)
+};
+
+class AsyncChannel : protected HasResultCodes
 {
 protected:
     typedef mozilla::Monitor Monitor;
 
     enum ChannelState {
         ChannelClosed,
         ChannelOpening,
         ChannelConnected,
@@ -120,86 +131,138 @@ public:
     // Asynchronously deliver a message back to this side of the
     // channel
     virtual bool Echo(Message* msg);
 
     // Send OnChannelConnected notification to listeners.
     void DispatchOnChannelConnected(int32 peer_pid);
 
     //
-    // These methods are called on the "IO" thread
+    // Each AsyncChannel is associated with either a ProcessLink or a
+    // ThreadLink via the field mLink.  The type of link is determined
+    // by whether this AsyncChannel is communicating with another
+    // process or another thread.  In the former case, file
+    // descriptors or a socket are used via the I/O queue.  In the
+    // latter case, messages are enqueued directly onto the target
+    // thread's work queue.
     //
 
-    // Implement the Transport::Listener interface
-    NS_OVERRIDE virtual void OnMessageReceived(const Message& msg);
-    NS_OVERRIDE virtual void OnChannelConnected(int32 peer_pid);
-    NS_OVERRIDE virtual void OnChannelError();
+    class Link {
+    protected:
+        AsyncChannel *mChan;
+
+    public:
+        Link(AsyncChannel *aChan);
+        virtual ~Link();
+
+        // n.b.: These methods all require that the channel monitor is
+        // held when they are invoked.
+        virtual void EchoMessage(Message *msg) = 0;
+        virtual void SendMessage(Message *msg) = 0;
+        virtual void SendClose() = 0;
+    };
+
+    class ProcessLink : public Link, public Transport::Listener {
+    protected:
+        Transport* mTransport;
+        MessageLoop* mIOLoop;       // thread where IO happens
+        Transport::Listener* mExistingListener; // channel's previous listener
+    
+        void OnCloseChannel();
+        void OnChannelOpened();
+        void OnEchoMessage(Message* msg);
+
+        void AssertIOThread() const
+        {
+            NS_ABORT_IF_FALSE(mIOLoop == MessageLoop::current(),
+                              "not on I/O thread!");
+        }
+
+    public:
+        ProcessLink(AsyncChannel *chan);
+        virtual ~ProcessLink();
+        void Open(Transport* aTransport, MessageLoop *aIOLoop, Side aSide);
+        
+        // Run on the I/O thread, only when using inter-process link.
+        // These methods acquire the monitor and forward to the
+        // similarly named methods in AsyncChannel below
+        // (OnMessageReceivedFromLink(), etc)
+        NS_OVERRIDE virtual void OnMessageReceived(const Message& msg);
+        NS_OVERRIDE virtual void OnChannelConnected(int32 peer_pid);
+        NS_OVERRIDE virtual void OnChannelError();
+
+        virtual void EchoMessage(Message *msg);
+        virtual void SendMessage(Message *msg);
+        virtual void SendClose();
+    };
 
 protected:
+    // The "link" thread is either the I/O thread (ProcessLink) or the
+    // other actor's work thread (ThreadLink).  In either case, it is
+    // NOT our worker thread.
+    void AssertLinkThread() const
+    {
+        NS_ABORT_IF_FALSE(mWorkerLoop != MessageLoop::current(),
+                          "on worker thread but should not be!");
+    }
+
     // Can be run on either thread
     void AssertWorkerThread() const
     {
         NS_ABORT_IF_FALSE(mWorkerLoop == MessageLoop::current(),
                           "not on worker thread!");
     }
 
-    void AssertIOThread() const
-    {
-        NS_ABORT_IF_FALSE(mIOLoop == MessageLoop::current(),
-                          "not on IO thread!");
+    bool Connected() const {
+        mMonitor->AssertCurrentThreadOwns();
+        return ChannelConnected == mChannelState;
     }
 
-    bool Connected() const {
-        mMonitor.AssertCurrentThreadOwns();
-        return ChannelConnected == mChannelState;
-    }
+    // Return true if |msg| is a special message targeted at the IO
+    // thread, in which case it shouldn't be delivered to the worker.
+    virtual bool MaybeInterceptSpecialIOMessage(const Message& msg);
+    void ProcessGoodbyeMessage();
+
+    // Runs on the link thread. Invoked either from the I/O thread methods above
+    // or directly from the other actor if using a thread-based link.
+    // 
+    // n.b.: mMonitor is always held when these methods are invoked.
+    // In the case of a ProcessLink, it is acquired by the ProcessLink.
+    // In the case of a ThreadLink, it is acquired by the other actor, 
+    // which then invokes these methods directly.
+    virtual void OnMessageReceivedFromLink(const Message& msg);
+    virtual void OnChannelErrorFromLink();
+    void PostErrorNotifyTask();
 
     // Run on the worker thread
     void OnDispatchMessage(const Message& aMsg);
     virtual bool OnSpecialMessage(uint16 id, const Message& msg);
     void SendSpecialMessage(Message* msg) const;
 
     // Tell the IO thread to close the channel and wait for it to ACK.
     void SynchronouslyClose();
 
     bool MaybeHandleError(Result code, const char* channelName);
     void ReportConnectionError(const char* channelName) const;
 
     // Run on the worker thread
 
-    void SendThroughTransport(Message* msg) const;
-
     void OnNotifyMaybeChannelError();
     virtual bool ShouldDeferNotifyMaybeError() const {
         return false;
     }
     void NotifyChannelClosed();
     void NotifyMaybeChannelError();
 
     virtual void Clear();
 
-    // Run on the IO thread
-
-    void OnChannelOpened();
-    void OnCloseChannel();
-    void PostErrorNotifyTask();
-    void OnEchoMessage(Message* msg);
-
-    // Return true if |msg| is a special message targeted at the IO
-    // thread, in which case it shouldn't be delivered to the worker.
-    bool MaybeInterceptSpecialIOMessage(const Message& msg);
-    void ProcessGoodbyeMessage();
-
-    Transport* mTransport;
     AsyncListener* mListener;
     ChannelState mChannelState;
-    Monitor mMonitor;
-    MessageLoop* mIOLoop;       // thread where IO happens
+    nsRefPtr<RefCountedMonitor> mMonitor;
     MessageLoop* mWorkerLoop;   // thread where work is done
     bool mChild;                // am I the child or parent?
     CancelableTask* mChannelErrorTask; // NotifyMaybeChannelError runnable
-    Transport::Listener* mExistingListener; // channel's previous listener
+    Link *mLink;                // link to other thread/process
 };
 
-
 } // namespace ipc
 } // namespace mozilla
 #endif  // ifndef ipc_glue_AsyncChannel_h
--- a/ipc/glue/RPCChannel.cpp
+++ b/ipc/glue/RPCChannel.cpp
@@ -119,17 +119,17 @@ RPCChannel::Clear()
 
     AsyncChannel::Clear();
 }
 
 bool
 RPCChannel::EventOccurred() const
 {
     AssertWorkerThread();
-    mMonitor.AssertCurrentThreadOwns();
+    mMonitor->AssertCurrentThreadOwns();
     RPC_ASSERT(StackDepth() > 0, "not in wait loop");
 
     return (!Connected() ||
             !mPending.empty() ||
             (!mOutOfTurnReplies.empty() &&
              mOutOfTurnReplies.find(mStack.top().seqno())
              != mOutOfTurnReplies.end()));
 }
@@ -150,41 +150,41 @@ RPCChannel::Send(Message* msg, Message* 
     return SyncChannel::Send(msg, reply);
 }
 
 bool
 RPCChannel::Call(Message* _msg, Message* reply)
 {
     nsAutoPtr<Message> msg(_msg);
     AssertWorkerThread();
-    mMonitor.AssertNotCurrentThreadOwns();
+    mMonitor->AssertNotCurrentThreadOwns();
     RPC_ASSERT(!ProcessingSyncMessage(),
                "violation of sync handler invariant");
     RPC_ASSERT(msg->is_rpc(), "can only Call() RPC messages here");
 
 #ifdef OS_WIN
     SyncStackFrame frame(this, true);
 #endif
 
     Message copy = *msg;
     CxxStackFrame f(*this, OUT_MESSAGE, &copy);
 
-    MonitorAutoLock lock(mMonitor);
+    MonitorAutoLock lock(*mMonitor);
 
     if (!Connected()) {
         ReportConnectionError("RPCChannel");
         return false;
     }
 
     msg->set_seqno(NextSeqno());
     msg->set_rpc_remote_stack_depth_guess(mRemoteStackDepthGuess);
     msg->set_rpc_local_stack_depth(1 + StackDepth());
     mStack.push(*msg);
 
-    SendThroughTransport(msg.forget());
+    mLink->SendMessage(msg.forget());
 
     while (1) {
         // if a handler invoked by *Dispatch*() spun a nested event
         // loop, and the connection was broken during that loop, we
         // might have already processed the OnError event. if so,
         // trying another loop iteration will be futile because
         // channel state will have been cleared
         if (!Connected()) {
@@ -234,28 +234,28 @@ RPCChannel::Call(Message* _msg, Message*
             // possible that we got here and nothing happened.  or, we
             // might have a deferred in-call that needs to be
             // processed.  either way, we won't break the inner while
             // loop again until something new happens.
             continue;
         }
 
         if (!recvd.is_sync() && !recvd.is_rpc()) {
-            MonitorAutoUnlock unlock(mMonitor);
+            MonitorAutoUnlock unlock(*mMonitor);
 
             CxxStackFrame f(*this, IN_MESSAGE, &recvd);
             AsyncChannel::OnDispatchMessage(recvd);
 
             continue;
         }
 
         if (recvd.is_sync()) {
             RPC_ASSERT(mPending.empty(),
                        "other side should have been blocked");
-            MonitorAutoUnlock unlock(mMonitor);
+            MonitorAutoUnlock unlock(*mMonitor);
 
             CxxStackFrame f(*this, IN_MESSAGE, &recvd);
             SyncChannel::OnDispatchMessage(recvd);
 
             continue;
         }
 
         RPC_ASSERT(recvd.is_rpc(), "wtf???");
@@ -300,32 +300,32 @@ RPCChannel::Call(Message* _msg, Message*
             return !isError;
         }
 
         // in-call.  process in a new stack frame.
 
         // "snapshot" the current stack depth while we own the Monitor
         size_t stackDepth = StackDepth();
         {
-            MonitorAutoUnlock unlock(mMonitor);
+            MonitorAutoUnlock unlock(*mMonitor);
             // someone called in to us from the other side.  handle the call
             CxxStackFrame f(*this, IN_MESSAGE, &recvd);
             Incall(recvd, stackDepth);
             // FIXME/cjones: error handling
         }
     }
 
     return true;
 }
 
 void
 RPCChannel::MaybeUndeferIncall()
 {
     AssertWorkerThread();
-    mMonitor.AssertCurrentThreadOwns();
+    mMonitor->AssertCurrentThreadOwns();
 
     if (mDeferred.empty())
         return;
 
     size_t stackDepth = StackDepth();
 
     // the other side can only *under*-estimate our actual stack depth
     RPC_ASSERT(mDeferred.top().rpc_remote_stack_depth_guess() <= stackDepth,
@@ -344,17 +344,17 @@ RPCChannel::MaybeUndeferIncall()
 
     mPending.push(call);
 }
 
 void
 RPCChannel::EnqueuePendingMessages()
 {
     AssertWorkerThread();
-    mMonitor.AssertCurrentThreadOwns();
+    mMonitor->AssertCurrentThreadOwns();
 
     MaybeUndeferIncall();
 
     for (size_t i = 0; i < mDeferred.size(); ++i)
         mWorkerLoop->PostTask(
             FROM_HERE,
             new DequeueTask(mDequeueOneTask));
 
@@ -366,20 +366,20 @@ RPCChannel::EnqueuePendingMessages()
             FROM_HERE,
             new DequeueTask(mDequeueOneTask));
 }
 
 void
 RPCChannel::FlushPendingRPCQueue()
 {
     AssertWorkerThread();
-    mMonitor.AssertNotCurrentThreadOwns();
+    mMonitor->AssertNotCurrentThreadOwns();
 
     {
-        MonitorAutoLock lock(mMonitor);
+        MonitorAutoLock lock(*mMonitor);
 
         if (mDeferred.empty()) {
             if (mPending.empty())
                 return;
 
             const Message& last = mPending.back();
             if (!last.is_rpc() || last.is_reply())
                 return;
@@ -391,21 +391,21 @@ RPCChannel::FlushPendingRPCQueue()
 
 bool
 RPCChannel::OnMaybeDequeueOne()
 {
     // XXX performance tuning knob: could process all or k pending
     // messages here
 
     AssertWorkerThread();
-    mMonitor.AssertNotCurrentThreadOwns();
+    mMonitor->AssertNotCurrentThreadOwns();
 
     Message recvd;
     {
-        MonitorAutoLock lock(mMonitor);
+        MonitorAutoLock lock(*mMonitor);
 
         if (!Connected()) {
             ReportConnectionError("RPCChannel");
             return false;
         }
 
         if (!mDeferred.empty())
             MaybeUndeferIncall();
@@ -442,17 +442,17 @@ RPCChannel::RemoteViewOfStackDepth(size_
     AssertWorkerThread();
     return stackDepth - mOutOfTurnReplies.size();
 }
 
 void
 RPCChannel::Incall(const Message& call, size_t stackDepth)
 {
     AssertWorkerThread();
-    mMonitor.AssertNotCurrentThreadOwns();
+    mMonitor->AssertNotCurrentThreadOwns();
     RPC_ASSERT(call.is_rpc() && !call.is_reply(), "wrong message type");
 
     // Race detection: see the long comment near
     // mRemoteStackDepthGuess in RPCChannel.h.  "Remote" stack depth
     // means our side, and "local" means other side.
     if (call.rpc_remote_stack_depth_guess() != RemoteViewOfStackDepth(stackDepth)) {
         // RPC in-calls have raced.
         // the "winner", if there is one, gets to defer processing of
@@ -502,17 +502,17 @@ RPCChannel::Incall(const Message& call, 
 
     DispatchIncall(call);
 }
 
 void
 RPCChannel::DispatchIncall(const Message& call)
 {
     AssertWorkerThread();
-    mMonitor.AssertNotCurrentThreadOwns();
+    mMonitor->AssertNotCurrentThreadOwns();
     RPC_ASSERT(call.is_rpc() && !call.is_reply(),
                "wrong message type");
 
     Message* reply = nsnull;
 
     ++mRemoteStackDepthGuess;
     Result rv = Listener()->OnCallReceived(call, reply);
     --mRemoteStackDepthGuess;
@@ -523,40 +523,44 @@ RPCChannel::DispatchIncall(const Message
         reply->set_rpc();
         reply->set_reply();
         reply->set_reply_error();
     }
 
     reply->set_seqno(call.seqno());
 
     {
-        MonitorAutoLock lock(mMonitor);
+        MonitorAutoLock lock(*mMonitor);
         if (ChannelConnected == mChannelState)
-            SendThroughTransport(reply);
+            mLink->SendMessage(reply);
     }
 }
 
 bool
 RPCChannel::BlockChild()
 {
     AssertWorkerThread();
 
     if (mChild)
         NS_RUNTIMEABORT("child tried to block parent");
+
+    MonitorAutoLock lock(*mMonitor);
     SendSpecialMessage(new BlockChildMessage());
     return true;
 }
 
 bool
 RPCChannel::UnblockChild()
 {
     AssertWorkerThread();
 
     if (mChild)
         NS_RUNTIMEABORT("child tried to unblock parent");
+
+    MonitorAutoLock lock(*mMonitor);
     SendSpecialMessage(new UnblockChildMessage());
     return true;
 }
 
 bool
 RPCChannel::OnSpecialMessage(uint16 id, const Message& msg)
 {
     AssertWorkerThread();
@@ -578,17 +582,17 @@ RPCChannel::OnSpecialMessage(uint16 id, 
 void
 RPCChannel::BlockOnParent()
 {
     AssertWorkerThread();
 
     if (!mChild)
         NS_RUNTIMEABORT("child tried to block parent");
 
-    MonitorAutoLock lock(mMonitor);
+    MonitorAutoLock lock(*mMonitor);
 
     if (mBlockedOnParent || AwaitingSyncReply() || 0 < StackDepth())
         NS_RUNTIMEABORT("attempt to block child when it's already blocked");
 
     mBlockedOnParent = true;
     do {
         // XXX this dispatch loop shares some similarities with the
         // one in Call(), but the logic is simpler and different
@@ -602,17 +606,17 @@ RPCChannel::BlockOnParent()
             ReportConnectionError("RPCChannel");
             break;
         }
 
         if (!mPending.empty()) {
             Message recvd = mPending.front();
             mPending.pop();
 
-            MonitorAutoUnlock unlock(mMonitor);
+            MonitorAutoUnlock unlock(*mMonitor);
 
             CxxStackFrame f(*this, IN_MESSAGE, &recvd);
             if (recvd.is_rpc()) {
                 // stack depth must be 0 here
                 Incall(recvd, 0);
             }
             else if (recvd.is_sync()) {
                 SyncChannel::OnDispatchMessage(recvd);
@@ -628,26 +632,26 @@ RPCChannel::BlockOnParent()
 
 void
 RPCChannel::UnblockFromParent()
 {
     AssertWorkerThread();
 
     if (!mChild)
         NS_RUNTIMEABORT("child tried to block parent");
-    MonitorAutoLock lock(mMonitor);
+    MonitorAutoLock lock(*mMonitor);
     mBlockedOnParent = false;
 }
 
 void
 RPCChannel::ExitedCxxStack()
 {
     Listener()->OnExitedCxxStack();
     if (mSawRPCOutMsg) {
-        MonitorAutoLock lock(mMonitor);
+        MonitorAutoLock lock(*mMonitor);
         // see long comment in OnMaybeDequeueOne()
         EnqueuePendingMessages();
         mSawRPCOutMsg = false;
     }
 }
 
 void
 RPCChannel::DebugAbort(const char* file, int line, const char* cond,
@@ -702,25 +706,25 @@ RPCChannel::DumpRPCStack(FILE* outfile, 
         mCxxStackFrames[i].Describe(&id, &dir, &sems, &name);
 
         fprintf(outfile, "%s[(%u) %s %s %s(actor=%d) ]\n", pfx,
                 i, dir, sems, name, id);
     }
 }
 
 //
-// The methods below run in the context of the IO thread, and can proxy
+// The methods below run in the context of the link thread, and can proxy
 // back to the methods above
 //
 
 void
-RPCChannel::OnMessageReceived(const Message& msg)
+RPCChannel::OnMessageReceivedFromLink(const Message& msg)
 {
-    AssertIOThread();
-    MonitorAutoLock lock(mMonitor);
+    AssertLinkThread();
+    mMonitor->AssertCurrentThreadOwns();
 
     if (MaybeInterceptSpecialIOMessage(msg))
         return;
 
     // regardless of the RPC stack, if we're awaiting a sync reply, we
     // know that it needs to be immediately handled to unblock us.
     if (AwaitingSyncReply() && msg.is_sync()) {
         // wake up worker thread waiting at SyncChannel::Send
@@ -728,35 +732,31 @@ RPCChannel::OnMessageReceived(const Mess
         NotifyWorkerThread();
         return;
     }
 
     mPending.push(msg);
 
     if (0 == StackDepth() && !mBlockedOnParent) {
         // the worker thread might be idle, make sure it wakes up
-        mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
+        mWorkerLoop->PostTask(FROM_HERE,
+                                     new DequeueTask(mDequeueOneTask));
     }
     else if (!AwaitingSyncReply())
         NotifyWorkerThread();
 }
 
 
 void
-RPCChannel::OnChannelError()
+RPCChannel::OnChannelErrorFromLink()
 {
-    AssertIOThread();
-
-    MonitorAutoLock lock(mMonitor);
+    AssertLinkThread();
+    mMonitor->AssertCurrentThreadOwns();
 
-    if (ChannelClosing != mChannelState)
-        mChannelState = ChannelError;
-
-    // skip SyncChannel::OnError(); we subsume its duties
-    if (AwaitingSyncReply() || 0 < StackDepth())
+    if (0 < StackDepth())
         NotifyWorkerThread();
 
-    PostErrorNotifyTask();
+    SyncChannel::OnChannelErrorFromLink();
 }
 
 } // namespace ipc
 } // namespace mozilla
 
--- a/ipc/glue/RPCChannel.h
+++ b/ipc/glue/RPCChannel.h
@@ -157,22 +157,16 @@ public:
     // Return true iff this has code on the C++ stack.
     bool IsOnCxxStack() const {
         return !mCxxStackFrames.empty();
     }
 
     NS_OVERRIDE
     virtual bool OnSpecialMessage(uint16 id, const Message& msg);
 
-    // Override the SyncChannel handler so we can dispatch RPC
-    // messages.  Called on the IO thread only.
-    NS_OVERRIDE
-    virtual void OnMessageReceived(const Message& msg);
-    NS_OVERRIDE
-    virtual void OnChannelError();
 
     /**
      * If there is a pending RPC message, process all pending messages.
      *
      * @note This method is used on Windows when we detect that an outbound
      * OLE RPC call is being made to unblock the parent.
      */
     void FlushPendingRPCQueue();
@@ -181,17 +175,21 @@ public:
     void ProcessNativeEventsInRPCCall();
     static void NotifyGeckoEventDispatch();
 
 protected:
     bool WaitForNotify();
     void SpinInternalEventLoop();
 #endif
 
-  private:
+protected:
+    NS_OVERRIDE virtual void OnMessageReceivedFromLink(const Message& msg);
+    NS_OVERRIDE virtual void OnChannelErrorFromLink();
+
+private:
     // Called on worker thread only
 
     RPCListener* Listener() const {
         return static_cast<RPCListener*>(mListener);
     }
 
     NS_OVERRIDE
     virtual bool ShouldDeferNotifyMaybeError() const {
@@ -320,17 +318,17 @@ protected:
         // disable harmful methods
         CxxStackFrame();
         CxxStackFrame(const CxxStackFrame&);
         CxxStackFrame& operator=(const CxxStackFrame&);
     };
 
     // Called from both threads
     size_t StackDepth() const {
-        mMonitor.AssertCurrentThreadOwns();
+        mMonitor->AssertCurrentThreadOwns();
         return mStack.size();
     }
 
     void DebugAbort(const char* file, int line, const char* cond,
                     const char* why,
                     const char* type="rpc", bool reply=false) const;
 
     // This method is only safe to call on the worker thread, or in a
--- a/ipc/glue/SyncChannel.cpp
+++ b/ipc/glue/SyncChannel.cpp
@@ -83,49 +83,49 @@ SyncChannel::~SyncChannel()
 
 // static
 bool SyncChannel::sIsPumpingMessages = false;
 
 bool
 SyncChannel::EventOccurred()
 {
     AssertWorkerThread();
-    mMonitor.AssertCurrentThreadOwns();
+    mMonitor->AssertCurrentThreadOwns();
     NS_ABORT_IF_FALSE(AwaitingSyncReply(), "not in wait loop");
 
     return (!Connected() || 0 != mRecvd.type() || mRecvd.is_reply_error());
 }
 
 bool
 SyncChannel::Send(Message* _msg, Message* reply)
 {
     nsAutoPtr<Message> msg(_msg);
 
     AssertWorkerThread();
-    mMonitor.AssertNotCurrentThreadOwns();
+    mMonitor->AssertNotCurrentThreadOwns();
     NS_ABORT_IF_FALSE(!ProcessingSyncMessage(),
                       "violation of sync handler invariant");
     NS_ABORT_IF_FALSE(msg->is_sync(), "can only Send() sync messages here");
 
 #ifdef OS_WIN
     SyncStackFrame frame(this, false);
 #endif
 
     msg->set_seqno(NextSeqno());
 
-    MonitorAutoLock lock(mMonitor);
+    MonitorAutoLock lock(*mMonitor);
 
     if (!Connected()) {
         ReportConnectionError("SyncChannel");
         return false;
     }
 
     mPendingReply = msg->type() + 1;
     int32 msgSeqno = msg->seqno();
-    SendThroughTransport(msg.forget());
+    mLink->SendMessage(msg.forget());
 
     while (1) {
         bool maybeTimedOut = !SyncChannel::WaitForNotify();
 
         if (EventOccurred())
             break;
 
         if (maybeTimedOut && !ShouldContinueFromTimeout())
@@ -181,37 +181,38 @@ SyncChannel::OnDispatchMessage(const Mes
         reply->set_sync();
         reply->set_reply();
         reply->set_reply_error();
     }
 
     reply->set_seqno(msg.seqno());
 
     {
-        MonitorAutoLock lock(mMonitor);
+        MonitorAutoLock lock(*mMonitor);
         if (ChannelConnected == mChannelState)
-            SendThroughTransport(reply);
+            mLink->SendMessage(reply);
     }
 }
 
 //
-// The methods below run in the context of the IO thread, and can proxy
+// The methods below run in the context of the link thread, and can proxy
 // back to the methods above
 //
 
 void
-SyncChannel::OnMessageReceived(const Message& msg)
+SyncChannel::OnMessageReceivedFromLink(const Message& msg)
 {
-    AssertIOThread();
+    AssertLinkThread();
+    mMonitor->AssertCurrentThreadOwns();
+
     if (!msg.is_sync()) {
-        return AsyncChannel::OnMessageReceived(msg);
+        AsyncChannel::OnMessageReceivedFromLink(msg);
+        return;
     }
 
-    MonitorAutoLock lock(mMonitor);
-
     if (MaybeInterceptSpecialIOMessage(msg))
         return;
 
     if (!AwaitingSyncReply()) {
         // wake up the worker, there's work to do
         mWorkerLoop->PostTask(
             FROM_HERE,
             NewRunnableMethod(this, &SyncChannel::OnDispatchMessage, msg));
@@ -219,29 +220,25 @@ SyncChannel::OnMessageReceived(const Mes
     else {
         // let the worker know a new sync message has arrived
         mRecvd = msg;
         NotifyWorkerThread();
     }
 }
 
 void
-SyncChannel::OnChannelError()
+SyncChannel::OnChannelErrorFromLink()
 {
-    AssertIOThread();
-
-    MonitorAutoLock lock(mMonitor);
-
-    if (ChannelClosing != mChannelState)
-        mChannelState = ChannelError;
+    AssertLinkThread();
+    mMonitor->AssertCurrentThreadOwns();
 
     if (AwaitingSyncReply())
         NotifyWorkerThread();
 
-    PostErrorNotifyTask();
+    AsyncChannel::OnChannelErrorFromLink();
 }
 
 //
 // Synchronization between worker and IO threads
 //
 
 namespace {
 
@@ -253,21 +250,21 @@ IsTimeoutExpired(PRIntervalTime aStart, 
 }
 
 } // namespace <anon>
 
 bool
 SyncChannel::ShouldContinueFromTimeout()
 {
     AssertWorkerThread();
-    mMonitor.AssertCurrentThreadOwns();
+    mMonitor->AssertCurrentThreadOwns();
 
     bool cont;
     {
-        MonitorAutoUnlock unlock(mMonitor);
+        MonitorAutoUnlock unlock(*mMonitor);
         cont = static_cast<SyncListener*>(mListener)->OnReplyTimeout();
     }
 
     if (!cont) {
         // NB: there's a sublety here.  If parents were allowed to
         // send sync messages to children, then it would be possible
         // for this synchronous close-on-timeout to race with async
         // |OnMessageReceived| tasks arriving from the child, posted
@@ -295,26 +292,26 @@ bool
 SyncChannel::WaitForNotify()
 {
     PRIntervalTime timeout = (kNoTimeout == mTimeoutMs) ?
                              PR_INTERVAL_NO_TIMEOUT :
                              PR_MillisecondsToInterval(mTimeoutMs);
     // XXX could optimize away this syscall for "no timeout" case if desired
     PRIntervalTime waitStart = PR_IntervalNow();
 
-    mMonitor.Wait(timeout);
+    mMonitor->Wait(timeout);
 
     // if the timeout didn't expire, we know we received an event.
     // The converse is not true.
     return !IsTimeoutExpired(waitStart, timeout);
 }
 
 void
 SyncChannel::NotifyWorkerThread()
 {
-    mMonitor.Notify();
+    mMonitor->Notify();
 }
 
 #endif  // ifndef OS_WIN
 
 
 } // namespace ipc
 } // namespace mozilla
--- a/ipc/glue/SyncChannel.h
+++ b/ipc/glue/SyncChannel.h
@@ -81,28 +81,25 @@ public:
     // Synchronously send |msg| (i.e., wait for |reply|)
     virtual bool Send(Message* msg, Message* reply);
 
     void SetReplyTimeoutMs(int32 aTimeoutMs) {
         AssertWorkerThread();
         mTimeoutMs = (aTimeoutMs <= 0) ? kNoTimeout : aTimeoutMs;
     }
 
-    // Override the AsyncChannel handler so we can dispatch sync messages
-    NS_OVERRIDE virtual void OnMessageReceived(const Message& msg);
-    NS_OVERRIDE virtual void OnChannelError();
-
     static bool IsPumpingMessages() {
         return sIsPumpingMessages;
     }
     static void SetIsPumpingMessages(bool aIsPumping) {
         sIsPumpingMessages = aIsPumping;
     }
 
 #ifdef OS_WIN
+public:
     struct NS_STACK_CLASS SyncStackFrame
     {
         SyncStackFrame(SyncChannel* channel, bool rpc);
         ~SyncStackFrame();
 
         bool mRPC;
         bool mSpinNestedEvents;
         bool mListenerNotified;
@@ -130,30 +127,28 @@ protected:
     /* the deepest sync stack frame for this channel */
     SyncStackFrame* mTopFrame;
 
     /* the deepest sync stack frame on any channel */
     static SyncStackFrame* sStaticTopFrame;
 #endif // OS_WIN
 
 protected:
+    // Executed on the link thread
+    // Override the AsyncChannel handler so we can dispatch sync messages
+    NS_OVERRIDE virtual void OnMessageReceivedFromLink(const Message& msg);
+    NS_OVERRIDE virtual void OnChannelErrorFromLink();
+
     // Executed on the worker thread
     bool ProcessingSyncMessage() const {
         return mProcessingSyncMessage;
     }
 
     void OnDispatchMessage(const Message& aMsg);
 
-    NS_OVERRIDE
-    bool OnSpecialMessage(uint16 id, const Message& msg)
-    {
-        // SyncChannel doesn't care about any special messages yet
-        return AsyncChannel::OnSpecialMessage(id, msg);
-    }
-
     //
     // Return true if the wait ended because a notification was
     // received.  That is, true => event received.
     //
     // Return false if the time elapsed from when we started the
     // process of waiting until afterwards exceeded the currently
     // allotted timeout.  That *DOES NOT* mean false => "no event" (==
     // timeout); there are many circumstances that could cause the
@@ -167,17 +162,17 @@ protected:
 
     bool ShouldContinueFromTimeout();
 
     // Executed on the IO thread.
     void NotifyWorkerThread();
 
     // On both
     bool AwaitingSyncReply() const {
-        mMonitor.AssertCurrentThreadOwns();
+        mMonitor->AssertCurrentThreadOwns();
         return mPendingReply != 0;
     }
 
     int32 NextSeqno() {
         AssertWorkerThread();
         return mChild ? --mNextSeqno : ++mNextSeqno;
     }
 
--- a/ipc/glue/WindowsMessageLoop.cpp
+++ b/ipc/glue/WindowsMessageLoop.cpp
@@ -689,17 +689,17 @@ RPCChannel::SpinInternalEventLoop()
   // Note, when we return, we always reset the notify worker event. So there's
   // no need to reset it on return here.
 
   do {
     MSG msg = { 0 };
 
     // Don't get wrapped up in here if the child connection dies.
     {
-      MonitorAutoLock lock(mMonitor);
+      MonitorAutoLock lock(*mMonitor);
       if (!Connected()) {
         return;
       }
     }
 
     // Retrieve window or thread messages
     if (PeekMessageW(&msg, NULL, 0, 0, PM_REMOVE)) {
       // The child UI should have been destroyed before the app is closed, in
@@ -726,25 +726,25 @@ RPCChannel::SpinInternalEventLoop()
       return;
     }
   } while (true);
 }
 
 bool
 SyncChannel::WaitForNotify()
 {
-  mMonitor.AssertCurrentThreadOwns();
+  mMonitor->AssertCurrentThreadOwns();
 
   // Initialize global objects used in deferred messaging.
   Init();
 
   NS_ASSERTION(mTopFrame && !mTopFrame->mRPC,
                "Top frame is not a sync frame!");
 
-  MonitorAutoUnlock unlock(mMonitor);
+  MonitorAutoUnlock unlock(*mMonitor);
 
   bool retval = true;
 
   UINT_PTR timerId = NULL;
   TimeoutData timeoutData = { 0 };
 
   if (mTimeoutMs != kNoTimeout) {
     InitTimeoutData(&timeoutData, mTimeoutMs);
@@ -764,17 +764,17 @@ SyncChannel::WaitForNotify()
                                       NULL, gUIThreadId);
   NS_ASSERTION(windowHook, "Failed to set hook!");
 
   {
     while (1) {
       MSG msg = { 0 };
       // Don't get wrapped up in here if the child connection dies.
       {
-        MonitorAutoLock lock(mMonitor);
+        MonitorAutoLock lock(*mMonitor);
         if (!Connected()) {
           break;
         }
       }
 
       // Wait until we have a message in the queue. MSDN docs are a bit unclear
       // but it seems that windows from two different threads (and it should be
       // noted that a thread in another process counts as a "different thread")
@@ -848,30 +848,30 @@ SyncChannel::WaitForNotify()
   SyncChannel::SetIsPumpingMessages(false);
 
   return retval;
 }
 
 bool
 RPCChannel::WaitForNotify()
 {
-  mMonitor.AssertCurrentThreadOwns();
+  mMonitor->AssertCurrentThreadOwns();
 
   if (!StackDepth() && !mBlockedOnParent) {
     // There is currently no way to recover from this condition.
     NS_RUNTIMEABORT("StackDepth() is 0 in call to RPCChannel::WaitForNotify!");
   }
 
   // Initialize global objects used in deferred messaging.
   Init();
 
   NS_ASSERTION(mTopFrame && mTopFrame->mRPC,
                "Top frame is not a sync frame!");
 
-  MonitorAutoUnlock unlock(mMonitor);
+  MonitorAutoUnlock unlock(*mMonitor);
 
   bool retval = true;
 
   UINT_PTR timerId = NULL;
   TimeoutData timeoutData = { 0 };
 
   // windowHook is used as a flag variable for the loop below: if it is set
   // and we start to spin a nested event loop, we need to clear the hook and
@@ -924,17 +924,17 @@ RPCChannel::WaitForNotify()
         NS_ASSERTION(timerId, "SetTimer failed!");
       }
     }
 
     MSG msg = { 0 };
 
     // Don't get wrapped up in here if the child connection dies.
     {
-      MonitorAutoLock lock(mMonitor);
+      MonitorAutoLock lock(*mMonitor);
       if (!Connected()) {
         break;
       }
     }
 
     DWORD result = MsgWaitForMultipleObjects(1, &mEvent, FALSE, INFINITE,
                                              QS_ALLINPUT);
     if (result == WAIT_OBJECT_0) {
@@ -988,17 +988,17 @@ RPCChannel::WaitForNotify()
   SyncChannel::SetIsPumpingMessages(false);
 
   return retval;
 }
 
 void
 SyncChannel::NotifyWorkerThread()
 {
-  mMonitor.AssertCurrentThreadOwns();
+  mMonitor->AssertCurrentThreadOwns();
   NS_ASSERTION(mEvent, "No signal event to set, this is really bad!");
   if (!SetEvent(mEvent)) {
     NS_WARNING("Failed to set NotifyWorkerThread event!");
   }
 }
 
 void
 DeferredSendMessage::Run()