first cut at AsyncChannel and SyncChannel. only RPCChannel is currently under warranty.
authorChris Jones <jones.chris.g@gmail.com>
Mon, 13 Jul 2009 16:55:04 -0500
changeset 35778 2e27ae79e54420a28a406df7abbf3c9e6073ebc4
parent 35777 1518970f774b70f2632b60b07f9711f46019d4d6
child 35779 1769c830e612ef844c2b7df27028f570e34af760
push id10694
push userbsmedberg@mozilla.com
push dateMon, 14 Dec 2009 15:23:10 +0000
treeherdermozilla-central@683dfdc4adf0 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
milestone1.9.2a1pre
first cut at AsyncChannel and SyncChannel. only RPCChannel is currently under warranty.
ipc/chromium/src/chrome/common/ipc_message.h
ipc/glue/AsyncChannel.cpp
ipc/glue/AsyncChannel.h
ipc/glue/Makefile.in
ipc/glue/RPCChannel.cpp
ipc/glue/RPCChannel.h
ipc/glue/SyncChannel.cpp
ipc/glue/SyncChannel.h
ipc/ipdl/ipdl/lower.py
--- a/ipc/chromium/src/chrome/common/ipc_message.h
+++ b/ipc/chromium/src/chrome/common/ipc_message.h
@@ -13,16 +13,20 @@
 #ifndef NDEBUG
 #define IPC_MESSAGE_LOG_ENABLED
 #endif
 
 #if defined(OS_POSIX)
 #include "base/ref_counted.h"
 #endif
 
+#if defined(CHROMIUM_MOZILLA_BUILD)
+#define IPC_MESSAGE_ENABLE_RPC
+#endif
+
 namespace base {
 class FileDescriptor;
 }
 
 class FileDescriptorSet;
 
 namespace IPC {
 
@@ -72,16 +76,23 @@ class Message : public Pickle {
     return static_cast<PriorityValue>(header()->flags & PRIORITY_MASK);
   }
 
   // True if this is a synchronous message.
   bool is_sync() const {
     return (header()->flags & SYNC_BIT) != 0;
   }
 
+#if defined(IPC_MESSAGE_ENABLE_RPC)
+  // True if this is a synchronous message.
+  bool is_rpc() const {
+    return (header()->flags & RPC_BIT) != 0;
+  }
+#endif
+
   // Set this on a reply to a synchronous message.
   void set_reply() {
     header()->flags |= REPLY_BIT;
   }
 
   bool is_reply() const {
     return (header()->flags & REPLY_BIT) != 0;
   }
@@ -191,34 +202,49 @@ class Message : public Pickle {
   // reply message, so that when it's sent and we have the output parameters
   // we can log it.  As such, we set a flag on the sent message to not log it.
   void set_sync_log_data(LogData* data) const { log_data_ = data; }
   LogData* sync_log_data() const { return log_data_; }
   void set_dont_log() const { dont_log_ = true; }
   bool dont_log() const { return dont_log_; }
 #endif
 
+#if !defined(CHROMIUM_MOZILLA_BUILD)
  protected:
+#endif
   friend class Channel;
   friend class MessageReplyDeserializer;
   friend class SyncMessage;
 
   void set_sync() {
     header()->flags |= SYNC_BIT;
   }
 
+#if defined(IPC_MESSAGE_ENABLE_RPC)
+  void set_rpc() {
+    header()->flags |= RPC_BIT;
+  }
+#endif
+
+#if defined(CHROMIUM_MOZILLA_BUILD)
+ protected:
+#endif
+
   // flags
   enum {
     PRIORITY_MASK   = 0x0003,
     SYNC_BIT        = 0x0004,
     REPLY_BIT       = 0x0008,
     REPLY_ERROR_BIT = 0x0010,
     UNBLOCK_BIT     = 0x0020,
     PUMPING_MSGS_BIT= 0x0040,
     HAS_SENT_TIME_BIT = 0x0080,
+#if defined(IPC_MESSAGE_ENABLE_RPC)
+    RPC_BIT        = 0x0100,
+#endif
   };
 
 #pragma pack(push, 2)
   struct Header : Pickle::Header {
     int32 routing;  // ID of the view that this message is destined for
     uint16 type;    // specifies the user-defined message type
     uint16 flags;   // specifies control flags for the message
 #if defined(OS_POSIX)
new file mode 100644
--- /dev/null
+++ b/ipc/glue/AsyncChannel.cpp
@@ -0,0 +1,175 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
+ * vim: sw=4 ts=4 et :
+ */
+/* ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0/LGPL 2.1
+ *
+ * The contents of this file are subject to the Mozilla Public License Version
+ * 1.1 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS" basis,
+ * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+ * for the specific language governing rights and limitations under the
+ * License.
+ *
+ * The Original Code is Mozilla Plugin App.
+ *
+ * The Initial Developer of the Original Code is
+ *   Chris Jones <jones.chris.g@gmail.com>
+ * Portions created by the Initial Developer are Copyright (C) 2009
+ * the Initial Developer. All Rights Reserved.
+ *
+ * Contributor(s):
+ *
+ * Alternatively, the contents of this file may be used under the terms of
+ * either the GNU General Public License Version 2 or later (the "GPL"), or
+ * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
+ * in which case the provisions of the GPL or the LGPL are applicable instead
+ * of those above. If you wish to allow use of your version of this file only
+ * under the terms of either the GPL or the LGPL, and not to allow others to
+ * use your version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the notice
+ * and other provisions required by the GPL or the LGPL. If you do not delete
+ * the provisions above, a recipient may use your version of this file under
+ * the terms of any one of the MPL, the GPL or the LGPL.
+ *
+ * ***** END LICENSE BLOCK ***** */
+
+#include "mozilla/ipc/AsyncChannel.h"
+#include "mozilla/ipc/GeckoThread.h"
+
+#include "nsDebug.h"
+
+template<>
+struct RunnableMethodTraits<mozilla::ipc::AsyncChannel>
+{
+    static void RetainCallee(mozilla::ipc::AsyncChannel* obj) { }
+    static void ReleaseCallee(mozilla::ipc::AsyncChannel* obj) { }
+};
+
+namespace mozilla {
+namespace ipc {
+
+
+bool
+AsyncChannel::Open(Transport* aTransport, MessageLoop* aIOLoop)
+{
+    NS_PRECONDITION(!mTransport, "Open() called > once");
+    NS_PRECONDITION(aTransport, "need transport layer");
+
+    // FIXME need to check for valid channel
+
+    mTransport = aTransport;
+    mTransport->set_listener(this);
+
+    // FIXME do away with this
+    bool needOpen = true;
+    if(!aIOLoop) {
+        needOpen = false;
+        aIOLoop = BrowserProcessSubThread
+                  ::GetMessageLoop(BrowserProcessSubThread::IO);
+    }
+
+    mIOLoop = aIOLoop;
+    mWorkerLoop = MessageLoop::current();
+
+    NS_ASSERTION(mIOLoop, "need an IO loop");
+    NS_ASSERTION(mWorkerLoop, "need a worker loop");
+
+    if (needOpen) {
+        mIOLoop->PostTask(FROM_HERE, 
+                          NewRunnableMethod(this,
+                                            &AsyncChannel::OnChannelOpened));
+    }
+
+    return true;
+}
+
+void
+AsyncChannel::Close()
+{
+    // FIXME impl
+
+    mChannelState = ChannelClosed;
+}
+
+bool
+AsyncChannel::Send(Message* msg)
+{
+    NS_PRECONDITION(MSG_ROUTING_NONE != msg->routing_id(), "need a route");
+    mIOLoop->PostTask(FROM_HERE,
+                      NewRunnableMethod(this, &AsyncChannel::OnSend, msg));
+    return true;
+}
+
+void
+AsyncChannel::OnDispatchMessage(const Message& msg)
+{
+    NS_ASSERTION(!msg.is_reply(), "can't process replies here");
+    NS_ASSERTION(!(msg.is_sync() || msg.is_rpc()), "async dispatch only");
+
+    switch (mListener->OnMessageReceived(msg)) {
+    case Listener::MsgProcessed:
+        return;
+
+    case Listener::MsgNotKnown:
+    case Listener::MsgNotAllowed:
+    case Listener::MsgPayloadError:
+    case Listener::MsgRouteError:
+    case Listener::MsgValueError:
+        // FIXME/cjones: error handling; OnError()?
+        return;
+
+    default:
+        NOTREACHED();
+        return;
+    }
+}
+
+//
+// The methods below run in the context of the IO thread, and can proxy
+// back to the methods above
+//
+
+void
+AsyncChannel::OnMessageReceived(const Message& msg)
+{
+    // wake up the worker, there's work to do
+    mWorkerLoop->PostTask(FROM_HERE,
+                          NewRunnableMethod(this,
+                                            &AsyncChannel::OnDispatchMessage,
+                                            msg));
+}
+
+void
+AsyncChannel::OnChannelConnected(int32 peer_pid)
+{
+    mChannelState = ChannelIdle;
+}
+
+void
+AsyncChannel::OnChannelError()
+{
+    // FIXME/cjones impl
+    mChannelState = ChannelError;
+}
+
+void
+AsyncChannel::OnChannelOpened()
+{
+    mChannelState = ChannelOpening;
+    /*assert*/mTransport->Connect();
+}
+
+void
+AsyncChannel::OnSend(Message* aMsg)
+{
+    mTransport->Send(aMsg);
+    // mTransport deletes aMsg
+}
+
+
+} // namespace ipc
+} // namespace mozilla
new file mode 100644
--- /dev/null
+++ b/ipc/glue/AsyncChannel.h
@@ -0,0 +1,132 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
+ * vim: sw=4 ts=4 et :
+ */
+/* ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0/LGPL 2.1
+ *
+ * The contents of this file are subject to the Mozilla Public License Version
+ * 1.1 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS" basis,
+ * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+ * for the specific language governing rights and limitations under the
+ * License.
+ *
+ * The Original Code is Mozilla Plugin App.
+ *
+ * The Initial Developer of the Original Code is
+ *   Chris Jones <jones.chris.g@gmail.com>
+ * Portions created by the Initial Developer are Copyright (C) 2009
+ * the Initial Developer. All Rights Reserved.
+ *
+ * Contributor(s):
+ *
+ * Alternatively, the contents of this file may be used under the terms of
+ * either the GNU General Public License Version 2 or later (the "GPL"), or
+ * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
+ * in which case the provisions of the GPL or the LGPL are applicable instead
+ * of those above. If you wish to allow use of your version of this file only
+ * under the terms of either the GPL or the LGPL, and not to allow others to
+ * use your version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the notice
+ * and other provisions required by the GPL or the LGPL. If you do not delete
+ * the provisions above, a recipient may use your version of this file under
+ * the terms of any one of the MPL, the GPL or the LGPL.
+ *
+ * ***** END LICENSE BLOCK ***** */
+
+#ifndef ipc_glue_AsyncChannel_h
+#define ipc_glue_AsyncChannel_h 1
+
+#include "base/basictypes.h"
+#include "base/message_loop.h"
+#include "chrome/common/ipc_channel.h"
+
+namespace mozilla {
+namespace ipc {
+//-----------------------------------------------------------------------------
+
+class AsyncChannel : public IPC::Channel::Listener
+{
+protected:
+    enum ChannelState {
+        ChannelClosed,
+        ChannelOpening,
+        ChannelIdle,            // => connected
+        ChannelWaiting,         // => connected
+        ChannelError
+    };
+
+public:
+    typedef IPC::Channel Transport;
+    typedef IPC::Message Message;
+
+    class /*NS_INTERFACE_CLASS*/ Listener
+    {
+    public:
+        enum Result {
+            MsgProcessed,
+            MsgNotKnown,
+            MsgNotAllowed,
+            MsgPayloadError,
+            MsgRouteError,
+            MsgValueError,
+        };
+
+        virtual ~Listener() { }
+        virtual Result OnMessageReceived(const Message& aMessage) = 0;
+    };
+
+    AsyncChannel(Listener* aListener) :
+        mTransport(0),
+        mListener(aListener),
+        mChannelState(ChannelClosed),
+        mIOLoop(),
+        mWorkerLoop()
+    {
+    }
+
+    virtual ~AsyncChannel()
+    {
+        if (mTransport)
+            Close();
+        mTransport = 0;
+    }
+
+    // Open  from the perspective of the RPC layer; the transport
+    // should already be connected, or ready to connect.
+    bool Open(Transport* aTransport, MessageLoop* aIOLoop=0);
+    
+    // Close from the perspective of the RPC layer; leaves the
+    // underlying transport channel open, however.
+    void Close();
+
+    // Asynchronously send a message to the other side of the channel
+    bool Send(Message* msg);
+
+    // Implement the IPC::Channel::Listener interface
+    virtual void OnMessageReceived(const Message& msg);
+    virtual void OnChannelConnected(int32 peer_pid);
+    virtual void OnChannelError();
+
+protected:
+    // Additional methods that execute on the worker thread
+    void OnDispatchMessage(const Message& aMsg);
+
+    // Additional methods that execute on the IO thread
+    void OnChannelOpened();
+    void OnSend(Message* aMsg);
+
+    Transport* mTransport;
+    Listener* mListener;
+    ChannelState mChannelState;
+    MessageLoop* mIOLoop;       // thread where IO happens
+    MessageLoop* mWorkerLoop;   // thread where work is done
+};
+
+
+} // namespace ipc
+} // namespace mozilla
+#endif  // ifndef ipc_glue_AsyncChannel_h
--- a/ipc/glue/Makefile.in
+++ b/ipc/glue/Makefile.in
@@ -49,33 +49,37 @@ EXPORT_LIBRARY = 1
 
 EXPORTS_NAMESPACES = IPC mozilla/ipc
 
 EXPORTS_IPC =					\
   IPCMessageUtils.h 				\
   $(NULL)
 
 EXPORTS_mozilla/ipc =				\
+  AsyncChannel.h				\
   GeckoChildProcessHost.h			\
   GeckoThread.h 				\
   MessageTypes.h 				\
   ProtocolUtils.h				\
   RPCChannel.h					\
+  SyncChannel.h					\
   ScopedXREEmbed.h				\
   $(NULL)
 
 ENABLE_CXX_EXCEPTIONS = 1
 
-CPPSRCS += \
-  GeckoChildProcessHost.cpp \
-  GeckoThread.cpp \
-  MessagePump.cpp \
-  RPCChannel.cpp \
-  ScopedXREEmbed.cpp \
-  StringUtil.cpp \
+CPPSRCS +=					\
+  AsyncChannel.cpp				\
+  GeckoChildProcessHost.cpp			\
+  GeckoThread.cpp				\
+  MessagePump.cpp				\
+  RPCChannel.cpp				\
+  ScopedXREEmbed.cpp				\
+  StringUtil.cpp				\
+  SyncChannel.cpp				\
   $(NULL)
 
 include $(topsrcdir)/ipc/app/defs.mk
 DEFINES += -DMOZ_CHILD_PROCESS_NAME="\"$(MOZ_CHILD_PROCESS_NAME)\""
 
 include $(topsrcdir)/config/config.mk
 include $(topsrcdir)/ipc/chromium/chromium-config.mk
 include $(topsrcdir)/config/rules.mk
--- a/ipc/glue/RPCChannel.cpp
+++ b/ipc/glue/RPCChannel.cpp
@@ -1,11 +1,12 @@
 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
  * vim: sw=4 ts=4 et :
- * ***** BEGIN LICENSE BLOCK *****
+ */
+/* ***** BEGIN LICENSE BLOCK *****
  * Version: MPL 1.1/GPL 2.0/LGPL 2.1
  *
  * The contents of this file are subject to the Mozilla Public License Version
  * 1.1 (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
  * http://www.mozilla.org/MPL/
  *
  * Software distributed under the License is distributed on an "AS IS" basis,
@@ -50,187 +51,130 @@ struct RunnableMethodTraits<mozilla::ipc
     static void ReleaseCallee(mozilla::ipc::RPCChannel* obj) { }
 };
 
 namespace mozilla {
 namespace ipc {
 
 
 bool
-RPCChannel::Open(Transport* aTransport, MessageLoop* aIOLoop)
-{
-    NS_PRECONDITION(!mTransport, "Open() called > once");
-    NS_PRECONDITION(aTransport, "need transport layer");
-
-    // FIXME need to check for valid channel
-
-    mTransport = aTransport;
-    mTransport->set_listener(this);
-
-    // FIXME do away with this
-    bool needOpen = true;
-    if(!aIOLoop) {
-        needOpen = false;
-        aIOLoop = BrowserProcessSubThread
-                  ::GetMessageLoop(BrowserProcessSubThread::IO);
-    }
-
-    mIOLoop = aIOLoop;
-    mWorkerLoop = MessageLoop::current();
-
-    NS_ASSERTION(mIOLoop, "need an IO loop");
-    NS_ASSERTION(mWorkerLoop, "need a worker loop");
-
-    if (needOpen) {
-        mIOLoop->PostTask(FROM_HERE, 
-                          NewRunnableMethod(this,
-                                            &RPCChannel::OnChannelOpened));
-    }
-
-    return true;
-}
-
-void
-RPCChannel::Close()
-{
-    // FIXME impl
-
-    mChannelState = ChannelClosed;
-}
-
-bool
 RPCChannel::Call(Message* msg, Message* reply)
 {
-    NS_PRECONDITION(MSG_ROUTING_NONE != msg->routing_id(), "need a route");
+    NS_PRECONDITION(msg->is_rpc(), "can only Call() RPC messages here");
 
     mMutex.Lock();
 
+    mChannelState = ChannelWaiting;
+
     mPending.push(*msg);
-    mIOLoop->PostTask(FROM_HERE, NewRunnableMethod(this,
-                                                   &RPCChannel::SendCall,
-                                                   msg));
+    AsyncChannel::Send(msg);
+
     while (1) {
-        // here we're waiting for something to happen.  it may either
-        // be a reply to an outstanding message, or a recursive call
-        // from the other side
+        // here we're waiting for something to happen.  it may either:
+        //  (1) a reply to an outstanding message
+        //  (2) a recursive call from the other side
+        // or
+        //  (3) any other message
         mCvar.Wait();
 
         Message recvd = mPending.top();
         mPending.pop();
 
-        if (recvd.is_reply()) {
+        if (!recvd.is_rpc()) {
+            SyncChannel::OnDispatchMessage(recvd);
+            // FIXME/cjones: error handling
+        }
+        // RPC reply message
+        else if (recvd.is_reply()) {
+            NS_ASSERTION(0 < mPending.size(), "invalid RPC stack");
+
+            const Message& pending = mPending.top();
+            if (recvd.type() != (pending.type()+1)) {
+                // FIXME/cjones: handle error
+                NS_ASSERTION(0, "somebody's misbehavin'");
+            }
+
             // we received a reply to our most recent message.  pop this
             // frame and return the reply
-            NS_ASSERTION(0 < mPending.size(), "invalid RPC stack");
             mPending.pop();
             *reply = recvd;
 
+            if (!WaitingForReply()) {
+                mChannelState = ChannelIdle;
+            }
+
             mMutex.Unlock();
             return true;
         }
+        // RPC in-call
         else {
             mMutex.Unlock();
 
             // someone called in to us from the other side.  handle the call
-            if (!ProcessIncomingCall(recvd))
-                return false;
+            OnDispatchMessage(recvd);
+            // FIXME/cjones: error handling
 
             mMutex.Lock();
         }
     }
 
     delete msg;
 
     return true;
 }
 
-bool
-RPCChannel::ProcessIncomingCall(Message call)
+void
+RPCChannel::OnDispatchMessage(const Message& call)
 {
-   Message* reply;
+    if (!call.is_rpc()) {
+        return SyncChannel::OnDispatchMessage(call);
+    }
 
-    switch (mListener->OnCallReceived(call, reply)) {
+    Message* reply;
+    switch (static_cast<Listener*>(mListener)->OnCallReceived(call, reply)) {
     case Listener::MsgProcessed:
         mIOLoop->PostTask(FROM_HERE,
                           NewRunnableMethod(this,
-                                            &RPCChannel::SendReply,
+                                            &RPCChannel::OnSendReply,
                                             reply));
-        return true;
+        return;
 
     case Listener::MsgNotKnown:
     case Listener::MsgNotAllowed:
     case Listener::MsgPayloadError:
     case Listener::MsgRouteError:
     case Listener::MsgValueError:
-        //OnError()?
-        return false;
+        // FIXME/cjones: error handling; OnError()?
+        return;
 
     default:
         NOTREACHED();
-        return false;
+        return;
     }
 }
 
-void
-RPCChannel::OnIncomingCall(Message msg)
-{
-    NS_ASSERTION(0 == mPending.size(),
-                 "woke up the worker thread when it had outstanding work!");
-    ProcessIncomingCall(msg);
-}
-
 //
 // The methods below run in the context of the IO thread, and can proxy
 // back to the methods above
 //
 
 void
 RPCChannel::OnMessageReceived(const Message& msg)
-{MutexAutoLock lock(mMutex);
+{
+    MutexAutoLock lock(mMutex);
+
     if (0 == mPending.size()) {
         // wake up the worker, there's work to do
         mWorkerLoop->PostTask(FROM_HERE,
                               NewRunnableMethod(this,
-                                                &RPCChannel::OnIncomingCall,
+                                                &RPCChannel::OnDispatchMessage,
                                                 msg));
     }
     else {
         // let the worker know something new has happened
         mPending.push(msg);
         mCvar.Notify();
     }
 }
 
-void
-RPCChannel::OnChannelConnected(int32 peer_pid)
-{
-    mChannelState = ChannelConnected;
-}
-
-void
-RPCChannel::OnChannelError()
-{
-    // FIXME/cjones impl
-    mChannelState = ChannelError;
-}
-
-void
-RPCChannel::OnChannelOpened()
-{
-    mChannelState = ChannelOpening;
-    /*assert*/mTransport->Connect();
-}
-
-void
-RPCChannel::SendCall(Message* aCall)
-{
-    mTransport->Send(aCall);
-}
-
-void
-RPCChannel::SendReply(Message* aReply)
-{
-    mTransport->Send(aReply);
-}
-
 
 } // namespace ipc
 } // namespace mozilla
--- a/ipc/glue/RPCChannel.h
+++ b/ipc/glue/RPCChannel.h
@@ -37,117 +37,60 @@
  * ***** END LICENSE BLOCK ***** */
 
 #ifndef ipc_glue_RPCChannel_h
 #define ipc_glue_RPCChannel_h 1
 
 // FIXME/cjones probably shouldn't depend on this
 #include <stack>
 
-#include "base/basictypes.h"
-#include "base/message_loop.h"
-#include "chrome/common/ipc_channel.h"
-
-#include "mozilla/CondVar.h"
-#include "mozilla/Mutex.h"
+#include "mozilla/ipc/SyncChannel.h"
 
 namespace mozilla {
 namespace ipc {
 //-----------------------------------------------------------------------------
 
-class RPCChannel : public IPC::Channel::Listener
+class RPCChannel : public SyncChannel
 {
-private:
-    typedef mozilla::CondVar CondVar;
-    typedef mozilla::Mutex Mutex;
-
-    enum ChannelState {
-        ChannelClosed,
-        ChannelOpening,
-        ChannelConnected,
-        ChannelError
-    };
-
 public:
-    typedef IPC::Channel Transport;
-    typedef IPC::Message Message;
-
-    class Listener
+    class Listener : public SyncChannel::Listener
     {
     public:
-        enum Result {
-            MsgProcessed,
-            MsgNotKnown,
-            MsgNotAllowed,
-            MsgPayloadError,
-            MsgRouteError,
-            MsgValueError,
-        };
-
         virtual ~Listener() { }
+        virtual Result OnMessageReceived(const Message& aMessage) = 0;
+        virtual Result OnMessageReceived(const Message& aMessage,
+                                         Message*& aReply) = 0;
         virtual Result OnCallReceived(const Message& aMessage,
                                       Message*& aReply) = 0;
     };
 
-    /**
-     * Convert the asynchronous channel |aChannel| into a channel with
-     * RPC semantics.  Received messages are passed down to
-     * |aListener|.
-     *
-     * FIXME do away with |aMode|
-     */
     RPCChannel(Listener* aListener) :
-        mTransport(0),
-        mListener(aListener),
-        mChannelState(ChannelClosed),
-        mMutex("mozilla.ipc.RPCChannel.mMutex"),
-        mCvar(mMutex, "mozilla.ipc.RPCChannel.mCvar")
+        SyncChannel(aListener)
     {
     }
 
     virtual ~RPCChannel()
     {
-        if (mTransport)
-            Close();
-        mTransport = 0;
+        // FIXME/cjones: impl
     }
 
-    // Open  from the perspective of the RPC layer; the transport
-    // should already be connected, or ready to connect.
-    bool Open(Transport* aTransport, MessageLoop* aIOLoop=0);
-    
-    // Close from the perspective of the RPC layer; leaves the
-    // underlying transport channel open, however.
-    void Close();
-
-    // Implement the IPC::Channel::Listener interface
-    virtual void OnMessageReceived(const Message& msg);
-    virtual void OnChannelConnected(int32 peer_pid);
-    virtual void OnChannelError();
-
     // Make an RPC to the other side of the channel
-    virtual bool Call(Message* msg, Message* reply);
+    bool Call(Message* msg, Message* reply);
+
+    // Override the SyncChannel handler so we can dispatch RPC messages
+    virtual void OnMessageReceived(const Message& msg);
 
 private:
-    // Task created when we're idle (wrt this channel), and the other 
-    // side has made an RPC to us
-    void OnIncomingCall(Message msg);
-    // Process an RPC made from the other side to here
-    bool ProcessIncomingCall(Message msg);
+    // Executed on worker thread
+    virtual bool WaitingForReply() {
+        mMutex.AssertCurrentThreadOwns();
+        return mPending.size() > 0 || SyncChannel::WaitingForReply();
+    }
 
-    void OnChannelOpened();
-    void SendCall(Message* aCall);
-    void SendReply(Message* aReply);
+    void OnDispatchMessage(const Message& msg);
 
-    Transport* mTransport;
-    Listener* mListener;
-    ChannelState mChannelState;
-    MessageLoop* mIOLoop;       // thread where IO happens
-    MessageLoop* mWorkerLoop;   // thread where work is done
-    Mutex mMutex;
-    CondVar mCvar;
     std::stack<Message> mPending;
 };
 
 
 } // namespace ipc
 } // namespace mozilla
 #endif  // ifndef ipc_glue_RPCChannel_h
new file mode 100644
--- /dev/null
+++ b/ipc/glue/SyncChannel.cpp
@@ -0,0 +1,174 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
+ * vim: sw=4 ts=4 et :
+ */
+/* ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0/LGPL 2.1
+ *
+ * The contents of this file are subject to the Mozilla Public License Version
+ * 1.1 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS" basis,
+ * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+ * for the specific language governing rights and limitations under the
+ * License.
+ *
+ * The Original Code is Mozilla Plugin App.
+ *
+ * The Initial Developer of the Original Code is
+ *   Chris Jones <jones.chris.g@gmail.com>
+ * Portions created by the Initial Developer are Copyright (C) 2009
+ * the Initial Developer. All Rights Reserved.
+ *
+ * Contributor(s):
+ *
+ * Alternatively, the contents of this file may be used under the terms of
+ * either the GNU General Public License Version 2 or later (the "GPL"), or
+ * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
+ * in which case the provisions of the GPL or the LGPL are applicable instead
+ * of those above. If you wish to allow use of your version of this file only
+ * under the terms of either the GPL or the LGPL, and not to allow others to
+ * use your version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the notice
+ * and other provisions required by the GPL or the LGPL. If you do not delete
+ * the provisions above, a recipient may use your version of this file under
+ * the terms of any one of the MPL, the GPL or the LGPL.
+ *
+ * ***** END LICENSE BLOCK ***** */
+
+#include "mozilla/ipc/SyncChannel.h"
+#include "mozilla/ipc/GeckoThread.h"
+
+#include "nsDebug.h"
+
+using mozilla::MutexAutoLock;
+
+template<>
+struct RunnableMethodTraits<mozilla::ipc::SyncChannel>
+{
+    static void RetainCallee(mozilla::ipc::SyncChannel* obj) { }
+    static void ReleaseCallee(mozilla::ipc::SyncChannel* obj) { }
+};
+
+namespace mozilla {
+namespace ipc {
+
+bool
+SyncChannel::Send(Message* msg, Message* reply)
+{
+    NS_PRECONDITION(msg->is_sync(), "can only Send() sync messages here");
+
+    MutexAutoLock lock(mMutex);
+
+    mChannelState = ChannelWaiting;
+    mPendingReply = msg->type() + 1;
+    /*assert*/AsyncChannel::Send(msg);
+
+    while (1) {
+        // here we're waiting for something to happen.  it may be either:
+        //  (1) the reply we're waiting for (mPendingReply)
+        // or
+        //  (2) any other message
+        //
+        // In case (1), we return this reply back to the caller.
+        // In case (2), we defer processing of the message until our reply
+        // comes back.
+        mCvar.Wait();
+
+        if (mRecvd.is_reply() && mPendingReply == mRecvd.type()) {
+            // case (1)
+            mPendingReply = 0;
+            *reply = mRecvd;
+
+            if (!WaitingForReply()) {
+                mChannelState = ChannelIdle;
+            }
+
+            return true;
+        }
+        else {
+            // case (2)
+            NS_ASSERTION(!mRecvd.is_reply(), "can't process replies here");
+            // post a task to our own event loop
+            mWorkerLoop->PostTask(
+                FROM_HERE,
+                NewRunnableMethod(this, &SyncChannel::OnDispatchMessage, mRecvd));
+        }
+    }
+}
+
+void
+SyncChannel::OnDispatchMessage(const Message& msg)
+{
+    NS_ASSERTION(!msg.is_reply(), "can't process replies here");
+    NS_ASSERTION(!msg.is_rpc(), "sync or async only here");
+
+    if (!msg.is_sync()) {
+        return AsyncChannel::OnDispatchMessage(msg);
+    }
+
+    Message* reply;
+    switch (static_cast<Listener*>(mListener)->OnMessageReceived(msg, reply)) {
+    case Listener::MsgProcessed:
+        mIOLoop->PostTask(FROM_HERE,
+                          NewRunnableMethod(this,
+                                            &SyncChannel::OnSendReply,
+                                            reply));
+        return;
+
+    case Listener::MsgNotKnown:
+    case Listener::MsgNotAllowed:
+    case Listener::MsgPayloadError:
+    case Listener::MsgRouteError:
+    case Listener::MsgValueError:
+        // FIXME/cjones: error handling; OnError()?
+        return;
+
+    default:
+        NOTREACHED();
+        return;
+    }
+}
+
+//
+// The methods below run in the context of the IO thread, and can proxy
+// back to the methods above
+//
+
+void
+SyncChannel::OnMessageReceived(const Message& msg)
+{
+    MutexAutoLock lock(mMutex);
+
+    if (ChannelIdle == mChannelState) {
+        // wake up the worker, there's work to do
+        if (msg.is_sync()) {
+            mWorkerLoop->PostTask(
+                FROM_HERE,
+                NewRunnableMethod(this, &SyncChannel::OnDispatchMessage, msg));
+        }
+        else {
+            return AsyncChannel::OnMessageReceived(msg);
+        }
+    }
+    else if (ChannelWaiting == mChannelState) {
+        // let the worker know something new has happened
+        mRecvd = msg;
+        mCvar.Notify();
+    }
+    else {
+        // FIXME/cjones: could reach here in error conditions.  impl me
+        NOTREACHED();
+    }
+}
+
+void
+SyncChannel::OnSendReply(Message* aReply)
+{
+    mTransport->Send(aReply);
+}
+
+
+} // namespace ipc
+} // namespace mozilla
new file mode 100644
--- /dev/null
+++ b/ipc/glue/SyncChannel.h
@@ -0,0 +1,116 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
+ * vim: sw=4 ts=4 et :
+ */
+/* ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0/LGPL 2.1
+ *
+ * The contents of this file are subject to the Mozilla Public License Version
+ * 1.1 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS" basis,
+ * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+ * for the specific language governing rights and limitations under the
+ * License.
+ *
+ * The Original Code is Mozilla Plugin App.
+ *
+ * The Initial Developer of the Original Code is
+ *   Chris Jones <jones.chris.g@gmail.com>
+ * Portions created by the Initial Developer are Copyright (C) 2009
+ * the Initial Developer. All Rights Reserved.
+ *
+ * Contributor(s):
+ *
+ * Alternatively, the contents of this file may be used under the terms of
+ * either the GNU General Public License Version 2 or later (the "GPL"), or
+ * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
+ * in which case the provisions of the GPL or the LGPL are applicable instead
+ * of those above. If you wish to allow use of your version of this file only
+ * under the terms of either the GPL or the LGPL, and not to allow others to
+ * use your version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the notice
+ * and other provisions required by the GPL or the LGPL. If you do not delete
+ * the provisions above, a recipient may use your version of this file under
+ * the terms of any one of the MPL, the GPL or the LGPL.
+ *
+ * ***** END LICENSE BLOCK ***** */
+
+#ifndef ipc_glue_SyncChannel_h
+#define ipc_glue_SyncChannel_h 1
+
+#include <queue>
+
+#include "mozilla/CondVar.h"
+#include "mozilla/Mutex.h"
+#include "mozilla/ipc/AsyncChannel.h"
+
+namespace mozilla {
+namespace ipc {
+//-----------------------------------------------------------------------------
+
+class SyncChannel : public AsyncChannel
+{
+protected:
+    typedef mozilla::CondVar CondVar;
+    typedef mozilla::Mutex Mutex;
+    typedef uint16 MessageId;
+    typedef std::queue<Message> MessageQueue;
+
+public:
+    class /*NS_INTERFACE_CLASS*/ Listener :
+        public AsyncChannel::Listener
+    {
+    public:
+        virtual ~Listener() { }
+        virtual Result OnMessageReceived(const Message& aMessage) = 0;
+        virtual Result OnMessageReceived(const Message& aMessage,
+                                         Message*& aReply) = 0;
+    };
+
+    SyncChannel(Listener* aListener) :
+        AsyncChannel(aListener),
+        mMutex("mozilla.ipc.SyncChannel.mMutex"),
+        mCvar(mMutex, "mozilla.ipc.SyncChannel.mCvar")
+    {
+    }
+
+    virtual ~SyncChannel()
+    {
+        // FIXME/cjones: impl
+    }
+
+    
+    bool Send(Message* msg) {
+        return AsyncChannel::Send(msg);
+    }
+
+    // Synchronously send |msg| (i.e., wait for |reply|)
+    bool Send(Message* msg, Message* reply);
+
+    // Override the AsyncChannel handler so we can dispatch sync messages
+    virtual void OnMessageReceived(const Message& msg);
+
+protected:
+    // Executed on the worker thread
+    virtual bool WaitingForReply() {
+        mMutex.AssertCurrentThreadOwns();
+        return mPendingReply != 0;
+    }
+
+    void OnDispatchMessage(const Message& aMsg);
+
+    // Executed on the IO thread.
+    void OnSendReply(Message* msg);
+
+    Mutex mMutex;
+    CondVar mCvar;
+    MessageId mPendingReply;
+    Message mRecvd;
+};
+
+
+} // namespace ipc
+} // namespace mozilla
+#endif  // ifndef ipc_glue_SyncChannel_h
--- a/ipc/ipdl/ipdl/lower.py
+++ b/ipc/ipdl/ipdl/lower.py
@@ -822,19 +822,34 @@ class GenerateProtocolActorHeader(Visito
             else:
                 route = cxx.ExprVar('MSG_ROUTING_CONTROL')
 
             impl.addstmt(cxx.StmtExpr(cxx.ExprAssn(msgvar, msgctor)))
             impl.addstmt(cxx.StmtExpr(
                     cxx.ExprCall(cxx.ExprSelect(msgvar, '->', 'set_routing_id'),
                                  [ route ])))
 
+            if md.decl.type.isAsync():
+                sendmethod = 'Send'
+            elif md.decl.type.isSync():
+                sendmethod = 'Send'
+                impl.addstmt(cxx.StmtExpr(
+                        cxx.ExprCall(cxx.ExprSelect(msgvar, '->', 'set_sync'),
+                                     [ ])))
+            elif md.decl.type.isRpc():
+                sendmethod = 'Call'
+                impl.addstmt(cxx.StmtExpr(
+                        cxx.ExprCall(cxx.ExprSelect(msgvar, '->', 'set_rpc'),
+                                     [ ])))
+            else:
+                assert 0
+
             sendcall = cxx.ExprCall(
                 cxx.ExprSelect(
-                    cxx.ExprVar('mChannel'), self.channelsel, 'Call'),
+                    cxx.ExprVar('mChannel'), self.channelsel, sendmethod),
                 [ msgvar ])
             if hasreply:
                 sendcall.args.append(cxx.ExprAddrOf(replyvar))
 
             failif = cxx.StmtIf(cxx.ExprPrefixUnop(sendcall, '!'))
             failif.ifb.addstmt(cxx.StmtReturn(failerrcode))
             impl.addstmt(failif)
 
@@ -1013,16 +1028,27 @@ class GenerateProtocolActorHeader(Visito
                 if md.decl.type.hasImplicitActorParam():
                     replymsgctor.args.append(ahvar)
                 block.addstmt(cxx.StmtExpr(cxx.ExprAssn(replyvar,
                                                         replymsgctor)))
                 block.addstmt(cxx.StmtExpr(cxx.ExprCall(
                             cxx.ExprSelect(replyvar, '->', 'set_reply'),
                             [ ])))
 
+                if md.decl.type.isSync():
+                    block.addstmt(cxx.StmtExpr(cxx.ExprCall(
+                            cxx.ExprSelect(replyvar, '->', 'set_sync'),
+                            [ ])))
+                elif md.decl.type.isRpc():
+                    block.addstmt(cxx.StmtExpr(cxx.ExprCall(
+                            cxx.ExprSelect(replyvar, '->', 'set_rpc'),
+                            [ ])))
+                else:
+                    assert 0
+
             block.addstmt(cxx.StmtReturn(cxx.ExprVar('MsgProcessed')))
 
             if md.decl.type.isAsync():
                 self.asyncswitch.addcase(case, block)
             elif md.decl.type.isSync():
                 self.syncswitch.addcase(case, block)
             else:
                 self.rpcswitch.addcase(case, block)