Bug 745148, part 2: GeckoChildProcessHost can't drop messages on the floor. Queue them and hand them off to the *Channel. r=bent
authorChris Jones <jones.chris.g@gmail.com>
Sat, 14 Jul 2012 14:21:32 -0700
changeset 101129 d3f19e4f90e77456d688778b3a1109a4bca60ce9
parent 101128 453481dbb8282834f25d062cb588d2262ac9e900
child 101130 5c293947bf9ce76b1acbce96627f49f6d771a5b4
push id18
push usershu@rfrn.org
push dateMon, 06 Aug 2012 22:42:45 +0000
reviewersbent
bugs745148
milestone16.0a1
Bug 745148, part 2: GeckoChildProcessHost can't drop messages on the floor. Queue them and hand them off to the *Channel. r=bent
ipc/chromium/src/chrome/common/child_process_host.cc
ipc/chromium/src/chrome/common/child_process_host.h
ipc/chromium/src/chrome/common/ipc_channel.h
ipc/glue/AsyncChannel.cpp
ipc/glue/AsyncChannel.h
ipc/glue/GeckoChildProcessHost.cpp
ipc/glue/GeckoChildProcessHost.h
--- a/ipc/chromium/src/chrome/common/child_process_host.cc
+++ b/ipc/chromium/src/chrome/common/child_process_host.cc
@@ -175,16 +175,19 @@ void ChildProcessHost::ListenerHook::OnC
   host_->Notify(NotificationType::CHILD_PROCESS_HOST_CONNECTED);
 }
 
 void ChildProcessHost::ListenerHook::OnChannelError() {
   host_->opening_channel_ = false;
   host_->OnChannelError();
 }
 
+void ChildProcessHost::ListenerHook::GetQueuedMessages(std::queue<IPC::Message>& queue) {
+  host_->GetQueuedMessages(queue);
+}
 
 ChildProcessHost::Iterator::Iterator() : all_(true) {
   iterator_ = Singleton<ChildProcessList>::get()->begin();
 }
 
 ChildProcessHost::Iterator::Iterator(ProcessType type)
     : all_(false), type_(type) {
   iterator_ = Singleton<ChildProcessList>::get()->begin();
--- a/ipc/chromium/src/chrome/common/child_process_host.h
+++ b/ipc/chromium/src/chrome/common/child_process_host.h
@@ -94,16 +94,17 @@ class ChildProcessHost :
   // OnMessageReceived/OnChannelConnected and do our own processing before
   // calling the subclass' implementation.
   class ListenerHook : public IPC::Channel::Listener {
    public:
     ListenerHook(ChildProcessHost* host);
     virtual void OnMessageReceived(const IPC::Message& msg);
     virtual void OnChannelConnected(int32 peer_pid);
     virtual void OnChannelError();
+    virtual void GetQueuedMessages(std::queue<IPC::Message>& queue);
    private:
     ChildProcessHost* host_;
   };
 
   ListenerHook listener_;
 
   ResourceDispatcherHost* resource_dispatcher_host_;
 
--- a/ipc/chromium/src/chrome/common/ipc_channel.h
+++ b/ipc/chromium/src/chrome/common/ipc_channel.h
@@ -1,15 +1,16 @@
 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
 #ifndef CHROME_COMMON_IPC_CHANNEL_H_
 #define CHROME_COMMON_IPC_CHANNEL_H_
 
+#include <queue>
 #include "chrome/common/ipc_message.h"
 
 namespace IPC {
 
 //------------------------------------------------------------------------------
 
 class Channel : public Message::Sender {
   // Security tests need access to the pipe handle.
@@ -26,16 +27,20 @@ class Channel : public Message::Sender {
 
     // Called when the channel is connected and we have received the internal
     // Hello message from the peer.
     virtual void OnChannelConnected(int32 peer_pid) {}
 
     // Called when an error is detected that causes the channel to close.
     // This method is not called when a channel is closed normally.
     virtual void OnChannelError() {}
+
+    // If the listener has queued messages, swap them for |queue| like so
+    //   swap(impl->my_queued_messages, queue);
+    virtual void GetQueuedMessages(std::queue<Message>& queue) {}
   };
 
   enum Mode {
     MODE_SERVER,
     MODE_CLIENT
   };
 
   enum {
--- a/ipc/glue/AsyncChannel.cpp
+++ b/ipc/glue/AsyncChannel.cpp
@@ -8,17 +8,18 @@
 #include "mozilla/ipc/AsyncChannel.h"
 #include "mozilla/ipc/BrowserProcessSubThread.h"
 #include "mozilla/ipc/ProtocolUtils.h"
 
 #include "nsDebug.h"
 #include "nsTraceRefcnt.h"
 #include "nsXULAppAPI.h"
 
-using mozilla::MonitorAutoLock;
+using namespace mozilla;
+using namespace std;
 
 template<>
 struct RunnableMethodTraits<mozilla::ipc::AsyncChannel>
 {
     static void RetainCallee(mozilla::ipc::AsyncChannel* obj) { }
     static void ReleaseCallee(mozilla::ipc::AsyncChannel* obj) { }
 };
 
@@ -110,48 +111,57 @@ AsyncChannel::ProcessLink::Open(mozilla:
                                 MessageLoop *aIOLoop,
                                 Side aSide)
 {
     NS_PRECONDITION(aTransport, "need transport layer");
 
     // FIXME need to check for valid channel
 
     mTransport = aTransport;
-    mExistingListener = mTransport->set_listener(this);
 
     // FIXME figure out whether we're in parent or child, grab IO loop
     // appropriately
     bool needOpen = true;
     if(aIOLoop) {
         // We're a child or using the new arguments.  Either way, we
         // need an open.
         needOpen = true;
         mChan->mChild = (aSide == AsyncChannel::Unknown) || (aSide == AsyncChannel::Child);
     } else {
         NS_PRECONDITION(aSide == Unknown, "expected default side arg");
 
         // parent
         mChan->mChild = false;
         needOpen = false;
         aIOLoop = XRE_GetIOMessageLoop();
-        // FIXME assuming that the parent waits for the OnConnected event.
-        // FIXME see GeckoChildProcessHost.cpp.  bad assumption!
-        mChan->mChannelState = ChannelConnected;
     }
 
     mIOLoop = aIOLoop;
 
     NS_ASSERTION(mIOLoop, "need an IO loop");
     NS_ASSERTION(mChan->mWorkerLoop, "need a worker loop");
 
-    if (needOpen) {             // child process
+    {
         MonitorAutoLock lock(*mChan->mMonitor);
 
-        mIOLoop->PostTask(FROM_HERE, 
-                          NewRunnableMethod(this, &ProcessLink::OnChannelOpened));
+        if (needOpen) {
+            // Transport::Connect() has not been called.  Call it so
+            // we start polling our pipe and processing outgoing
+            // messages.
+            mIOLoop->PostTask(
+                FROM_HERE,
+                NewRunnableMethod(this, &ProcessLink::OnChannelOpened));
+        } else {
+            // Transport::Connect() has already been called.  Take
+            // over the channel from the previous listener and process
+            // any queued messages.
+            mIOLoop->PostTask(
+                FROM_HERE,
+                NewRunnableMethod(this, &ProcessLink::OnTakeConnectedChannel));
+        }
 
         // FIXME/cjones: handle errors
         while (!mChan->Connected()) {
             mChan->mMonitor->Wait();
         }
     }
 }
 
@@ -665,23 +675,58 @@ AsyncChannel::ProcessLink::OnEchoMessage
 }
 
 void
 AsyncChannel::ProcessLink::OnChannelOpened()
 {
     mChan->AssertLinkThread();
     {
         MonitorAutoLock lock(*mChan->mMonitor);
+
+        mExistingListener = mTransport->set_listener(this);
+#ifdef DEBUG
+        if (mExistingListener) {
+            queue<Message> pending;
+            mExistingListener->GetQueuedMessages(pending);
+            MOZ_ASSERT(pending.empty());
+        }
+#endif  // DEBUG
+
         mChan->mChannelState = ChannelOpening;
         lock.Notify();
     }
     /*assert*/mTransport->Connect();
 }
 
 void
+AsyncChannel::ProcessLink::OnTakeConnectedChannel()
+{
+    AssertIOThread();
+
+    queue<Message> pending;
+    {
+        MonitorAutoLock lock(*mChan->mMonitor);
+
+        mChan->mChannelState = ChannelConnected;
+
+        mExistingListener = mTransport->set_listener(this);
+        if (mExistingListener) {
+            mExistingListener->GetQueuedMessages(pending);
+        }
+        lock.Notify();
+    }
+
+    // Dispatch whatever messages the previous listener had queued up.
+    while (!pending.empty()) {
+        OnMessageReceived(pending.front());
+        pending.pop();
+    }
+}
+
+void
 AsyncChannel::ProcessLink::OnChannelConnected(int32 peer_pid)
 {
     AssertIOThread();
 
     {
         MonitorAutoLock lock(*mChan->mMonitor);
         mChan->mChannelState = ChannelConnected;
         mChan->mMonitor->Notify();
--- a/ipc/glue/AsyncChannel.h
+++ b/ipc/glue/AsyncChannel.h
@@ -145,16 +145,17 @@ public:
     class ProcessLink : public Link, public Transport::Listener {
     protected:
         Transport* mTransport;
         MessageLoop* mIOLoop;       // thread where IO happens
         Transport::Listener* mExistingListener; // channel's previous listener
     
         void OnCloseChannel();
         void OnChannelOpened();
+        void OnTakeConnectedChannel();
         void OnEchoMessage(Message* msg);
 
         void AssertIOThread() const
         {
             NS_ABORT_IF_FALSE(mIOLoop == MessageLoop::current(),
                               "not on I/O thread!");
         }
 
--- a/ipc/glue/GeckoChildProcessHost.cpp
+++ b/ipc/glue/GeckoChildProcessHost.cpp
@@ -688,27 +688,37 @@ GeckoChildProcessHost::OnChannelConnecte
   mLaunched = true;
 
   if (!base::OpenPrivilegedProcessHandle(peer_pid, &mChildProcessHandle))
       NS_RUNTIMEABORT("can't open handle to child process");
 
   lock.Notify();
 }
 
-// XXX/cjones: these next two methods should basically never be called.
-// after the process is launched, its channel will be used to create
-// one of our channels, AsyncChannel et al.
 void
 GeckoChildProcessHost::OnMessageReceived(const IPC::Message& aMsg)
 {
+  // We never process messages ourself, just save them up for the next
+  // listener.
+  mQueue.push(aMsg);
 }
+
 void
 GeckoChildProcessHost::OnChannelError()
 {
-  // XXXbent Notify that the child process is gone?
+  // FIXME/bug 773925: save up this error for the next listener.
+}
+
+void
+GeckoChildProcessHost::GetQueuedMessages(std::queue<IPC::Message>& queue)
+{
+  // If this is called off the IO thread, bad things will happen.
+  DCHECK(MessageLoopForIO::current());
+  swap(queue, mQueue);
+  // We expect the next listener to take over processing of our queue.
 }
 
 void
 GeckoChildProcessHost::OnWaitableEventSignaled(base::WaitableEvent *event)
 {
   if (mDelegate) {
     mDelegate->OnWaitableEventSignaled(event);
   }
--- a/ipc/glue/GeckoChildProcessHost.h
+++ b/ipc/glue/GeckoChildProcessHost.h
@@ -41,16 +41,17 @@ public:
                   base::ProcessArchitecture arch=base::GetCurrentProcessArchitecture());
   bool AsyncLaunch(std::vector<std::string> aExtraOpts=std::vector<std::string>());
   bool PerformAsyncLaunch(std::vector<std::string> aExtraOpts=std::vector<std::string>(),
                           base::ProcessArchitecture arch=base::GetCurrentProcessArchitecture());
 
   virtual void OnChannelConnected(int32 peer_pid);
   virtual void OnMessageReceived(const IPC::Message& aMsg);
   virtual void OnChannelError();
+  virtual void GetQueuedMessages(std::queue<IPC::Message>& queue);
 
   void InitializeChannel();
 
   virtual bool CanShutdown() { return true; }
 
   virtual void OnWaitableEventSignaled(base::WaitableEvent *event);
 
   IPC::Channel* GetChannel() {
@@ -98,14 +99,23 @@ protected:
 #endif
 
 private:
   DISALLOW_EVIL_CONSTRUCTORS(GeckoChildProcessHost);
 
   // Does the actual work for AsyncLaunch, on the IO thread.
   bool PerformAsyncLaunchInternal(std::vector<std::string>& aExtraOpts,
                                   base::ProcessArchitecture arch);
+
+  // In between launching the subprocess and handing off its IPC
+  // channel, there's a small window of time in which *we* might still
+  // be the channel listener, and receive messages.  That's bad
+  // because we have no idea what to do with those messages.  So queue
+  // them here until we hand off the eventual listener.
+  //
+  // FIXME/cjones: this strongly indicates bad design.  Shame on us.
+  std::queue<IPC::Message> mQueue;
 };
 
 } /* namespace ipc */
 } /* namespace mozilla */
 
 #endif /* __IPC_GLUE_GECKOCHILDPROCESSHOST_H__ */