Bug 540886, part 2: Offer a BlockChild() interface to RPC protocols that allows parents to prevent children from sending messages back of their own volition until the parent calls UnblockChild(). r=bent
authorChris Jones <jones.chris.g@gmail.com>
Wed, 27 Jan 2010 00:41:32 -0600
changeset 38100 b89339a2523d8ace64c6ce60fa38fc8eb2067c27
parent 38099 07ed72e5400b3bc715ec6d48e9d39ef14e3fcdb0
child 38101 db49481bae2f924218ec9cb08c0d7bb41d282b9b
push id11617
push usercjones@mozilla.com
push dateFri, 12 Feb 2010 05:46:47 +0000
treeherdermozilla-central@c5ca3076da1b [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersbent
bugs540886
milestone1.9.3a2pre
Bug 540886, part 2: Offer a BlockChild() interface to RPC protocols that allows parents to prevent children from sending messages back of their own volition until the parent calls UnblockChild(). r=bent
ipc/glue/ProtocolUtils.h
ipc/glue/RPCChannel.cpp
ipc/glue/RPCChannel.h
ipc/glue/SyncChannel.h
ipc/ipdl/ipdl/lower.py
--- a/ipc/glue/ProtocolUtils.h
+++ b/ipc/glue/ProtocolUtils.h
@@ -47,16 +47,18 @@
 #include "prenv.h"
 
 #include "mozilla/ipc/Shmem.h"
 
 // WARNING: this takes into account the private, special-message-type
 // enum in ipc_channel.h.  They need to be kept in sync.
 namespace {
 enum {
+    UNBLOCK_CHILD_MESSAGE_TYPE = kuint16max - 4,
+    BLOCK_CHILD_MESSAGE_TYPE   = kuint16max - 3,
     SHMEM_CREATED_MESSAGE_TYPE = kuint16max - 2,
     GOODBYE_MESSAGE_TYPE       = kuint16max - 1,
 };
 }
 
 namespace mozilla {
 namespace ipc {
 
--- a/ipc/glue/RPCChannel.cpp
+++ b/ipc/glue/RPCChannel.cpp
@@ -33,16 +33,17 @@
  * decision by deleting the provisions above and replace them with the notice
  * and other provisions required by the GPL or the LGPL. If you do not delete
  * the provisions above, a recipient may use your version of this file under
  * the terms of any one of the MPL, the GPL or the LGPL.
  *
  * ***** END LICENSE BLOCK ***** */
 
 #include "mozilla/ipc/RPCChannel.h"
+#include "mozilla/ipc/ProtocolUtils.h"
 
 #include "nsDebug.h"
 #include "nsTraceRefcnt.h"
 
 #define RPC_ASSERT(_cond, ...)                                      \
     do {                                                            \
         if (!(_cond))                                               \
             DebugAbort(__FILE__, __LINE__, #_cond,## __VA_ARGS__);  \
@@ -53,28 +54,56 @@ using mozilla::MutexAutoUnlock;
 
 template<>
 struct RunnableMethodTraits<mozilla::ipc::RPCChannel>
 {
     static void RetainCallee(mozilla::ipc::RPCChannel* obj) { }
     static void ReleaseCallee(mozilla::ipc::RPCChannel* obj) { }
 };
 
+
+namespace
+{
+
+// Async (from the sending side's perspective)
+class BlockChildMessage : public IPC::Message
+{
+public:
+    enum { ID = BLOCK_CHILD_MESSAGE_TYPE };
+    BlockChildMessage() :
+        Message(MSG_ROUTING_NONE, ID, IPC::Message::PRIORITY_NORMAL)
+    { }
+};
+
+// Async
+class UnblockChildMessage : public IPC::Message
+{
+public:
+    enum { ID = UNBLOCK_CHILD_MESSAGE_TYPE };
+    UnblockChildMessage() :
+        Message(MSG_ROUTING_NONE, ID, IPC::Message::PRIORITY_NORMAL)
+    { }
+};
+
+} // namespace <anon>
+
+
 namespace mozilla {
 namespace ipc {
 
 RPCChannel::RPCChannel(RPCListener* aListener,
                        RacyRPCPolicy aPolicy)
   : SyncChannel(aListener),
     mPending(),
     mStack(),
     mOutOfTurnReplies(),
     mDeferred(),
     mRemoteStackDepthGuess(0),
-    mRacePolicy(aPolicy)
+    mRacePolicy(aPolicy),
+    mBlockedOnParent(false)
 {
     MOZ_COUNT_CTOR(RPCChannel);
 }
 
 RPCChannel::~RPCChannel()
 {
     MOZ_COUNT_DTOR(RPCChannel);
     // FIXME/cjones: impl
@@ -382,16 +411,121 @@ RPCChannel::DispatchIncall(const Message
 
     reply->set_seqno(call.seqno());
 
     mIOLoop->PostTask(
         FROM_HERE,
         NewRunnableMethod(this, &RPCChannel::OnSend, reply));
 }
 
+bool
+RPCChannel::BlockChild()
+{
+    AssertWorkerThread();
+
+    if (mChild)
+        NS_RUNTIMEABORT("child tried to block parent");
+    SendSpecialMessage(new BlockChildMessage());
+    return true;
+}
+
+bool
+RPCChannel::UnblockChild()
+{
+    AssertWorkerThread();
+
+    if (mChild)
+        NS_RUNTIMEABORT("child tried to unblock parent");
+    SendSpecialMessage(new UnblockChildMessage());
+    return true;
+}
+
+bool
+RPCChannel::OnSpecialMessage(uint16 id, const Message& msg)
+{
+    AssertWorkerThread();
+
+    switch (id) {
+    case BLOCK_CHILD_MESSAGE_TYPE:
+        BlockOnParent();
+        return true;
+
+    case UNBLOCK_CHILD_MESSAGE_TYPE:
+        UnblockFromParent();
+        return true;
+
+    default:
+        return SyncChannel::OnSpecialMessage(id, msg);
+    }
+}
+
+void
+RPCChannel::BlockOnParent()
+{
+    AssertWorkerThread();
+
+    if (!mChild)
+        NS_RUNTIMEABORT("child tried to block parent");
+
+    MutexAutoLock lock(mMutex);
+
+    if (mBlockedOnParent || AwaitingSyncReply() || 0 < StackDepth())
+        NS_RUNTIMEABORT("attempt to block child when it's already blocked");
+
+    mBlockedOnParent = true;
+    while (1) {
+        // XXX this dispatch loop shares some similarities with the
+        // one in Call(), but the logic is simpler and different
+        // enough IMHO to warrant its own dispatch loop
+        while (Connected() && mPending.empty() && mBlockedOnParent) {
+            WaitForNotify();
+        }
+
+        if (!Connected()) {
+            mBlockedOnParent = false;
+            ReportConnectionError("RPCChannel");
+            break;
+        }
+
+        if (!mPending.empty()) {
+            Message recvd = mPending.front();
+            mPending.pop();
+
+            MutexAutoUnlock unlock(mMutex);
+            if (recvd.is_rpc()) {
+                // stack depth must be 0 here
+                Incall(recvd, 0);
+            }
+            else if (recvd.is_sync()) {
+                SyncChannel::OnDispatchMessage(recvd);
+            }
+            else {
+                AsyncChannel::OnDispatchMessage(recvd);
+            }
+        }
+        
+        // the last message, if async, may have been the one that
+        // unblocks us
+        if (!mBlockedOnParent)
+            break;
+    }
+
+    EnqueuePendingMessages();
+}
+
+void
+RPCChannel::UnblockFromParent()
+{
+    AssertWorkerThread();
+
+    if (!mChild)
+        NS_RUNTIMEABORT("child tried to block parent");
+    MutexAutoLock lock(mMutex);
+    mBlockedOnParent = false;
+}
 
 void
 RPCChannel::DebugAbort(const char* file, int line, const char* cond,
                        const char* why,
                        const char* type, bool reply)
 {
     fprintf(stderr,
             "###!!! [RPCChannel][%s][%s:%d] "
@@ -439,17 +573,17 @@ RPCChannel::OnMessageReceived(const Mess
         // wake up worker thread waiting at SyncChannel::Send
         mRecvd = msg;
         NotifyWorkerThread();
         return;
     }
 
     mPending.push(msg);
 
-    if (0 == StackDepth())
+    if (0 == StackDepth() && !mBlockedOnParent)
         // the worker thread might be idle, make sure it wakes up
         mWorkerLoop->PostTask(
             FROM_HERE,
             NewRunnableMethod(this, &RPCChannel::OnMaybeDequeueOne));
     else
         NotifyWorkerThread();
 }
 
--- a/ipc/glue/RPCChannel.h
+++ b/ipc/glue/RPCChannel.h
@@ -76,31 +76,62 @@ public:
 
     RPCChannel(RPCListener* aListener, RacyRPCPolicy aPolicy=RRPChildWins);
 
     virtual ~RPCChannel();
 
     // Make an RPC to the other side of the channel
     bool Call(Message* msg, Message* reply);
 
+    // Asynchronously, send the child a message that puts it in such a
+    // state that it can't send messages to the parent unless the
+    // parent sends a message to it first.  The child stays in this
+    // state until the parent calls |UnblockChild()|.
+    //
+    // It is an error to
+    //  - call this on the child side of the channel.
+    //  - nest |BlockChild()| calls
+    //  - call this when the child is already blocked on a sync or RPC
+    //    in-/out- message/call
+    //
+    // Return true iff successful.
+    bool BlockChild();
+
+    // Asynchronously undo |BlockChild()|.
+    //
+    // It is an error to
+    //  - call this on the child side of the channel
+    //  - call this without a matching |BlockChild()|
+    //
+    // Return true iff successful.
+    bool UnblockChild();
+
+    NS_OVERRIDE
+    virtual bool OnSpecialMessage(uint16 id, const Message& msg);
+
     // Override the SyncChannel handler so we can dispatch RPC
     // messages.  Called on the IO thread only.
-    NS_OVERRIDE virtual void OnMessageReceived(const Message& msg);
-    NS_OVERRIDE virtual void OnChannelError();
+    NS_OVERRIDE
+    virtual void OnMessageReceived(const Message& msg);
+    NS_OVERRIDE
+    virtual void OnChannelError();
 
 private:
     // Called on worker thread only
 
     void MaybeProcessDeferredIncall();
     void EnqueuePendingMessages();
 
     void OnMaybeDequeueOne();
     void Incall(const Message& call, size_t stackDepth);
     void DispatchIncall(const Message& call);
 
+    void BlockOnParent();
+    void UnblockFromParent();
+
     // Called from both threads
     size_t StackDepth() {
         mMutex.AssertCurrentThreadOwns();
         return mStack.size();
     }
 
     void DebugAbort(const char* file, int line, const char* cond,
                     const char* why,
@@ -192,14 +223,17 @@ private:
     // |mPending|.
     //
     // One nice aspect of this race detection is that it is symmetric;
     // if one side detects a race, then the other side must also 
     // detect the same race.
     //
     size_t mRemoteStackDepthGuess;
     RacyRPCPolicy mRacePolicy;
+
+    // True iff the parent has put us in a |BlockChild()| state.
+    bool mBlockedOnParent;
 };
 
 
 } // namespace ipc
 } // namespace mozilla
 #endif  // ifndef ipc_glue_RPCChannel_h
--- a/ipc/glue/SyncChannel.h
+++ b/ipc/glue/SyncChannel.h
@@ -88,16 +88,24 @@ public:
 
 protected:
     // Executed on the worker thread
     bool ProcessingSyncMessage() {
         return mProcessingSyncMessage;
     }
 
     void OnDispatchMessage(const Message& aMsg);
+
+    NS_OVERRIDE
+    bool OnSpecialMessage(uint16 id, const Message& msg)
+    {
+        // SyncChannel doesn't care about any special messages yet
+        return AsyncChannel::OnSpecialMessage(id, msg);
+    }
+
     void WaitForNotify();
 
     // Executed on the IO thread.
     void OnSendReply(Message* msg);
     void NotifyWorkerThread();
 
     // On both
     bool AwaitingSyncReply() {
--- a/ipc/ipdl/ipdl/lower.py
+++ b/ipc/ipdl/ipdl/lower.py
@@ -2612,27 +2612,26 @@ class _GenerateProtocolActorCode(ipdl.as
                 virtual=1, pure=1)))
 
             self.cls.addstmt(StmtDecl(MethodDecl(
                 _deallocMethod(managed).name,
                 params=[ Decl(actortype, 'actor') ],
                 ret=Type.BOOL,
                 virtual=1, pure=1)))
 
-        # optional Shutdown() method; default is no-op
+        # optional ActorDestroy() method; default is no-op
         self.cls.addstmts([
             Whitespace.NL,
             MethodDefn(MethodDecl(
                 _destroyMethod().name,
                 params=[ Decl(_DestroyReason.Type(), 'why') ],
-                virtual=1))
+                virtual=1)),
+            Whitespace.NL
         ])
 
-        self.cls.addstmt(Whitespace.NL)
-
         self.cls.addstmts((
             [ Label.PRIVATE ]
             + self.standardTypedefs()
             + [ Whitespace.NL ]
         ))
 
         self.cls.addstmt(Label.PUBLIC)
         # Actor()
@@ -2874,16 +2873,32 @@ class _GenerateProtocolActorCode(ipdl.as
                     args=[ ExprCall(otherpidvar), dumpvar ])),
                 CppDirective('else'),
                 StmtReturn(ExprLiteral.FALSE),
                 CppDirective('endif')
             ])
             self.cls.addstmts([ otherpid, Whitespace.NL,
                                 getdump, Whitespace.NL ])
 
+        if (p.decl.type.isToplevel() and self.side is 'parent'
+            and p.decl.type.talksRpc()):
+            # offer BlockChild() and UnblockChild().
+            # See ipc/glue/RPCChannel.h
+            blockchild = MethodDefn(MethodDecl(
+                'BlockChild', ret=Type.BOOL))
+            blockchild.addstmt(StmtReturn(ExprCall(
+                ExprSelect(p.channelVar(), '.', 'BlockChild'))))
+
+            unblockchild = MethodDefn(MethodDecl(
+                'UnblockChild', ret=Type.BOOL))
+            unblockchild.addstmt(StmtReturn(ExprCall(
+                ExprSelect(p.channelVar(), '.', 'UnblockChild'))))
+
+            self.cls.addstmts([ blockchild, unblockchild, Whitespace.NL ])
+
         ## private methods
         self.cls.addstmt(Label.PRIVATE)
 
         ## FatalError()       
         msgvar = ExprVar('msg')
         fatalerror = MethodDefn(MethodDecl(
             'FatalError',
             params=[ Decl(Type('char', const=1, ptrconst=1), msgvar.name) ],