Bug 544518: Send Messages directly through the Transport on the IO thread rather than through a no-added-value AsyncChannel indirection. r=bent a=LegNeato
authorChris Jones <jones.chris.g@gmail.com>
Thu, 22 Apr 2010 18:53:30 -0500
changeset 34110 ca6df0574ed6908d840791225848291ea910f63d
parent 34109 92e3606b6700126c6e05048f14ea34c7f5bf0664
child 34111 2e12fc0d07f4e7618094297bcb60a195dc095fd1
push id1278
push usercjones@mozilla.com
push dateFri, 23 Apr 2010 18:23:11 +0000
reviewersbent, LegNeato
bugs544518
milestone1.9.2.5pre
Bug 544518: Send Messages directly through the Transport on the IO thread rather than through a no-added-value AsyncChannel indirection. r=bent a=LegNeato
ipc/glue/AsyncChannel.cpp
ipc/glue/AsyncChannel.h
ipc/glue/RPCChannel.cpp
ipc/glue/SyncChannel.cpp
ipc/glue/SyncChannel.h
--- a/ipc/glue/AsyncChannel.cpp
+++ b/ipc/glue/AsyncChannel.cpp
@@ -49,16 +49,35 @@ using mozilla::MutexAutoLock;
 
 template<>
 struct RunnableMethodTraits<mozilla::ipc::AsyncChannel>
 {
     static void RetainCallee(mozilla::ipc::AsyncChannel* obj) { }
     static void ReleaseCallee(mozilla::ipc::AsyncChannel* obj) { }
 };
 
+// We rely on invariants about the lifetime of the transport:
+//
+//  - outlives this AsyncChannel
+//  - deleted on the IO thread
+//
+// These invariants allow us to send messages directly through the
+// transport without having to worry about orphaned Send() tasks on
+// the IO thread touching AsyncChannel memory after it's been deleted
+// on the worker thread.  We also don't need to refcount the
+// Transport, because whatever task triggers its deletion only runs on
+// the IO thread, and only runs after this AsyncChannel is done with
+// the Transport.
+template<>
+struct RunnableMethodTraits<mozilla::ipc::AsyncChannel::Transport>
+{
+    static void RetainCallee(mozilla::ipc::AsyncChannel::Transport* obj) { }
+    static void ReleaseCallee(mozilla::ipc::AsyncChannel::Transport* obj) { }
+};
+
 namespace {
 
 // This is an async message
 class GoodbyeMessage : public IPC::Message
 {
 public:
     enum { ID = GOODBYE_MESSAGE_TYPE };
     GoodbyeMessage() :
@@ -212,18 +231,17 @@ AsyncChannel::Send(Message* msg)
     {
         MutexAutoLock lock(mMutex);
 
         if (!Connected()) {
             ReportConnectionError("AsyncChannel");
             return false;
         }
 
-        mIOLoop->PostTask(FROM_HERE,
-                          NewRunnableMethod(this, &AsyncChannel::OnSend, msg));
+        SendThroughTransport(msg);
     }
 
     return true;
 }
 
 void
 AsyncChannel::OnDispatchMessage(const Message& msg)
 {
@@ -249,20 +267,27 @@ AsyncChannel::OnSpecialMessage(uint16 id
 {
     return false;
 }
 
 void
 AsyncChannel::SendSpecialMessage(Message* msg)
 {
     AssertWorkerThread();
+    SendThroughTransport(msg);
+}
+
+void
+AsyncChannel::SendThroughTransport(Message* msg)
+{
+    AssertWorkerThread();
 
     mIOLoop->PostTask(
         FROM_HERE,
-        NewRunnableMethod(this, &AsyncChannel::OnSend, msg));
+        NewRunnableMethod(mTransport, &Transport::Send, msg));
 }
 
 void
 AsyncChannel::OnNotifyMaybeChannelError()
 {
     AssertWorkerThread();
     mMutex.AssertNotCurrentThreadOwns();
 
@@ -460,24 +485,16 @@ AsyncChannel::PostErrorNotifyTask()
 
     // This must be the last code that runs on this thread!
     mChannelErrorTask =
         NewRunnableMethod(this, &AsyncChannel::OnNotifyMaybeChannelError);
     mWorkerLoop->PostTask(FROM_HERE, mChannelErrorTask);
 }
 
 void
-AsyncChannel::OnSend(Message* aMsg)
-{
-    AssertIOThread();
-    mTransport->Send(aMsg);
-    // mTransport assumes ownership of aMsg
-}
-
-void
 AsyncChannel::OnCloseChannel()
 {
     AssertIOThread();
 
     mTransport->Close();
 
     MutexAutoLock lock(mMutex);
     mChannelState = ChannelClosed;
--- a/ipc/glue/AsyncChannel.h
+++ b/ipc/glue/AsyncChannel.h
@@ -156,29 +156,30 @@ protected:
     void PrintErrorMessage(const char* channelName, const char* msg)
     {
         fprintf(stderr, "\n###!!! [%s][%s] Error: %s\n\n",
                 mChild ? "Child" : "Parent", channelName, msg);
     }
 
     // Run on the worker thread
 
+    void SendThroughTransport(Message* msg);
+
     void OnNotifyMaybeChannelError();
     virtual bool ShouldDeferNotifyMaybeError() {
         return false;
     }
     void NotifyChannelClosed();
     void NotifyMaybeChannelError();
 
     virtual void Clear();
 
     // Run on the IO thread
 
     void OnChannelOpened();
-    void OnSend(Message* aMsg);
     void OnCloseChannel();
     void PostErrorNotifyTask();
 
     // Return true if |msg| is a special message targeted at the IO
     // thread, in which case it shouldn't be delivered to the worker.
     bool MaybeInterceptSpecialIOMessage(const Message& msg);
     void ProcessGoodbyeMessage();
 
--- a/ipc/glue/RPCChannel.cpp
+++ b/ipc/glue/RPCChannel.cpp
@@ -175,19 +175,17 @@ RPCChannel::Call(Message* msg, Message* 
         return false;
     }
 
     msg->set_seqno(NextSeqno());
     msg->set_rpc_remote_stack_depth_guess(mRemoteStackDepthGuess);
     msg->set_rpc_local_stack_depth(1 + StackDepth());
     mStack.push(*msg);
 
-    mIOLoop->PostTask(
-        FROM_HERE,
-        NewRunnableMethod(this, &RPCChannel::OnSend, msg));
+    SendThroughTransport(msg);
 
     while (1) {
         // if a handler invoked by *Dispatch*() spun a nested event
         // loop, and the connection was broken during that loop, we
         // might have already processed the OnError event. if so,
         // trying another loop iteration will be futile because
         // channel state will have been cleared
         if (!Connected()) {
@@ -493,19 +491,17 @@ RPCChannel::DispatchIncall(const Message
         reply->set_reply_error();
     }
 
     reply->set_seqno(call.seqno());
 
     {
         MutexAutoLock lock(mMutex);
         if (ChannelConnected == mChannelState)
-            mIOLoop->PostTask(
-                FROM_HERE,
-                NewRunnableMethod(this, &RPCChannel::OnSend, reply));
+            SendThroughTransport(reply);
     }
 }
 
 bool
 RPCChannel::BlockChild()
 {
     AssertWorkerThread();
 
--- a/ipc/glue/SyncChannel.cpp
+++ b/ipc/glue/SyncChannel.cpp
@@ -106,19 +106,17 @@ SyncChannel::Send(Message* msg, Message*
 
     if (!Connected()) {
         ReportConnectionError("SyncChannel");
         return false;
     }
 
     mPendingReply = msg->type() + 1;
     int32 msgSeqno = msg->seqno();
-    mIOLoop->PostTask(
-        FROM_HERE,
-        NewRunnableMethod(this, &SyncChannel::OnSend, msg));
+    SendThroughTransport(msg);
 
     while (1) {
         bool maybeTimedOut = !SyncChannel::WaitForNotify();
 
         if (EventOccurred())
             break;
 
         if (maybeTimedOut && !ShouldContinueFromTimeout())
@@ -173,19 +171,17 @@ SyncChannel::OnDispatchMessage(const Mes
         reply->set_reply_error();
     }
 
     reply->set_seqno(msg.seqno());
 
     {
         MutexAutoLock lock(mMutex);
         if (ChannelConnected == mChannelState)
-            mIOLoop->PostTask(
-                FROM_HERE,
-                NewRunnableMethod(this, &SyncChannel::OnSend, reply));
+            SendThroughTransport(reply);
     }
 }
 
 //
 // The methods below run in the context of the IO thread, and can proxy
 // back to the methods above
 //
 
--- a/ipc/glue/SyncChannel.h
+++ b/ipc/glue/SyncChannel.h
@@ -124,17 +124,16 @@ protected:
     // So in sum: true is a meaningful return value; false isn't,
     // necessarily.
     //
     bool WaitForNotify();
 
     bool ShouldContinueFromTimeout();
 
     // Executed on the IO thread.
-    void OnSendReply(Message* msg);
     void NotifyWorkerThread();
 
     // On both
     bool AwaitingSyncReply() {
         mMutex.AssertCurrentThreadOwns();
         return mPendingReply != 0;
     }