Bug 544518: Send Messages directly through the Transport on the IO thread rather than through a no-added-value AsyncChannel indirection. r=bent a=LegNeato
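
The shape of the change, in miniature: instead of the worker thread posting
NewRunnableMethod(this, &AsyncChannel::OnSend, msg) to the IO loop and having
OnSend() forward to the transport, the worker thread now posts a task that
calls Transport::Send() directly. The sketch below is an illustrative,
self-contained model of that pattern in plain C++11; the std::thread-based
task queue stands in for the chromium MessageLoop, and Message, Transport,
IOThread and sendThroughTransport are simplified stand-ins, not the real
ipc/glue or base classes.

#include <condition_variable>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

struct Message {
    std::string payload;
};

// Stand-in for the real IPC transport; like the real one, it takes
// ownership of the message it is handed.
struct Transport {
    void Send(Message* msg) {
        std::printf("IO thread sends: %s\n", msg->payload.c_str());
        delete msg;  // the transport assumes ownership of msg
    }
};

// Tiny single-consumer task loop standing in for the IO thread's MessageLoop.
class IOThread {
public:
    IOThread() : mThread([this] { Run(); }) {}
    ~IOThread() {
        Post(std::function<void()>());  // an empty task doubles as the quit signal
        mThread.join();
    }
    void Post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mTasks.push(std::move(task));
        }
        mCond.notify_one();
    }
private:
    void Run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mMutex);
                mCond.wait(lock, [this] { return !mTasks.empty(); });
                task = std::move(mTasks.front());
                mTasks.pop();
            }
            if (!task)
                return;  // quit signal
            task();
        }
    }
    std::mutex mMutex;                        // declared before mThread so the
    std::condition_variable mCond;            // queue exists when Run() starts
    std::queue<std::function<void()>> mTasks;
    std::thread mThread;
};

int main() {
    // The invariants this change relies on: the transport outlives the
    // channel, and it is only ever used (and eventually deleted) on the IO
    // thread, so the transport never needs to be refcounted.
    Transport transport;
    IOThread ioThread;

    // Analogue of SendThroughTransport(): bind the transport itself into the
    // posted task, so an orphaned send task never touches channel memory that
    // the worker thread may already have torn down.
    auto sendThroughTransport = [&](Message* msg) {
        ioThread.Post([&transport, msg] { transport.Send(msg); });
    };

    sendThroughTransport(new Message{"hello"});
    sendThroughTransport(new Message{"goodbye"});
    return 0;  // ~IOThread drains the queue and joins before transport dies
}

Because the transport outlives the channel and is only ever deleted on the IO
thread, a send task that outlives its AsyncChannel is harmless, and the
transport does not need to be refcounted; the patch below expresses that with
a no-op RunnableMethodTraits<mozilla::ipc::AsyncChannel::Transport>
specialization, and the now-redundant OnSend()/OnSendReply() helpers are
removed.
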
--- a/ipc/glue/AsyncChannel.cpp
+++ b/ipc/glue/AsyncChannel.cpp
@@ -49,16 +49,35 @@ using mozilla::MutexAutoLock;
template<>
struct RunnableMethodTraits<mozilla::ipc::AsyncChannel>
{
static void RetainCallee(mozilla::ipc::AsyncChannel* obj) { }
static void ReleaseCallee(mozilla::ipc::AsyncChannel* obj) { }
};
+// We rely on invariants about the lifetime of the transport:
+//
+// - outlives this AsyncChannel
+// - deleted on the IO thread
+//
+// These invariants allow us to send messages directly through the
+// transport without having to worry about orphaned Send() tasks on
+// the IO thread touching AsyncChannel memory after it's been deleted
+// on the worker thread. We also don't need to refcount the
+// Transport, because whatever task triggers its deletion only runs on
+// the IO thread, and only runs after this AsyncChannel is done with
+// the Transport.
+template<>
+struct RunnableMethodTraits<mozilla::ipc::AsyncChannel::Transport>
+{
+ static void RetainCallee(mozilla::ipc::AsyncChannel::Transport* obj) { }
+ static void ReleaseCallee(mozilla::ipc::AsyncChannel::Transport* obj) { }
+};
+
namespace {
// This is an async message
class GoodbyeMessage : public IPC::Message
{
public:
enum { ID = GOODBYE_MESSAGE_TYPE };
GoodbyeMessage() :
@@ -212,18 +231,17 @@ AsyncChannel::Send(Message* msg)
{
MutexAutoLock lock(mMutex);
if (!Connected()) {
ReportConnectionError("AsyncChannel");
return false;
}
- mIOLoop->PostTask(FROM_HERE,
- NewRunnableMethod(this, &AsyncChannel::OnSend, msg));
+ SendThroughTransport(msg);
}
return true;
}
void
AsyncChannel::OnDispatchMessage(const Message& msg)
{
@@ -249,20 +267,27 @@ AsyncChannel::OnSpecialMessage(uint16 id
{
return false;
}
void
AsyncChannel::SendSpecialMessage(Message* msg)
{
AssertWorkerThread();
+ SendThroughTransport(msg);
+}
+
+void
+AsyncChannel::SendThroughTransport(Message* msg)
+{
+ AssertWorkerThread();
mIOLoop->PostTask(
FROM_HERE,
- NewRunnableMethod(this, &AsyncChannel::OnSend, msg));
+ NewRunnableMethod(mTransport, &Transport::Send, msg));
}
void
AsyncChannel::OnNotifyMaybeChannelError()
{
AssertWorkerThread();
mMutex.AssertNotCurrentThreadOwns();
@@ -460,24 +485,16 @@ AsyncChannel::PostErrorNotifyTask()
// This must be the last code that runs on this thread!
mChannelErrorTask =
NewRunnableMethod(this, &AsyncChannel::OnNotifyMaybeChannelError);
mWorkerLoop->PostTask(FROM_HERE, mChannelErrorTask);
}
void
-AsyncChannel::OnSend(Message* aMsg)
-{
- AssertIOThread();
- mTransport->Send(aMsg);
- // mTransport assumes ownership of aMsg
-}
-
-void
AsyncChannel::OnCloseChannel()
{
AssertIOThread();
mTransport->Close();
MutexAutoLock lock(mMutex);
mChannelState = ChannelClosed;
--- a/ipc/glue/AsyncChannel.h
+++ b/ipc/glue/AsyncChannel.h
@@ -156,29 +156,30 @@ protected:
void PrintErrorMessage(const char* channelName, const char* msg)
{
fprintf(stderr, "\n###!!! [%s][%s] Error: %s\n\n",
mChild ? "Child" : "Parent", channelName, msg);
}
// Run on the worker thread
+ void SendThroughTransport(Message* msg);
+
void OnNotifyMaybeChannelError();
virtual bool ShouldDeferNotifyMaybeError() {
return false;
}
void NotifyChannelClosed();
void NotifyMaybeChannelError();
virtual void Clear();
// Run on the IO thread
void OnChannelOpened();
- void OnSend(Message* aMsg);
void OnCloseChannel();
void PostErrorNotifyTask();
// Return true if |msg| is a special message targeted at the IO
// thread, in which case it shouldn't be delivered to the worker.
bool MaybeInterceptSpecialIOMessage(const Message& msg);
void ProcessGoodbyeMessage();
--- a/ipc/glue/RPCChannel.cpp
+++ b/ipc/glue/RPCChannel.cpp
@@ -175,19 +175,17 @@ RPCChannel::Call(Message* msg, Message*
return false;
}
msg->set_seqno(NextSeqno());
msg->set_rpc_remote_stack_depth_guess(mRemoteStackDepthGuess);
msg->set_rpc_local_stack_depth(1 + StackDepth());
mStack.push(*msg);
- mIOLoop->PostTask(
- FROM_HERE,
- NewRunnableMethod(this, &RPCChannel::OnSend, msg));
+ SendThroughTransport(msg);
while (1) {
// if a handler invoked by *Dispatch*() spun a nested event
// loop, and the connection was broken during that loop, we
// might have already processed the OnError event. if so,
// trying another loop iteration will be futile because
// channel state will have been cleared
if (!Connected()) {
@@ -493,19 +491,17 @@ RPCChannel::DispatchIncall(const Message
reply->set_reply_error();
}
reply->set_seqno(call.seqno());
{
MutexAutoLock lock(mMutex);
if (ChannelConnected == mChannelState)
- mIOLoop->PostTask(
- FROM_HERE,
- NewRunnableMethod(this, &RPCChannel::OnSend, reply));
+ SendThroughTransport(reply);
}
}
bool
RPCChannel::BlockChild()
{
AssertWorkerThread();
--- a/ipc/glue/SyncChannel.cpp
+++ b/ipc/glue/SyncChannel.cpp
@@ -106,19 +106,17 @@ SyncChannel::Send(Message* msg, Message*
if (!Connected()) {
ReportConnectionError("SyncChannel");
return false;
}
mPendingReply = msg->type() + 1;
int32 msgSeqno = msg->seqno();
- mIOLoop->PostTask(
- FROM_HERE,
- NewRunnableMethod(this, &SyncChannel::OnSend, msg));
+ SendThroughTransport(msg);
while (1) {
bool maybeTimedOut = !SyncChannel::WaitForNotify();
if (EventOccurred())
break;
if (maybeTimedOut && !ShouldContinueFromTimeout())
@@ -173,19 +171,17 @@ SyncChannel::OnDispatchMessage(const Mes
reply->set_reply_error();
}
reply->set_seqno(msg.seqno());
{
MutexAutoLock lock(mMutex);
if (ChannelConnected == mChannelState)
- mIOLoop->PostTask(
- FROM_HERE,
- NewRunnableMethod(this, &SyncChannel::OnSend, reply));
+ SendThroughTransport(reply);
}
}
//
// The methods below run in the context of the IO thread, and can proxy
// back to the methods above
//
--- a/ipc/glue/SyncChannel.h
+++ b/ipc/glue/SyncChannel.h
@@ -124,17 +124,16 @@ protected:
// So in sum: true is a meaningful return value; false isn't,
// necessarily.
//
bool WaitForNotify();
bool ShouldContinueFromTimeout();
// Executed on the IO thread.
- void OnSendReply(Message* msg);
void NotifyWorkerThread();
// On both
bool AwaitingSyncReply() {
mMutex.AssertCurrentThreadOwns();
return mPendingReply != 0;
}