Add urgent message semantics to IPC and IPDL (bug 867013, r=cjones).
authorDavid Anderson <danderson@mozilla.com>
Tue, 02 Jul 2013 20:37:33 -0700
changeset 137276 e10afc0c67e086414aacc38118ab3115d8cb7321
parent 137275 657f8910c1129dbeee37bdc1d94a69ab52ebd0b3
child 137277 9ed886b5122b023b958e1dcbc7fffc388645a514
push id1824
push userryanvm@gmail.com
push dateWed, 03 Jul 2013 18:16:56 +0000
treeherderfx-team@dcbbfcdf7bb4 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerscjones
bugs867013
milestone25.0a1
Add urgent message semantics to IPC and IPDL (bug 867013, r=cjones).
ipc/glue/RPCChannel.cpp
ipc/glue/RPCChannel.h
ipc/glue/SyncChannel.cpp
ipc/glue/SyncChannel.h
ipc/ipdl/ipdl/ast.py
ipc/ipdl/ipdl/cgen.py
ipc/ipdl/ipdl/lower.py
ipc/ipdl/ipdl/parser.py
ipc/ipdl/ipdl/type.py
ipc/ipdl/test/cxx/Makefile.in
ipc/ipdl/test/cxx/PTestUrgency.ipdl
ipc/ipdl/test/cxx/TestUrgency.cpp
ipc/ipdl/test/cxx/TestUrgency.h
ipc/ipdl/test/cxx/ipdl.mk
--- a/ipc/glue/RPCChannel.cpp
+++ b/ipc/glue/RPCChannel.cpp
@@ -65,16 +65,17 @@ bool
 RPCChannel::EventOccurred() const
 {
     AssertWorkerThread();
     mMonitor->AssertCurrentThreadOwns();
     RPC_ASSERT(StackDepth() > 0, "not in wait loop");
 
     return (!Connected() ||
             !mPending.empty() ||
+            !mUrgent.empty() ||
             (!mOutOfTurnReplies.empty() &&
              mOutOfTurnReplies.find(mStack.top().seqno())
              != mOutOfTurnReplies.end()));
 }
 
 bool
 RPCChannel::Send(Message* msg)
 {
@@ -89,20 +90,22 @@ RPCChannel::Send(Message* msg, Message* 
     Message copy = *msg;
     CxxStackFrame f(*this, OUT_MESSAGE, &copy);
     return SyncChannel::Send(msg, reply);
 }
 
 bool
 RPCChannel::Call(Message* _msg, Message* reply)
 {
+    RPC_ASSERT(!mPendingReply, "should not be waiting for a reply");
+
     nsAutoPtr<Message> msg(_msg);
     AssertWorkerThread();
     mMonitor->AssertNotCurrentThreadOwns();
-    RPC_ASSERT(!ProcessingSyncMessage(),
+    RPC_ASSERT(!ProcessingSyncMessage() || msg->priority() == IPC::Message::PRIORITY_HIGH,
                "violation of sync handler invariant");
     RPC_ASSERT(msg->is_rpc(), "can only Call() RPC messages here");
 
 #ifdef OS_WIN
     SyncStackFrame frame(this, true);
 #endif
 
     Message copy = *msg;
@@ -110,16 +113,18 @@ RPCChannel::Call(Message* _msg, Message*
 
     MonitorAutoLock lock(*mMonitor);
 
     if (!Connected()) {
         ReportConnectionError("RPCChannel");
         return false;
     }
 
+    bool urgent = (copy.priority() == IPC::Message::PRIORITY_HIGH);
+
     msg->set_seqno(NextSeqno());
     msg->set_rpc_remote_stack_depth_guess(mRemoteStackDepthGuess);
     msg->set_rpc_local_stack_depth(1 + StackDepth());
     mStack.push(*msg);
 
     mLink->SendMessage(msg.forget());
 
     while (1) {
@@ -161,46 +166,49 @@ RPCChannel::Call(Message* _msg, Message*
         Message recvd;
         MessageMap::iterator it;
         if (!mOutOfTurnReplies.empty() &&
             ((it = mOutOfTurnReplies.find(mStack.top().seqno())) !=
             mOutOfTurnReplies.end())) {
             recvd = it->second;
             mOutOfTurnReplies.erase(it);
         }
+        else if (!mUrgent.empty()) {
+            recvd = mUrgent.front();
+            mUrgent.pop_front();
+        }
         else if (!mPending.empty()) {
             recvd = mPending.front();
             mPending.pop_front();
         }
         else {
             // because of subtleties with nested event loops, it's
             // possible that we got here and nothing happened.  or, we
             // might have a deferred in-call that needs to be
             // processed.  either way, we won't break the inner while
             // loop again until something new happens.
             continue;
         }
 
-        if (!recvd.is_sync() && !recvd.is_rpc()) {
-            MonitorAutoUnlock unlock(*mMonitor);
-
-            CxxStackFrame f(*this, IN_MESSAGE, &recvd);
-            AsyncChannel::OnDispatchMessage(recvd);
-
-            continue;
-        }
-
-        if (recvd.is_sync()) {
-            RPC_ASSERT(mPending.empty(),
-                       "other side should have been blocked");
-            MonitorAutoUnlock unlock(*mMonitor);
-
-            CxxStackFrame f(*this, IN_MESSAGE, &recvd);
-            SyncChannel::OnDispatchMessage(recvd);
-
+        if (!recvd.is_rpc()) {
+            if (urgent && recvd.priority() != IPC::Message::PRIORITY_HIGH) {
+                // If we're waiting for an urgent reply, don't process any
+                // messages yet.
+                mNonUrgentDeferred.push_back(recvd);
+            } else if (recvd.is_sync()) {
+                RPC_ASSERT(mPending.empty(),
+                           "other side should have been blocked");
+                MonitorAutoUnlock unlock(*mMonitor);
+                CxxStackFrame f(*this, IN_MESSAGE, &recvd);
+                SyncChannel::OnDispatchMessage(recvd);
+            } else {
+                MonitorAutoUnlock unlock(*mMonitor);
+                CxxStackFrame f(*this, IN_MESSAGE, &recvd);
+                AsyncChannel::OnDispatchMessage(recvd);
+            }
             continue;
         }
 
         RPC_ASSERT(recvd.is_rpc(), "wtf???");
 
         if (recvd.is_reply()) {
             RPC_ASSERT(0 < mStack.size(), "invalid RPC stack");
 
@@ -289,28 +297,27 @@ RPCChannel::MaybeUndeferIncall()
 void
 RPCChannel::EnqueuePendingMessages()
 {
     AssertWorkerThread();
     mMonitor->AssertCurrentThreadOwns();
 
     MaybeUndeferIncall();
 
-    for (size_t i = 0; i < mDeferred.size(); ++i)
-        mWorkerLoop->PostTask(
-            FROM_HERE,
-            new DequeueTask(mDequeueOneTask));
+    for (size_t i = 0; i < mDeferred.size(); ++i) {
+        mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
+    }
 
     // XXX performance tuning knob: could process all or k pending
     // messages here, rather than enqueuing for later processing
 
-    for (size_t i = 0; i < mPending.size(); ++i)
-        mWorkerLoop->PostTask(
-            FROM_HERE,
-            new DequeueTask(mDequeueOneTask));
+    size_t total = mPending.size() + mUrgent.size() + mNonUrgentDeferred.size();
+    for (size_t i = 0; i < total; ++i) {
+        mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
+    }
 }
 
 void
 RPCChannel::FlushPendingRPCQueue()
 {
     AssertWorkerThread();
     mMonitor->AssertNotCurrentThreadOwns();
 
@@ -346,21 +353,26 @@ RPCChannel::OnMaybeDequeueOne()
         if (!Connected()) {
             ReportConnectionError("RPCChannel");
             return false;
         }
 
         if (!mDeferred.empty())
             MaybeUndeferIncall();
 
-        if (mPending.empty())
+        MessageQueue *queue = mUrgent.empty()
+                              ? mNonUrgentDeferred.empty()
+                                ? &mPending
+                                : &mNonUrgentDeferred
+                              : &mUrgent;
+        if (queue->empty())
             return false;
 
-        recvd = mPending.front();
-        mPending.pop_front();
+        recvd = queue->front();
+        queue->pop_front();
     }
 
     if (IsOnCxxStack() && recvd.is_rpc() && recvd.is_reply()) {
         // We probably just received a reply in a nested loop for an
         // RPC call sent before entering that loop.
         mOutOfTurnReplies[recvd.seqno()] = recvd;
         return false;
     }
@@ -559,42 +571,67 @@ RPCChannel::OnMessageReceivedFromLink(co
     // know that it needs to be immediately handled to unblock us.
     if (AwaitingSyncReply() && msg.is_sync()) {
         // wake up worker thread waiting at SyncChannel::Send
         mRecvd = msg;
         NotifyWorkerThread();
         return;
     }
 
-    bool compressMessage = (msg.compress() && !mPending.empty() &&
-                            mPending.back().type() == msg.type() &&
-                            mPending.back().routing_id() == msg.routing_id());
+    MessageQueue *queue = (msg.priority() == IPC::Message::PRIORITY_HIGH)
+                          ? &mUrgent
+                          : &mPending;
+
+    bool compressMessage = (msg.compress() && !queue->empty() &&
+                            queue->back().type() == msg.type() &&
+                            queue->back().routing_id() == msg.routing_id());
     if (compressMessage) {
         // This message type has compression enabled, and the back of
         // the queue was the same message type and routed to the same
         // destination.  Replace it with the newer message.
-        MOZ_ASSERT(mPending.back().compress());
-        mPending.pop_back();
+        MOZ_ASSERT(queue->back().compress());
+        queue->pop_back();
     }
 
-    mPending.push_back(msg);
+    queue->push_back(msg);
 
-    if (0 == StackDepth()) {
-        // the worker thread might be idle, make sure it wakes up
+    // There are three cases we're concerned about, relating to the state of
+    // the main thread:
+    //
+    // (1) We are waiting on a sync reply - main thread is blocked on the IPC monitor.
+    //   - If the message is high priority, we wake up the main thread to
+    //     deliver the message. Otherwise, we leave it in the mPending queue,
+    //     posting a task to the main event loop, where it will be processed
+    //     once the synchronous reply has been received.
+    //
+    // (2) We are waiting on an RPC reply - main thread is blocked on the IPC monitor.
+    //   - Always wake up the main thread to deliver the message.
+    //
+    // (3) We are not waiting on a reply.
+    //   - We post a task to the main event loop.
+    //
+    bool waiting_rpc = (0 != StackDepth());
+    bool urgent = (msg.priority() == IPC::Message::PRIORITY_HIGH);
+
+    if (waiting_rpc || (AwaitingSyncReply() && urgent)) {
+        // Always wake up our RPC waiter, and wake up sync waiters for urgent
+        // messages.
+        NotifyWorkerThread();
+    } else {
+        // Worker thread is either not blocked on a reply, or this is an
+        // incoming RPC that raced with outgoing sync and needs to be deferred
+        // to a later event-loop iteration.
         if (!compressMessage) {
             // If we compressed away the previous message, we'll reuse
             // its pending task.
             mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
         }
     }
-    else if (!AwaitingSyncReply())
-        NotifyWorkerThread();
 }
 
-
 void
 RPCChannel::OnChannelErrorFromLink()
 {
     AssertLinkThread();
     mMonitor->AssertCurrentThreadOwns();
 
     if (0 < StackDepth())
         NotifyWorkerThread();
--- a/ipc/glue/RPCChannel.h
+++ b/ipc/glue/RPCChannel.h
@@ -307,16 +307,21 @@ private:
     // sending a blocking message |(S< | C<)|.  If we had more than
     // one RPC call on our stack, the other side *better* not have
     // sent us another blocking message, because it's blocked on a
     // reply from us.
     //
     typedef std::deque<Message> MessageQueue;
     MessageQueue mPending;
 
+    // List of async and sync messages that have been received while waiting
+    // for an urgent reply, that need to be deferred until that reply has been
+    // received.
+    MessageQueue mNonUrgentDeferred;
+
     // 
     // Stack of all the RPC out-calls on which this RPCChannel is
     // awaiting a response.
     //
     std::stack<Message> mStack;
 
     //
     // Map of replies received "out of turn", because of RPC
--- a/ipc/glue/SyncChannel.cpp
+++ b/ipc/glue/SyncChannel.cpp
@@ -3,16 +3,17 @@
  */
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include "mozilla/DebugOnly.h"
 
 #include "mozilla/ipc/SyncChannel.h"
+#include "mozilla/ipc/RPCChannel.h"
 
 #include "nsDebug.h"
 #include "nsTraceRefcnt.h"
 
 using mozilla::MonitorAutoLock;
 
 template<>
 struct RunnableMethodTraits<mozilla::ipc::SyncChannel>
@@ -57,22 +58,57 @@ bool SyncChannel::sIsPumpingMessages = f
 
 bool
 SyncChannel::EventOccurred()
 {
     AssertWorkerThread();
     mMonitor->AssertCurrentThreadOwns();
     NS_ABORT_IF_FALSE(AwaitingSyncReply(), "not in wait loop");
 
-    return (!Connected() || 0 != mRecvd.type() || mRecvd.is_reply_error());
+    return !Connected() ||
+           mRecvd.type() != 0 ||
+           mRecvd.is_reply_error() ||
+           !mUrgent.empty();
+}
+
+bool
+SyncChannel::ProcessUrgentMessages()
+{
+    while (!mUrgent.empty()) {
+        Message msg = mUrgent.front();
+        mUrgent.pop_front();
+
+        MOZ_ASSERT(msg.priority() == IPC::Message::PRIORITY_HIGH);
+
+        {
+            MOZ_ASSERT(msg.is_sync() || msg.is_rpc());
+
+            MonitorAutoUnlock unlock(*mMonitor);
+            SyncChannel::OnDispatchMessage(msg);
+        }
+
+        // Check state that could have been changed during dispatch.
+        if (!Connected()) {
+            ReportConnectionError("SyncChannel");
+            return false;
+        }
+
+        // We should not have received another synchronous reply,
+        // because we cannot send synchronous messages in this state.
+        MOZ_ASSERT(mRecvd.type() == 0);
+    }
+
+    return true;
 }
 
 bool
 SyncChannel::Send(Message* _msg, Message* reply)
 {
+    MOZ_ASSERT(!mPendingReply);
+
     nsAutoPtr<Message> msg(_msg);
 
     AssertWorkerThread();
     mMonitor->AssertNotCurrentThreadOwns();
     NS_ABORT_IF_FALSE(!ProcessingSyncMessage(),
                       "violation of sync handler invariant");
     NS_ABORT_IF_FALSE(msg->is_sync(), "can only Send() sync messages here");
 
@@ -89,72 +125,104 @@ SyncChannel::Send(Message* _msg, Message
         return false;
     }
 
     mPendingReply = msg->type() + 1;
     DebugOnly<int32_t> msgSeqno = msg->seqno();
     mLink->SendMessage(msg.forget());
 
     while (1) {
-        bool maybeTimedOut = !SyncChannel::WaitForNotify();
+        // if a handler invoked by *Dispatch*() spun a nested event
+        // loop, and the connection was broken during that loop, we
+        // might have already processed the OnError event. if so,
+        // trying another loop iteration will be futile because
+        // channel state will have been cleared
+        if (!Connected()) {
+            ReportConnectionError("SyncChannel");
+            return false;
+        }
+
+        while (!EventOccurred()) {
+            bool maybeTimedOut = !SyncChannel::WaitForNotify();
 
-        if (EventOccurred())
-            break;
+            if (EventOccurred())
+                break;
+
+            // we might have received a "subtly deferred" message
+            // in a nested loop that it's now time to process
+            if (!mUrgent.empty())
+                break;
+
+            if (maybeTimedOut && !ShouldContinueFromTimeout())
+                return false;
+        }
+
+        if (!Connected()) {
+            ReportConnectionError("SyncChannel");
 
-        if (maybeTimedOut && !ShouldContinueFromTimeout())
             return false;
-    }
+        }
+
+        // Process all urgent messages. We forbid nesting synchronous sends,
+        // so mPendingReply etc will still be valid.
+        if (!ProcessUrgentMessages())
+            return false;
 
-    if (!Connected()) {
-        ReportConnectionError("SyncChannel");
-        return false;
+        if (mRecvd.is_reply_error() || mRecvd.type() != 0) {
+            // we just received a synchronous message from the other side.
+            // If it's not the reply we were awaiting, there's a serious
+            // error: either a mistimed/malformed message or a sync in-message
+            // that raced with our sync out-message.
+            // (NB: IPDL prevents the latter from occuring in actor code)
+            // FIXME/cjones: real error handling
+            NS_ABORT_IF_FALSE(mRecvd.is_sync() && mRecvd.is_reply() &&
+                              (mRecvd.is_reply_error() ||
+                               (mPendingReply == mRecvd.type() &&
+                                msgSeqno == mRecvd.seqno())),
+                              "unexpected sync message");
+
+            mPendingReply = 0;
+            if (mRecvd.is_reply_error())
+                return false;
+
+            *reply = TakeReply();
+
+            MOZ_ASSERT(mUrgent.empty());
+            return true;
+        }
     }
 
-    // we just received a synchronous message from the other side.
-    // If it's not the reply we were awaiting, there's a serious
-    // error: either a mistimed/malformed message or a sync in-message
-    // that raced with our sync out-message.
-    // (NB: IPDL prevents the latter from occuring in actor code)
-
-    // FIXME/cjones: real error handling
-    bool replyIsError = mRecvd.is_reply_error();
-    NS_ABORT_IF_FALSE(mRecvd.is_sync() && mRecvd.is_reply() &&
-                      (replyIsError ||
-                       (mPendingReply == mRecvd.type() &&
-                        msgSeqno == mRecvd.seqno())),
-                      "unexpected sync message");
-
-    mPendingReply = 0;
-    if (!replyIsError) {
-        *reply = mRecvd;
-    }
-    mRecvd = Message();
-
-    return !replyIsError;
+    return true;
 }
 
 void
 SyncChannel::OnDispatchMessage(const Message& msg)
 {
     AssertWorkerThread();
-    NS_ABORT_IF_FALSE(msg.is_sync(), "only sync messages here");
+    NS_ABORT_IF_FALSE(msg.is_sync() || msg.is_rpc(), "only sync messages here");
     NS_ABORT_IF_FALSE(!msg.is_reply(), "wasn't awaiting reply");
 
     Message* reply = 0;
 
     mProcessingSyncMessage = true;
-    Result rv =
-        static_cast<SyncListener*>(mListener.get())->OnMessageReceived(msg, reply);
+    Result rv;
+    if (msg.is_sync())
+        rv = static_cast<SyncListener*>(mListener.get())->OnMessageReceived(msg, reply);
+    else
+        rv = static_cast<RPCChannel::RPCListener*>(mListener.get())->OnCallReceived(msg, reply);
     mProcessingSyncMessage = false;
 
     if (!MaybeHandleError(rv, "SyncChannel")) {
         // FIXME/cjones: error handling; OnError()?
         delete reply;
         reply = new Message();
-        reply->set_sync();
+        if (msg.is_sync())
+            reply->set_sync();
+        else if (msg.is_rpc())
+            reply->set_rpc();
         reply->set_reply();
         reply->set_reply_error();
     }
 
     reply->set_seqno(msg.seqno());
 
     {
         MonitorAutoLock lock(*mMonitor);
@@ -169,24 +237,38 @@ SyncChannel::OnDispatchMessage(const Mes
 //
 
 void
 SyncChannel::OnMessageReceivedFromLink(const Message& msg)
 {
     AssertLinkThread();
     mMonitor->AssertCurrentThreadOwns();
 
+    if (MaybeInterceptSpecialIOMessage(msg))
+        return;
+
+    if (msg.priority() == IPC::Message::PRIORITY_HIGH) {
+        // If the message is high priority, we skip the worker entirely, and
+        // wake up the loop that's spinning for a reply.
+        if (!AwaitingSyncReply()) {
+            mWorkerLoop->PostTask(
+                FROM_HERE,
+                NewRunnableMethod(this, &SyncChannel::OnDispatchMessage, msg));
+        } else {
+            mUrgent.push_back(msg);
+            NotifyWorkerThread();
+        }
+        return;
+    }
+
     if (!msg.is_sync()) {
         AsyncChannel::OnMessageReceivedFromLink(msg);
         return;
     }
 
-    if (MaybeInterceptSpecialIOMessage(msg))
-        return;
-
     if (!AwaitingSyncReply()) {
         // wake up the worker, there's work to do
         mWorkerLoop->PostTask(
             FROM_HERE,
             NewRunnableMethod(this, &SyncChannel::OnDispatchMessage, msg));
     }
     else {
         // let the worker know a new sync message has arrived
--- a/ipc/glue/SyncChannel.h
+++ b/ipc/glue/SyncChannel.h
@@ -138,16 +138,22 @@ protected:
     void NotifyWorkerThread();
 
     // On both
     bool AwaitingSyncReply() const {
         mMonitor->AssertCurrentThreadOwns();
         return mPendingReply != 0;
     }
 
+    Message TakeReply() {
+        Message reply = mRecvd;
+        mRecvd = Message();
+        return reply;
+    }
+
     int32_t NextSeqno() {
         AssertWorkerThread();
         return mChild ? --mNextSeqno : ++mNextSeqno;
     }
 
     msgid_t mPendingReply;
     bool mProcessingSyncMessage;
     Message mRecvd;
@@ -160,20 +166,23 @@ protected:
     // Timeout periods are broken up in two to prevent system suspension from
     // triggering an abort. This method (called by WaitForNotify with a 'did
     // timeout' flag) decides if we should wait again for half of mTimeoutMs
     // or give up.
     bool WaitResponse(bool aWaitTimedOut);
     bool mInTimeoutSecondHalf;
     int32_t mTimeoutMs;
 
+    std::deque<Message> mUrgent;
+
 #ifdef OS_WIN
     HANDLE mEvent;
 #endif
 
 private:
     bool EventOccurred();
+    bool ProcessUrgentMessages();
 };
 
 
 } // namespace ipc
 } // namespace mozilla
 #endif  // ifndef ipc_glue_SyncChannel_h
--- a/ipc/ipdl/ipdl/ast.py
+++ b/ipc/ipdl/ipdl/ast.py
@@ -204,16 +204,22 @@ class RPC:
     @classmethod
     def __str__(cls):  return cls.pretty
 class SYNC:
     pretty = 'sync'
     @classmethod
     def __hash__(cls): return hash(cls.pretty)
     @classmethod
     def __str__(cls):  return cls.pretty
+class URGENT:
+    pretty = 'urgent'
+    @classmethod
+    def __hash__(cls): return hash(cls.pretty)
+    @classmethod
+    def __str__(cls):  return cls.pretty
 
 class INOUT:
     pretty = 'inout'
     @classmethod
     def __hash__(cls): return hash(cls.pretty)
     @classmethod
     def __str__(cls):  return cls.pretty
 class IN:
@@ -231,20 +237,22 @@ class OUT:
     @classmethod
     def __str__(cls):  return cls.pretty
     @staticmethod
     def prettySS(ss): return _prettyTable['out'][ss.pretty]
 
 _prettyTable = {
     IN  : { 'async': 'AsyncRecv',
             'sync': 'SyncRecv',
-            'rpc': 'RpcAnswer' },
+            'rpc': 'RpcAnswer',
+            'urgent': 'UrgentAnswer' },
     OUT : { 'async': 'AsyncSend',
             'sync': 'SyncSend',
-            'rpc': 'RpcCall' }
+            'rpc': 'RpcCall',
+            'urgent': 'UrgentCall' }
     # inout doesn't make sense here
 }
 
 
 class Namespace(Node):
     def __init__(self, loc, namespace):
         Node.__init__(self, loc)
         self.name = namespace
--- a/ipc/ipdl/ipdl/cgen.py
+++ b/ipc/ipdl/ipdl/cgen.py
@@ -1,16 +1,16 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 import os, sys
 
 from ipdl.ast import Visitor
-from ipdl.ast import IN, OUT, INOUT, ASYNC, SYNC, RPC
+from ipdl.ast import IN, OUT, INOUT, ASYNC, SYNC, RPC, URGENT
 
 class CodePrinter:
     def __init__(self, outf=sys.stdout, indentCols=4):
         self.outf = outf
         self.col = 0
         self.indentCols = indentCols
 
     def write(self, str):
--- a/ipc/ipdl/ipdl/lower.py
+++ b/ipc/ipdl/ipdl/lower.py
@@ -1549,24 +1549,24 @@ class _GenerateProtocolCode(ipdl.ast.Vis
 
         tfDecl, tfDefn = _splitFuncDeclDefn(self.genTransitionFunc())
         ns.addstmts([ tfDecl, Whitespace.NL ])
         self.funcDefns.append(tfDefn)
 
         typedefs = self.protocol.decl.cxxtypedefs
         for md in p.messageDecls:
             ns.addstmts([
-                _generateMessageClass(md.msgClass(), md.msgId(),
+                _generateMessageClass(md, md.msgClass(), md.msgId(),
                                       typedefs, md.prettyMsgName(p.name+'::'),
                                       md.decl.type.compress),
                 Whitespace.NL ])
             if md.hasReply():
                 ns.addstmts([
                     _generateMessageClass(
-                        md.replyClass(), md.replyId(),
+                        md, md.replyClass(), md.replyId(),
                         typedefs, md.prettyReplyName(p.name+'::'),
                         md.decl.type.compress),
                     Whitespace.NL ])
 
         ns.addstmts([ Whitespace.NL, Whitespace.NL ])
 
 
     def genBridgeFunc(self, bridge):
@@ -1746,39 +1746,43 @@ class _GenerateProtocolCode(ipdl.ast.Vis
             # all --> Error transitions break to here
             StmtExpr(ExprAssn(ExprDeref(nextvar), _errorState())),
             StmtReturn(ExprLiteral.FALSE)
         ])
         return transitionfunc
 
 ##--------------------------------------------------
 
-def _generateMessageClass(clsname, msgid, typedefs, prettyName, compress):
+def _generateMessageClass(md, clsname, msgid, typedefs, prettyName, compress):
     cls = Class(name=clsname, inherits=[ Inherit(Type('IPC::Message')) ])
     cls.addstmt(Label.PRIVATE)
     cls.addstmts(typedefs)
     cls.addstmt(Whitespace.NL)
 
     cls.addstmt(Label.PUBLIC)
 
     idenum = TypeEnum()
     idenum.addId('ID', msgid)
     cls.addstmt(StmtDecl(Decl(idenum, '')))
 
     # make the message constructor
     if compress:
         compression = ExprVar('COMPRESSION_ENABLED')
     else:
         compression = ExprVar('COMPRESSION_NONE')
+    if md.decl.type.isUrgent():
+        priority = 'PRIORITY_HIGH'
+    else:
+        priority = 'PRIORITY_NORMAL'
     ctor = ConstructorDefn(
         ConstructorDecl(clsname),
         memberinits=[ ExprMemberInit(ExprVar('IPC::Message'),
                                      [ ExprVar('MSG_ROUTING_NONE'),
                                        ExprVar('ID'),
-                                       ExprVar('PRIORITY_NORMAL'),
+                                       ExprVar(priority),
                                        compression,
                                        ExprLiteral.String(prettyName) ]) ])
     cls.addstmts([ ctor, Whitespace.NL ])
 
     # generate a logging function
     # 'pfx' will be something like "[FooParent] sent"
     pfxvar = ExprVar('__pfx')
     outfvar = ExprVar('__outf')
@@ -4507,17 +4511,17 @@ class _GenerateProtocolActorCode(ipdl.as
         helpermethod = None
         recvlbl, recvcase = None, None
 
         def addRecvCase(lbl, case):
             if sems is ipdl.ast.ASYNC:
                 self.asyncSwitch.addcase(lbl, case)
             elif sems is ipdl.ast.SYNC:
                 self.syncSwitch.addcase(lbl, case)
-            elif sems is ipdl.ast.RPC:
+            elif sems is ipdl.ast.RPC or sems is ipdl.ast.URGENT:
                 self.rpcSwitch.addcase(lbl, case)
             else: assert 0
 
         if self.sendsMessage(md):
             isasync = (sems is ipdl.ast.ASYNC)
 
             if isctor:
                 self.cls.addstmts([ self.genHelperCtor(md), Whitespace.NL ])
--- a/ipc/ipdl/ipdl/parser.py
+++ b/ipc/ipdl/ipdl/parser.py
@@ -139,16 +139,17 @@ reserved = set((
         'rpc',
         'send',
         'spawns',
         'start',
         'state',
         'struct',
         'sync',
         'union',
+        'urgent',
         'using'))
 tokens = [
     'COLONCOLON', 'ID', 'STRING'
 ] + [ r.upper() for r in reserved ]
 
 t_COLONCOLON = '::'
 
 literals = '(){}[]<>;:,~'
@@ -600,21 +601,23 @@ def p_OptionalSendSemanticsQual(p):
     """OptionalSendSemanticsQual : SendSemanticsQual
                                  | """
     if 2 == len(p): p[0] = p[1]
     else:           p[0] = ASYNC
 
 def p_SendSemanticsQual(p):
     """SendSemanticsQual : ASYNC
                          | RPC
+                         | URGENT
                          | SYNC"""
     s = p[1]
-    if 'async' == s: p[0] = ASYNC
-    elif 'rpc' == s: p[0] = RPC
-    elif 'sync'== s: p[0] = SYNC
+    if 'async' == s: p[0] =    ASYNC
+    elif 'rpc' == s: p[0] =    RPC
+    elif 'sync' == s: p[0] =   SYNC
+    elif 'urgent' == s: p[0] = URGENT
     else:
         assert 0
 
 def p_ParamList(p):
     """ParamList : ParamList ',' Param
                  | Param
                  | """
     if 1 == len(p):
--- a/ipc/ipdl/ipdl/type.py
+++ b/ipc/ipdl/ipdl/type.py
@@ -1,15 +1,16 @@
+# vim: set ts=4 sw=4 tw=99 et:
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 import os, sys
 
-from ipdl.ast import CxxInclude, Decl, Loc, QualifiedId, State, StructDecl, TransitionStmt, TypeSpec, UnionDecl, UsingStmt, Visitor, ASYNC, SYNC, RPC, IN, OUT, INOUT, ANSWER, CALL, RECV, SEND
+from ipdl.ast import CxxInclude, Decl, Loc, QualifiedId, State, StructDecl, TransitionStmt, TypeSpec, UnionDecl, UsingStmt, Visitor, ASYNC, SYNC, RPC, IN, OUT, INOUT, ANSWER, CALL, RECV, SEND, URGENT
 import ipdl.builtin as builtin
 
 _DELETE_MSG = '__delete__'
 
 
 def _otherside(side):
     if side == 'parent':  return 'child'
     elif side == 'child': return 'parent'
@@ -198,17 +199,18 @@ class IPDLType(Type):
     def isAtom(self):  return True
     def isCompound(self): return False
     def isShmem(self): return False
     def isChmod(self): return False
     def isFD(self): return False
 
     def isAsync(self): return self.sendSemantics is ASYNC
     def isSync(self): return self.sendSemantics is SYNC
-    def isRpc(self): return self.sendSemantics is RPC
+    def isRpc(self): return self.sendSemantics is RPC or self.sendSemantics is URGENT
+    def isUrgent(self): return self.sendSemantics is URGENT
 
     def talksAsync(self): return True
     def talksSync(self): return self.isSync() or self.isRpc()
     def talksRpc(self): return self.isRpc()
 
     def hasReply(self):  return self.isSync() or self.isRpc()
 
     def needsMoreJuiceThan(self, o):
--- a/ipc/ipdl/test/cxx/Makefile.in
+++ b/ipc/ipdl/test/cxx/Makefile.in
@@ -39,16 +39,17 @@ IPDLTESTS = \
   TestSelfManageRoot \
   TestShmem \
   TestShutdown \
   TestStackHooks \
   TestSyncError \
   TestSyncHang \
   TestSyncWakeup \
   TestBadActor \
+  TestUrgency \
   $(NULL)
 
 ifeq ($(OS_ARCH),Linux)
 IPDLTESTS += TestSysVShmem
 endif
 
 EXTRA_PROTOCOLS = \
   TestBridgeSub \
new file mode 100644
--- /dev/null
+++ b/ipc/ipdl/test/cxx/PTestUrgency.ipdl
@@ -0,0 +1,18 @@
+namespace mozilla {
+namespace _ipdltest {
+
+rpc protocol PTestUrgency
+{
+parent:
+    sync Test1() returns (uint32_t result);
+    async Test2();
+    sync Test3() returns (uint32_t result);
+
+child:
+    async Start();
+    urgent Reply1() returns (uint32_t result);
+    urgent Reply2() returns (uint32_t result);
+};
+
+} // namespace _ipdltest
+} // namespace mozilla
new file mode 100644
--- /dev/null
+++ b/ipc/ipdl/test/cxx/TestUrgency.cpp
@@ -0,0 +1,142 @@
+#include "TestUrgency.h"
+
+#include "IPDLUnitTests.h"      // fail etc.
+#include <unistd.h>
+
+namespace mozilla {
+namespace _ipdltest {
+
+//-----------------------------------------------------------------------------
+// parent
+
+TestUrgencyParent::TestUrgencyParent()
+  : inreply_(false)
+{
+    MOZ_COUNT_CTOR(TestUrgencyParent);
+}
+
+TestUrgencyParent::~TestUrgencyParent()
+{
+    MOZ_COUNT_DTOR(TestUrgencyParent);
+}
+
+void
+TestUrgencyParent::Main()
+{
+  if (!SendStart())
+    fail("sending Start");
+}
+
+bool
+TestUrgencyParent::RecvTest1(uint32_t *value)
+{
+  if (!CallReply1(value))
+    fail("sending Reply1");
+  if (*value != 99)
+    fail("bad value");
+  return true;
+}
+
+bool
+TestUrgencyParent::RecvTest2()
+{
+  uint32_t value;
+  inreply_ = true;
+  if (!CallReply2(&value))
+    fail("sending Reply2");
+  inreply_ = false;
+  if (value != 500)
+    fail("bad value");
+  return true;
+}
+
+bool
+TestUrgencyParent::RecvTest3(uint32_t *value)
+{
+  if (inreply_)
+    fail("nested non-urgent on top of urgent rpc");
+  *value = 1000;
+  return true;
+}
+
+//-----------------------------------------------------------------------------
+// child
+
+enum {
+  kFirstTestBegin = 1,
+  kFirstTestGotReply,
+  kSecondTestBegin,
+  kSecondTestGotReply,
+};
+
+bool
+TestUrgencyChild::RecvStart()
+{
+  uint32_t result;
+  
+  // Send a synchronous message, expect to get an urgent message while
+  // blocked.
+  test_ = kFirstTestBegin;
+  if (!SendTest1(&result))
+    fail("calling SendTest1");
+  if (result != 99)
+    fail("bad result in RecvStart");
+  if (test_ != kFirstTestGotReply)
+    fail("never received urgent message");
+
+  // Initiate the next test by sending an asynchronous message, then becoming
+  // blocked. This tests that the urgent message is still delivered properly,
+  // and that the parent does not try to service the sync 
+  test_ = kSecondTestBegin;
+  if (!SendTest2())
+    fail("calling SendTest2");
+  if (!SendTest3(&result))
+    fail("calling SendTest3");
+  if (test_ != kSecondTestGotReply)
+    fail("never received urgent message #2");
+  if (result != 1000)
+    fail("wrong value from test3");
+
+  Close();
+
+  return true;
+}
+
+bool
+TestUrgencyChild::AnswerReply1(uint32_t *reply)
+{
+  if (test_ != kFirstTestBegin)
+    fail("wrong test # in AnswerReply1");
+
+  *reply = 99;
+  test_ = kFirstTestGotReply;
+  return true;
+}
+
+bool
+TestUrgencyChild::AnswerReply2(uint32_t *reply)
+{
+  if (test_ != kSecondTestBegin)
+    fail("wrong test # in AnswerReply2");
+
+  // sleep for 5 seconds so the parent process tries to deliver more messages.
+  sleep(5);
+
+  *reply = 500;
+  test_ = kSecondTestGotReply;
+  return true;
+}
+
+TestUrgencyChild::TestUrgencyChild()
+  : test_(0)
+{
+    MOZ_COUNT_CTOR(TestUrgencyChild);
+}
+
+TestUrgencyChild::~TestUrgencyChild()
+{
+    MOZ_COUNT_DTOR(TestUrgencyChild);
+}
+
+} // namespace _ipdltest
+} // namespace mozilla
new file mode 100644
--- /dev/null
+++ b/ipc/ipdl/test/cxx/TestUrgency.h
@@ -0,0 +1,68 @@
+#ifndef mozilla__ipdltest_TestUrgency_h
+#define mozilla__ipdltest_TestUrgency_h 1
+
+#include "mozilla/_ipdltest/IPDLUnitTests.h"
+
+#include "mozilla/_ipdltest/PTestUrgencyParent.h"
+#include "mozilla/_ipdltest/PTestUrgencyChild.h"
+
+namespace mozilla {
+namespace _ipdltest {
+
+
+class TestUrgencyParent :
+    public PTestUrgencyParent
+{
+public:
+    TestUrgencyParent();
+    virtual ~TestUrgencyParent();
+
+    static bool RunTestInProcesses() { return true; }
+    static bool RunTestInThreads() { return true; }
+
+    void Main();
+
+    bool RecvTest1(uint32_t *value);
+    bool RecvTest2();
+    bool RecvTest3(uint32_t *value);
+
+    virtual void ActorDestroy(ActorDestroyReason why) MOZ_OVERRIDE
+    {
+        if (NormalShutdown != why)
+            fail("unexpected destruction!");  
+        passed("ok");
+        QuitParent();
+    }
+private:
+    bool inreply_;
+};
+
+
+class TestUrgencyChild :
+    public PTestUrgencyChild
+{
+public:
+    TestUrgencyChild();
+    virtual ~TestUrgencyChild();
+
+    bool RecvStart();
+    bool AnswerReply1(uint32_t *reply);
+    bool AnswerReply2(uint32_t *reply);
+
+    virtual void ActorDestroy(ActorDestroyReason why) MOZ_OVERRIDE
+    {
+        if (NormalShutdown != why)
+            fail("unexpected destruction!");
+        QuitChild();
+    }
+
+private:
+    uint32_t test_;
+};
+
+
+} // namespace _ipdltest
+} // namespace mozilla
+
+
+#endif // ifndef mozilla__ipdltest_TestUrgency_h
--- a/ipc/ipdl/test/cxx/ipdl.mk
+++ b/ipc/ipdl/test/cxx/ipdl.mk
@@ -44,13 +44,14 @@ IPDLSRCS =					\
   PTestShutdownSubsub.ipdl			\
   PTestStackHooks.ipdl				\
   PTestSyncError.ipdl                           \
   PTestSyncHang.ipdl                            \
   PTestSyncWakeup.ipdl				\
   PTestSysVShmem.ipdl				\
   PTestBadActor.ipdl                            \
   PTestBadActorSub.ipdl                         \
+  PTestUrgency.ipdl \
   PTestIndirectProtocolParam.ipdlh	        \
   PTestIndirectProtocolParamManage.ipdl         \
   PTestIndirectProtocolParamFirst.ipdl	        \
   PTestIndirectProtocolParamSecond.ipdl	        \
   $(NULL)