Backed out 3 changesets (bug 1253123) for build bustage CLOSED TREE
authorWes Kocher <wkocher@mozilla.com>
Wed, 30 Mar 2016 10:20:20 -0700
changeset 328905 444648c7d761f14aa5176fe9bc044f8f39f3d7ca
parent 328904 5dbb8c2b35f4ea34d543a346f04e934e1a85686a
child 328906 39e3c3e2970e8a3f59d99349f54d81493d50b1f8
push id6048
push userkmoir@mozilla.com
push dateMon, 06 Jun 2016 19:02:08 +0000
treeherdermozilla-beta@46d72a56c57d [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
bugs1253123
milestone48.0a1
backs outf0dd577131c74afcd7118f64a138be78c39ec786
d345149b216921e65dfe6c1ed86cc7c61941bb78
6c8278e8047ee59720ecafffe1a3c568e10a22e7
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Backed out 3 changesets (bug 1253123) for build bustage CLOSED TREE Backed out changeset f0dd577131c7 (bug 1253123) Backed out changeset d345149b2169 (bug 1253123) Backed out changeset 6c8278e8047e (bug 1253123) MozReview-Commit-ID: HL9U2qqeDnY
ipc/chromium/moz.build
ipc/chromium/src/chrome/common/child_thread.cc
ipc/chromium/src/chrome/common/child_thread.h
ipc/chromium/src/chrome/common/ipc_channel_proxy.cc
ipc/chromium/src/chrome/common/ipc_channel_proxy.h
ipc/chromium/src/chrome/common/ipc_message_utils.h
ipc/chromium/src/chrome/common/ipc_sync_channel.cc
ipc/chromium/src/chrome/common/ipc_sync_channel.h
ipc/chromium/src/chrome/common/ipc_sync_message.cc
ipc/chromium/src/chrome/common/ipc_sync_message.h
ipc/chromium/src/chrome/common/message_router.cc
ipc/chromium/src/chrome/common/message_router.h
--- a/ipc/chromium/moz.build
+++ b/ipc/chromium/moz.build
@@ -29,17 +29,21 @@ UNIFIED_SOURCES += [
     'src/base/tracked.cc',
     'src/base/tracked_objects.cc',
     'src/chrome/common/child_process.cc',
     'src/chrome/common/child_process_host.cc',
     'src/chrome/common/child_process_info.cc',
     'src/chrome/common/child_thread.cc',
     'src/chrome/common/chrome_switches.cc',
     'src/chrome/common/ipc_channel.cc',
+    'src/chrome/common/ipc_channel_proxy.cc',
     'src/chrome/common/ipc_message.cc',
+    'src/chrome/common/ipc_sync_channel.cc',
+    'src/chrome/common/ipc_sync_message.cc',
+    'src/chrome/common/message_router.cc',
     'src/chrome/common/notification_service.cc',
 ]
 
 if os_win:
     SOURCES += [
         'src/base/condition_variable_win.cc',
         'src/base/cpu.cc',
         'src/base/file_util_win.cc',
--- a/ipc/chromium/src/chrome/common/child_thread.cc
+++ b/ipc/chromium/src/chrome/common/child_thread.cc
@@ -57,19 +57,33 @@ bool ChildThread::Send(IPC::Message* msg
   if (!channel_.get()) {
     delete msg;
     return false;
   }
 
   return channel_->Send(msg);
 }
 
+void ChildThread::AddRoute(int32_t routing_id, IPC::Channel::Listener* listener) {
+  DCHECK(MessageLoop::current() == message_loop());
+
+  router_.AddRoute(routing_id, listener);
+}
+
+void ChildThread::RemoveRoute(int32_t routing_id) {
+  DCHECK(MessageLoop::current() == message_loop());
+
+  router_.RemoveRoute(routing_id);
+}
+
 void ChildThread::OnMessageReceived(const IPC::Message& msg) {
   if (msg.routing_id() == MSG_ROUTING_CONTROL) {
     OnControlMessageReceived(msg);
+  } else {
+    router_.OnMessageReceived(msg);
   }
 }
 
 ChildThread* ChildThread::current() {
   return ChildProcess::current()->child_thread();
 }
 
 void ChildThread::Init() {
--- a/ipc/chromium/src/chrome/common/child_thread.h
+++ b/ipc/chromium/src/chrome/common/child_thread.h
@@ -1,17 +1,18 @@
 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
 #ifndef CHROME_COMMON_CHILD_THREAD_H_
 #define CHROME_COMMON_CHILD_THREAD_H_
 
 #include "base/thread.h"
-#include "chrome/common/ipc_channel.h"
+#include "chrome/common/ipc_sync_channel.h"
+#include "chrome/common/message_router.h"
 #include "mozilla/UniquePtr.h"
 
 class ResourceDispatcher;
 
 // Child processes's background thread should derive from this class.
 class ChildThread : public IPC::Channel::Listener,
                     public IPC::Message::Sender,
                     public base::Thread {
@@ -66,16 +67,20 @@ class ChildThread : public IPC::Channel:
 #endif
 
   // The message loop used to run tasks on the thread that started this thread.
   MessageLoop* owner_loop_;
 
   std::wstring channel_name_;
   mozilla::UniquePtr<IPC::Channel> channel_;
 
+  // Used only on the background render thread to implement message routing
+  // functionality to the consumers of the ChildThread.
+  MessageRouter router_;
+
   Thread::Options options_;
 
   // If true, checks with the browser process before shutdown.  This avoids race
   // conditions if the process refcount is 0 but there's an IPC message inflight
   // that would addref it.
   bool check_with_browser_before_shutdown_;
 
   DISALLOW_EVIL_CONSTRUCTORS(ChildThread);
new file mode 100644
--- /dev/null
+++ b/ipc/chromium/src/chrome/common/ipc_channel_proxy.cc
@@ -0,0 +1,275 @@
+// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/message_loop.h"
+#include "base/thread.h"
+#include "chrome/common/ipc_channel_proxy.h"
+#include "chrome/common/ipc_message_utils.h"
+
+namespace IPC {
+
+//-----------------------------------------------------------------------------
+
+ChannelProxy::Context::Context(Channel::Listener* listener,
+                               MessageFilter* filter,
+                               MessageLoop* ipc_message_loop)
+    : listener_message_loop_(MessageLoop::current()),
+      listener_(listener),
+      ipc_message_loop_(ipc_message_loop),
+      channel_(NULL),
+      peer_pid_(0),
+      channel_connected_called_(false) {
+  if (filter)
+    filters_.push_back(filter);
+}
+
+void ChannelProxy::Context::CreateChannel(const std::wstring& id,
+                                          const Channel::Mode& mode) {
+  DCHECK(channel_ == NULL);
+  channel_id_ = id;
+  channel_ = new Channel(id, mode, this);
+}
+
+bool ChannelProxy::Context::TryFilters(const Message& message) {
+
+  for (size_t i = 0; i < filters_.size(); ++i) {
+    if (filters_[i]->OnMessageReceived(message)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+// Called on the IPC::Channel thread
+void ChannelProxy::Context::OnMessageReceived(const Message& message) {
+  // First give a chance to the filters to process this message.
+  if (!TryFilters(message))
+    OnMessageReceivedNoFilter(message);
+}
+
+// Called on the IPC::Channel thread
+void ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
+  // NOTE: This code relies on the listener's message loop not going away while
+  // this thread is active.  That should be a reasonable assumption, but it
+  // feels risky.  We may want to invent some more indirect way of referring to
+  // a MessageLoop if this becomes a problem.
+  listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
+      this, &Context::OnDispatchMessage, message));
+}
+
+// Called on the IPC::Channel thread
+void ChannelProxy::Context::OnChannelConnected(int32_t peer_pid) {
+  peer_pid_ = peer_pid;
+  for (size_t i = 0; i < filters_.size(); ++i)
+    filters_[i]->OnChannelConnected(peer_pid);
+
+  // See above comment about using listener_message_loop_ here.
+  listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
+      this, &Context::OnDispatchConnected));
+}
+
+// Called on the IPC::Channel thread
+void ChannelProxy::Context::OnChannelError() {
+  for (size_t i = 0; i < filters_.size(); ++i)
+    filters_[i]->OnChannelError();
+
+  // See above comment about using listener_message_loop_ here.
+  listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
+      this, &Context::OnDispatchError));
+}
+
+// Called on the IPC::Channel thread
+void ChannelProxy::Context::OnChannelOpened() {
+  DCHECK(channel_ != NULL);
+
+  // Assume a reference to ourselves on behalf of this thread.  This reference
+  // will be released when we are closed.
+  AddRef();
+
+  if (!channel_->Connect()) {
+    OnChannelError();
+    return;
+  }
+
+  for (size_t i = 0; i < filters_.size(); ++i)
+    filters_[i]->OnFilterAdded(channel_);
+}
+
+// Called on the IPC::Channel thread
+void ChannelProxy::Context::OnChannelClosed() {
+  // It's okay for IPC::ChannelProxy::Close to be called more than once, which
+  // would result in this branch being taken.
+  if (!channel_)
+    return;
+
+  for (size_t i = 0; i < filters_.size(); ++i) {
+    filters_[i]->OnChannelClosing();
+    filters_[i]->OnFilterRemoved();
+  }
+
+  // We don't need the filters anymore.
+  filters_.clear();
+
+  delete channel_;
+  channel_ = NULL;
+
+  // Balance with the reference taken during startup.  This may result in
+  // self-destruction.
+  Release();
+}
+
+// Called on the IPC::Channel thread
+void ChannelProxy::Context::OnSendMessage(Message* message) {
+  if (!channel_->Send(message))
+    OnChannelError();
+}
+
+// Called on the IPC::Channel thread
+void ChannelProxy::Context::OnAddFilter(MessageFilter* filter) {
+  filters_.push_back(filter);
+
+  // If the channel has already been created, then we need to send this message
+  // so that the filter gets access to the Channel.
+  if (channel_)
+    filter->OnFilterAdded(channel_);
+
+  // Balances the AddRef in ChannelProxy::AddFilter.
+  filter->Release();
+}
+
+// Called on the IPC::Channel thread
+void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
+  for (size_t i = 0; i < filters_.size(); ++i) {
+    if (filters_[i].get() == filter) {
+      filter->OnFilterRemoved();
+      filters_.erase(filters_.begin() + i);
+      return;
+    }
+  }
+
+  NOTREACHED() << "filter to be removed not found";
+}
+
+// Called on the listener's thread
+void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
+  if (!listener_)
+    return;
+
+  OnDispatchConnected();
+
+
+  listener_->OnMessageReceived(message);
+
+}
+
+// Called on the listener's thread
+void ChannelProxy::Context::OnDispatchConnected() {
+  if (channel_connected_called_)
+    return;
+
+  channel_connected_called_ = true;
+  if (listener_)
+    listener_->OnChannelConnected(peer_pid_);
+}
+
+// Called on the listener's thread
+void ChannelProxy::Context::OnDispatchError() {
+  if (listener_)
+    listener_->OnChannelError();
+}
+
+//-----------------------------------------------------------------------------
+
+ChannelProxy::ChannelProxy(const std::wstring& channel_id, Channel::Mode mode,
+                           Channel::Listener* listener, MessageFilter* filter,
+                           MessageLoop* ipc_thread)
+    : context_(new Context(listener, filter, ipc_thread)) {
+  Init(channel_id, mode, ipc_thread, true);
+}
+
+ChannelProxy::ChannelProxy(const std::wstring& channel_id, Channel::Mode mode,
+                           MessageLoop* ipc_thread, Context* context,
+                           bool create_pipe_now)
+    : context_(context) {
+  Init(channel_id, mode, ipc_thread, create_pipe_now);
+}
+
+void ChannelProxy::Init(const std::wstring& channel_id, Channel::Mode mode,
+                        MessageLoop* ipc_thread_loop, bool create_pipe_now) {
+  if (create_pipe_now) {
+    // Create the channel immediately.  This effectively sets up the
+    // low-level pipe so that the client can connect.  Without creating
+    // the pipe immediately, it is possible for a listener to attempt
+    // to connect and get an error since the pipe doesn't exist yet.
+    context_->CreateChannel(channel_id, mode);
+  } else {
+#if defined(OS_POSIX)
+    // TODO(playmobil): On POSIX, IPC::Channel uses a socketpair(), one side of
+    // which needs to be mapped into the child process' address space.
+    // To know the value of the client side FD we need to have already
+    // created a socketpair which currently occurs in IPC::Channel's
+    // constructor.
+    // If we lazilly construct the IPC::Channel then the caller has no way
+    // of knowing the FD #.
+    //
+    // We can solve this either by having the Channel's creation launch the
+    // subprocess itself or by creating the socketpair() externally.
+    NOTIMPLEMENTED();
+#endif  // defined(OS_POSIX)
+    context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
+        context_.get(), &Context::CreateChannel, channel_id, mode));
+  }
+
+  // complete initialization on the background thread
+  context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
+      context_.get(), &Context::OnChannelOpened));
+}
+
+void ChannelProxy::Close() {
+  // Clear the backpointer to the listener so that any pending calls to
+  // Context::OnDispatchMessage or OnDispatchError will be ignored.  It is
+  // possible that the channel could be closed while it is receiving messages!
+  context_->Clear();
+
+  if (context_->ipc_message_loop()) {
+    context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
+        context_.get(), &Context::OnChannelClosed));
+  }
+}
+
+bool ChannelProxy::Send(Message* message) {
+
+  context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
+      context_.get(), &Context::OnSendMessage, message));
+  return true;
+}
+
+void ChannelProxy::AddFilter(MessageFilter* filter) {
+  // We want to addref the filter to prevent it from
+  // being destroyed before the OnAddFilter call is invoked.
+  filter->AddRef();
+  context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
+      context_.get(), &Context::OnAddFilter, filter));
+}
+
+void ChannelProxy::RemoveFilter(MessageFilter* filter) {
+  context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
+      context_.get(), &Context::OnRemoveFilter, filter));
+}
+
+#if defined(OS_POSIX)
+// See the TODO regarding lazy initialization of the channel in
+// ChannelProxy::Init().
+// We assume that IPC::Channel::GetClientFileDescriptorMapping() is thread-safe.
+void ChannelProxy::GetClientFileDescriptorMapping(int *src_fd,
+                                                  int *dest_fd) const {
+  Channel *channel = context_.get()->channel_;
+  DCHECK(channel); // Channel must have been created first.
+  channel->GetClientFileDescriptorMapping(src_fd, dest_fd);
+}
+#endif
+
+//-----------------------------------------------------------------------------
+
+}  // namespace IPC
new file mode 100644
--- /dev/null
+++ b/ipc/chromium/src/chrome/common/ipc_channel_proxy.h
@@ -0,0 +1,213 @@
+// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CHROME_COMMON_IPC_CHANNEL_PROXY_H__
+#define CHROME_COMMON_IPC_CHANNEL_PROXY_H__
+
+#include <vector>
+#include "base/lock.h"
+#include "chrome/common/ipc_channel.h"
+#include "nsISupportsImpl.h"
+#include "nsAutoPtr.h"
+
+class MessageLoop;
+
+namespace IPC {
+
+//-----------------------------------------------------------------------------
+// IPC::ChannelProxy
+//
+// This class is a helper class that is useful when you wish to run an IPC
+// channel on a background thread.  It provides you with the option of either
+// handling IPC messages on that background thread or having them dispatched to
+// your main thread (the thread on which the IPC::ChannelProxy is created).
+//
+// The API for an IPC::ChannelProxy is very similar to that of an IPC::Channel.
+// When you send a message to an IPC::ChannelProxy, the message is routed to
+// the background thread, where it is then passed to the IPC::Channel's Send
+// method.  This means that you can send a message from your thread and your
+// message will be sent over the IPC channel when possible instead of being
+// delayed until your thread returns to its message loop.  (Often IPC messages
+// will queue up on the IPC::Channel when there is a lot of traffic, and the
+// channel will not get cycles to flush its message queue until the thread, on
+// which it is running, returns to its message loop.)
+//
+// An IPC::ChannelProxy can have a MessageFilter associated with it, which will
+// be notified of incoming messages on the IPC::Channel's thread.  This gives
+// the consumer of IPC::ChannelProxy the ability to respond to incoming
+// messages on this background thread instead of on their own thread, which may
+// be bogged down with other processing.  The result can be greatly improved
+// latency for messages that can be handled on a background thread.
+//
+// The consumer of IPC::ChannelProxy is responsible for allocating the Thread
+// instance where the IPC::Channel will be created and operated.
+//
+class ChannelProxy : public Message::Sender {
+ public:
+  // A class that receives messages on the thread where the IPC channel is
+  // running.  It can choose to prevent the default action for an IPC message.
+  class MessageFilter {
+   public:
+    NS_INLINE_DECL_THREADSAFE_REFCOUNTING(MessageFilter)
+
+    // Called on the background thread to provide the filter with access to the
+    // channel.  Called when the IPC channel is initialized or when AddFilter
+    // is called if the channel is already initialized.
+    virtual void OnFilterAdded(Channel* channel) {}
+
+    // Called on the background thread when the filter has been removed from
+    // the ChannelProxy and when the Channel is closing.  After a filter is
+    // removed, it will not be called again.
+    virtual void OnFilterRemoved() {}
+
+    // Called to inform the filter that the IPC channel is connected and we
+    // have received the internal Hello message from the peer.
+    virtual void OnChannelConnected(int32_t peer_pid) {}
+
+    // Called when there is an error on the channel, typically that the channel
+    // has been closed.
+    virtual void OnChannelError() {}
+
+    // Called to inform the filter that the IPC channel will be destroyed.
+    // OnFilterRemoved is called immediately after this.
+    virtual void OnChannelClosing() {}
+
+    // Return true to indicate that the message was handled, or false to let
+    // the message be handled in the default way.
+    virtual bool OnMessageReceived(const Message& message) {
+      return false;
+    }
+   protected:
+    virtual ~MessageFilter() {}
+  };
+
+  // Initializes a channel proxy.  The channel_id and mode parameters are
+  // passed directly to the underlying IPC::Channel.  The listener is called on
+  // the thread that creates the ChannelProxy.  The filter's OnMessageReceived
+  // method is called on the thread where the IPC::Channel is running.  The
+  // filter may be null if the consumer is not interested in handling messages
+  // on the background thread.  Any message not handled by the filter will be
+  // dispatched to the listener.  The given message loop indicates where the
+  // IPC::Channel should be created.
+  ChannelProxy(const std::wstring& channel_id, Channel::Mode mode,
+               Channel::Listener* listener, MessageFilter* filter,
+               MessageLoop* ipc_thread_loop);
+
+  ~ChannelProxy() {
+    Close();
+  }
+
+  // Close the IPC::Channel.  This operation completes asynchronously, once the
+  // background thread processes the command to close the channel.  It is ok to
+  // call this method multiple times.  Redundant calls are ignored.
+  //
+  // WARNING: The MessageFilter object held by the ChannelProxy is also
+  // released asynchronously, and it may in fact have its final reference
+  // released on the background thread.  The caller should be careful to deal
+  // with / allow for this possibility.
+  void Close();
+
+  // Send a message asynchronously.  The message is routed to the background
+  // thread where it is passed to the IPC::Channel's Send method.
+  virtual bool Send(Message* message);
+
+  // Used to intercept messages as they are received on the background thread.
+  //
+  // Ordinarily, messages sent to the ChannelProxy are routed to the matching
+  // listener on the worker thread.  This API allows code to intercept messages
+  // before they are sent to the worker thread.
+  void AddFilter(MessageFilter* filter);
+  void RemoveFilter(MessageFilter* filter);
+
+#if defined(OS_POSIX)
+  // Calls through to the underlying channel's methods.
+  // TODO(playmobil): For now this is only implemented in the case of
+  // create_pipe_now = true, we need to figure this out for the latter case.
+  void GetClientFileDescriptorMapping(int *src_fd, int *dest_fd) const;
+#endif  // defined(OS_POSIX)
+
+ protected:
+  class Context;
+  // A subclass uses this constructor if it needs to add more information
+  // to the internal state.  If create_pipe_now is true, the pipe is created
+  // immediately.  Otherwise it's created on the IO thread.
+  ChannelProxy(const std::wstring& channel_id, Channel::Mode mode,
+               MessageLoop* ipc_thread_loop, Context* context,
+               bool create_pipe_now);
+
+  // Used internally to hold state that is referenced on the IPC thread.
+  class Context : public Channel::Listener {
+   public:
+    NS_INLINE_DECL_THREADSAFE_REFCOUNTING(Context)
+    Context(Channel::Listener* listener, MessageFilter* filter,
+            MessageLoop* ipc_thread);
+    MessageLoop* ipc_message_loop() const { return ipc_message_loop_; }
+    const std::wstring& channel_id() const { return channel_id_; }
+
+    // Dispatches a message on the listener thread.
+    void OnDispatchMessage(const Message& message);
+
+   protected:
+    virtual ~Context() {}
+
+    // IPC::Channel::Listener methods:
+    virtual void OnMessageReceived(const Message& message);
+    virtual void OnChannelConnected(int32_t peer_pid);
+    virtual void OnChannelError();
+
+    // Like OnMessageReceived but doesn't try the filters.
+    void OnMessageReceivedNoFilter(const Message& message);
+
+    // Gives the filters a chance at processing |message|.
+    // Returns true if the message was processed, false otherwise.
+    bool TryFilters(const Message& message);
+
+    // Like Open and Close, but called on the IPC thread.
+    virtual void OnChannelOpened();
+    virtual void OnChannelClosed();
+
+    // Called on the consumers thread when the ChannelProxy is closed.  At that
+    // point the consumer is telling us that they don't want to receive any
+    // more messages, so we honor that wish by forgetting them!
+    virtual void Clear() { listener_ = NULL; }
+
+   private:
+    friend class ChannelProxy;
+    // Create the Channel
+    void CreateChannel(const std::wstring& id, const Channel::Mode& mode);
+
+    // Methods called via InvokeLater:
+    void OnSendMessage(Message* message_ptr);
+    void OnAddFilter(MessageFilter* filter);
+    void OnRemoveFilter(MessageFilter* filter);
+    void OnDispatchConnected();
+    void OnDispatchError();
+
+    MessageLoop* listener_message_loop_;
+    Channel::Listener* listener_;
+
+    // List of filters.  This is only accessed on the IPC thread.
+    std::vector<RefPtr<MessageFilter> > filters_;
+    MessageLoop* ipc_message_loop_;
+    Channel* channel_;
+    std::wstring channel_id_;
+    int peer_pid_;
+    bool channel_connected_called_;
+  };
+
+  Context* context() { return context_; }
+
+ private:
+  void Init(const std::wstring& channel_id, Channel::Mode mode,
+            MessageLoop* ipc_thread_loop, bool create_pipe_now);
+
+  // By maintaining this indirection (ref-counted) to our internal state, we
+  // can safely be destroyed while the background thread continues to do stuff
+  // that involves this data.
+  RefPtr<Context> context_;
+};
+
+}  // namespace IPC
+
+#endif  // CHROME_COMMON_IPC_CHANNEL_PROXY_H__
--- a/ipc/chromium/src/chrome/common/ipc_message_utils.h
+++ b/ipc/chromium/src/chrome/common/ipc_message_utils.h
@@ -12,17 +12,17 @@
 #include "base/file_path.h"
 #include "base/string_util.h"
 #include "base/string16.h"
 #include "base/time.h"
 
 #if defined(OS_POSIX)
 #include "chrome/common/file_descriptor_set_posix.h"
 #endif
-#include "chrome/common/ipc_message.h"
+#include "chrome/common/ipc_sync_message.h"
 #include "chrome/common/transport_dib.h"
 
 namespace IPC {
 
 //-----------------------------------------------------------------------------
 // An iterator class for reading the fields contained within a Message.
 
 class MessageIterator {
new file mode 100644
--- /dev/null
+++ b/ipc/chromium/src/chrome/common/ipc_sync_channel.cc
@@ -0,0 +1,452 @@
+// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "chrome/common/ipc_sync_channel.h"
+
+#include "base/logging.h"
+#include "base/thread_local.h"
+#include "base/message_loop.h"
+#include "base/waitable_event.h"
+#include "base/waitable_event_watcher.h"
+#include "chrome/common/ipc_sync_message.h"
+#include "nsISupportsImpl.h"
+
+using base::TimeDelta;
+using base::TimeTicks;
+using base::WaitableEvent;
+
+namespace IPC {
+// When we're blocked in a Send(), we need to process incoming synchronous
+// messages right away because it could be blocking our reply (either
+// directly from the same object we're calling, or indirectly through one or
+// more other channels).  That means that in SyncContext's OnMessageReceived,
+// we need to process sync message right away if we're blocked.  However a
+// simple check isn't sufficient, because the listener thread can be in the
+// process of calling Send.
+// To work around this, when SyncChannel filters a sync message, it sets
+// an event that the listener thread waits on during its Send() call.  This
+// allows us to dispatch incoming sync messages when blocked.  The race
+// condition is handled because if Send is in the process of being called, it
+// will check the event.  In case the listener thread isn't sending a message,
+// we queue a task on the listener thread to dispatch the received messages.
+// The messages are stored in this queue object that's shared among all
+// SyncChannel objects on the same thread (since one object can receive a
+// sync message while another one is blocked).
+
+class SyncChannel::ReceivedSyncMsgQueue {
+ public:
+  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(SyncChannel::ReceivedSyncMsgQueue)
+
+  static base::ThreadLocalPointer<ReceivedSyncMsgQueue>& get_tls_ptr() {
+    static base::ThreadLocalPointer<ReceivedSyncMsgQueue> tls_ptr;
+    return tls_ptr;
+  }
+
+  // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
+  // if necessary.  Call RemoveContext on the same thread when done.
+  static ReceivedSyncMsgQueue* AddContext() {
+    // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
+    // SyncChannel objects can block the same thread).
+    ReceivedSyncMsgQueue* rv = get_tls_ptr().Get();
+    if (!rv) {
+      rv = new ReceivedSyncMsgQueue();
+      get_tls_ptr().Set(rv);
+    }
+    rv->listener_count_++;
+    return rv;
+  }
+
+  // Called on IPC thread when a synchronous message or reply arrives.
+  void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
+    bool was_task_pending;
+    {
+      AutoLock auto_lock(message_lock_);
+
+      was_task_pending = task_pending_;
+      task_pending_ = true;
+
+      // We set the event in case the listener thread is blocked (or is about
+      // to). In case it's not, the PostTask dispatches the messages.
+      message_queue_.push_back(QueuedMessage(new Message(msg), context));
+    }
+
+    dispatch_event_.Signal();
+    if (!was_task_pending) {
+      listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
+          this, &ReceivedSyncMsgQueue::DispatchMessagesTask));
+    }
+  }
+
+  void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
+    received_replies_.push_back(QueuedMessage(new Message(msg), context));
+  }
+
+  // Called on the listener's thread to process any queues synchronous
+  // messages.
+  void DispatchMessagesTask() {
+    {
+      AutoLock auto_lock(message_lock_);
+      task_pending_ = false;
+    }
+    DispatchMessages();
+  }
+
+  void DispatchMessages() {
+    while (true) {
+      Message* message;
+      RefPtr<SyncChannel::SyncContext> context;
+      {
+        AutoLock auto_lock(message_lock_);
+        if (message_queue_.empty())
+          break;
+
+        message = message_queue_.front().message;
+        context = message_queue_.front().context;
+        message_queue_.pop_front();
+      }
+
+      context->OnDispatchMessage(*message);
+      delete message;
+    }
+  }
+
+  // SyncChannel calls this in its destructor.
+  void RemoveContext(SyncContext* context) {
+    AutoLock auto_lock(message_lock_);
+
+    SyncMessageQueue::iterator iter = message_queue_.begin();
+    while (iter != message_queue_.end()) {
+      if (iter->context == context) {
+        delete iter->message;
+        iter = message_queue_.erase(iter);
+      } else {
+        iter++;
+      }
+    }
+
+    if (--listener_count_ == 0) {
+      DCHECK(get_tls_ptr().Get());
+      get_tls_ptr().Set(NULL);
+    }
+  }
+
+  WaitableEvent* dispatch_event() { return &dispatch_event_; }
+  MessageLoop* listener_message_loop() { return listener_message_loop_; }
+
+  // Called on the ipc thread to check if we can unblock any current Send()
+  // calls based on a queued reply.
+  void DispatchReplies() {
+    for (size_t i = 0; i < received_replies_.size(); ++i) {
+      Message* message = received_replies_[i].message;
+      if (received_replies_[i].context->TryToUnblockListener(message)) {
+        delete message;
+        received_replies_.erase(received_replies_.begin() + i);
+        return;
+      }
+    }
+  }
+
+ protected:
+  ~ReceivedSyncMsgQueue() {}
+
+ private:
+  // See the comment in SyncChannel::SyncChannel for why this event is created
+  // as manual reset.
+  ReceivedSyncMsgQueue() :
+      dispatch_event_(true, false),
+      listener_message_loop_(MessageLoop::current()),
+      task_pending_(false),
+      listener_count_(0) {
+  }
+
+  // Holds information about a queued synchronous message or reply.
+  struct QueuedMessage {
+    QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
+    Message* message;
+    RefPtr<SyncChannel::SyncContext> context;
+  };
+
+  typedef std::deque<QueuedMessage> SyncMessageQueue;
+  SyncMessageQueue message_queue_;
+
+  std::vector<QueuedMessage> received_replies_;
+
+  // Set when we got a synchronous message that we must respond to as the
+  // sender needs its reply before it can reply to our original synchronous
+  // message.
+  WaitableEvent dispatch_event_;
+  MessageLoop* listener_message_loop_;
+  Lock message_lock_;
+  bool task_pending_;
+  int listener_count_;
+};
+
+SyncChannel::SyncContext::SyncContext(
+    Channel::Listener* listener,
+    MessageFilter* filter,
+    MessageLoop* ipc_thread,
+    WaitableEvent* shutdown_event)
+    : ChannelProxy::Context(listener, filter, ipc_thread),
+      received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
+      shutdown_event_(shutdown_event) {
+}
+
+SyncChannel::SyncContext::~SyncContext() {
+  while (!deserializers_.empty())
+    Pop();
+}
+
+// Adds information about an outgoing sync message to the context so that
+// we know how to deserialize the reply.  Returns a handle that's set when
+// the reply has arrived.
+void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
+  // The event is created as manual reset because in between Signal and
+  // OnObjectSignalled, another Send can happen which would stop the watcher
+  // from being called.  The event would get watched later, when the nested
+  // Send completes, so the event will need to remain set.
+  PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
+                         sync_msg->GetReplyDeserializer(),
+                         new WaitableEvent(true, false));
+  AutoLock auto_lock(deserializers_lock_);
+  deserializers_.push_back(pending);
+}
+
+bool SyncChannel::SyncContext::Pop() {
+  bool result;
+  {
+    AutoLock auto_lock(deserializers_lock_);
+    PendingSyncMsg msg = deserializers_.back();
+    delete msg.deserializer;
+    delete msg.done_event;
+    msg.done_event = NULL;
+    deserializers_.pop_back();
+    result = msg.send_result;
+  }
+
+  // We got a reply to a synchronous Send() call that's blocking the listener
+  // thread.  However, further down the call stack there could be another
+  // blocking Send() call, whose reply we received after we made this last
+  // Send() call.  So check if we have any queued replies available that
+  // can now unblock the listener thread.
+  ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
+      received_sync_msgs_.get(), &ReceivedSyncMsgQueue::DispatchReplies));
+
+  return result;
+}
+
+WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
+  AutoLock auto_lock(deserializers_lock_);
+  return deserializers_.back().done_event;
+}
+
+WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
+  return received_sync_msgs_->dispatch_event();
+}
+
+void SyncChannel::SyncContext::DispatchMessages() {
+  received_sync_msgs_->DispatchMessages();
+}
+
+bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
+  AutoLock auto_lock(deserializers_lock_);
+  if (deserializers_.empty() ||
+      !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
+    return false;
+  }
+
+  if (!msg->is_reply_error()) {
+    deserializers_.back().send_result = deserializers_.back().deserializer->
+        SerializeOutputParameters(*msg);
+  }
+  deserializers_.back().done_event->Signal();
+
+  return true;
+}
+
+void SyncChannel::SyncContext::Clear() {
+  CancelPendingSends();
+  received_sync_msgs_->RemoveContext(this);
+
+  Context::Clear();
+}
+
+void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
+  // Give the filters a chance at processing this message.
+  if (TryFilters(msg))
+    return;
+
+  if (TryToUnblockListener(&msg))
+    return;
+
+  if (msg.should_unblock()) {
+    received_sync_msgs_->QueueMessage(msg, this);
+    return;
+  }
+
+  if (msg.is_reply()) {
+    received_sync_msgs_->QueueReply(msg, this);
+    return;
+  }
+
+  return Context::OnMessageReceivedNoFilter(msg);
+}
+
+void SyncChannel::SyncContext::OnChannelError() {
+  CancelPendingSends();
+  shutdown_watcher_.StopWatching();
+  Context::OnChannelError();
+}
+
+void SyncChannel::SyncContext::OnChannelOpened() {
+  shutdown_watcher_.StartWatching(shutdown_event_, this);
+  Context::OnChannelOpened();
+}
+
+void SyncChannel::SyncContext::OnChannelClosed() {
+  shutdown_watcher_.StopWatching();
+  Context::OnChannelClosed();
+}
+
+void SyncChannel::SyncContext::OnSendTimeout(int message_id) {
+  AutoLock auto_lock(deserializers_lock_);
+  PendingSyncMessageQueue::iterator iter;
+  for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
+    if (iter->id == message_id) {
+      iter->done_event->Signal();
+      break;
+    }
+  }
+}
+
+void SyncChannel::SyncContext::CancelPendingSends() {
+  AutoLock auto_lock(deserializers_lock_);
+  PendingSyncMessageQueue::iterator iter;
+  for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++)
+    iter->done_event->Signal();
+}
+
+void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) {
+  DCHECK(event == shutdown_event_);
+  // Process shut down before we can get a reply to a synchronous message.
+  // Cancel pending Send calls, which will end up setting the send done event.
+  CancelPendingSends();
+}
+
+
+SyncChannel::SyncChannel(
+    const std::wstring& channel_id, Channel::Mode mode,
+    Channel::Listener* listener, MessageFilter* filter,
+    MessageLoop* ipc_message_loop, bool create_pipe_now,
+    WaitableEvent* shutdown_event)
+    : ChannelProxy(
+          channel_id, mode, ipc_message_loop,
+          new SyncContext(listener, filter, ipc_message_loop, shutdown_event),
+          create_pipe_now),
+      sync_messages_with_no_timeout_allowed_(true) {
+  // Ideally we only want to watch this object when running a nested message
+  // loop.  However, we don't know when it exits if there's another nested
+  // message loop running under it or not, so we wouldn't know whether to
+  // stop or keep watching.  So we always watch it, and create the event as
+  // manual reset since the object watcher might otherwise reset the event
+  // when we're doing a WaitMany.
+  dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this);
+}
+
+SyncChannel::~SyncChannel() {
+}
+
+bool SyncChannel::Send(Message* message) {
+  return SendWithTimeout(message, base::kNoTimeout);
+}
+
+bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
+  if (!message->is_sync()) {
+    ChannelProxy::Send(message);
+    return true;
+  }
+
+  // *this* might get deleted in WaitForReply.
+  RefPtr<SyncContext> context(sync_context());
+  if (context->shutdown_event()->IsSignaled()) {
+    delete message;
+    return false;
+  }
+
+  DCHECK(sync_messages_with_no_timeout_allowed_ ||
+         timeout_ms != base::kNoTimeout);
+  SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
+  context->Push(sync_msg);
+  int message_id = SyncMessage::GetMessageId(*sync_msg);
+  WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
+
+  ChannelProxy::Send(message);
+
+  if (timeout_ms != base::kNoTimeout) {
+    // We use the sync message id so that when a message times out, we don't
+    // confuse it with another send that is either above/below this Send in
+    // the call stack.
+    context->ipc_message_loop()->PostDelayedTask(FROM_HERE,
+        NewRunnableMethod(context.get(),
+            &SyncContext::OnSendTimeout, message_id), timeout_ms);
+  }
+
+  // Wait for reply, or for any other incoming synchronous messages.
+  WaitForReply(pump_messages_event);
+
+  return context->Pop();
+}
+
+void SyncChannel::WaitForReply(WaitableEvent* pump_messages_event) {
+  while (true) {
+    WaitableEvent* objects[] = {
+      sync_context()->GetDispatchEvent(),
+      sync_context()->GetSendDoneEvent(),
+      pump_messages_event
+    };
+
+    unsigned count = pump_messages_event ? 3: 2;
+    unsigned result = WaitableEvent::WaitMany(objects, count);
+    if (result == 0 /* dispatch event */) {
+      // We're waiting for a reply, but we received a blocking synchronous
+      // call.  We must process it or otherwise a deadlock might occur.
+      sync_context()->GetDispatchEvent()->Reset();
+      sync_context()->DispatchMessages();
+      continue;
+    }
+
+    if (result == 2 /* pump_messages_event */)
+      WaitForReplyWithNestedMessageLoop();  // Start a nested message loop.
+
+    break;
+  }
+}
+
+void SyncChannel::WaitForReplyWithNestedMessageLoop() {
+  WaitableEvent* old_done_event = send_done_watcher_.GetWatchedEvent();
+  send_done_watcher_.StopWatching();
+  send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this);
+  bool old_state = MessageLoop::current()->NestableTasksAllowed();
+  MessageLoop::current()->SetNestableTasksAllowed(true);
+  MessageLoop::current()->Run();
+  MessageLoop::current()->SetNestableTasksAllowed(old_state);
+  if (old_done_event)
+    send_done_watcher_.StartWatching(old_done_event, this);
+}
+
+void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) {
+  WaitableEvent* dispatch_event = sync_context()->GetDispatchEvent();
+  if (event == dispatch_event) {
+    // The call to DispatchMessages might delete this object, so reregister
+    // the object watcher first.
+    dispatch_event->Reset();
+    dispatch_watcher_.StartWatching(dispatch_event, this);
+    sync_context()->DispatchMessages();
+  } else {
+    // We got the reply, timed out or the process shutdown.
+    DCHECK(event == sync_context()->GetSendDoneEvent());
+    MessageLoop::current()->Quit();
+  }
+}
+
+}  // namespace IPC
new file mode 100644
--- /dev/null
+++ b/ipc/chromium/src/chrome/common/ipc_sync_channel.h
@@ -0,0 +1,160 @@
+// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CHROME_COMMON_IPC_SYNC_SENDER_H__
+#define CHROME_COMMON_IPC_SYNC_SENDER_H__
+
+#include <string>
+#include <deque>
+#include "base/basictypes.h"
+#include "base/lock.h"
+#include "base/scoped_handle.h"
+#include "base/waitable_event.h"
+#include "base/waitable_event_watcher.h"
+#include "chrome/common/ipc_channel_proxy.h"
+
+#include "nsAutoPtr.h"
+
+namespace IPC {
+
+class SyncMessage;
+class MessageReplyDeserializer;
+
+// This is similar to IPC::ChannelProxy, with the added feature of supporting
+// sending synchronous messages.
+// Note that care must be taken that the lifetime of the ipc_thread argument
+// is more than this object.  If the message loop goes away while this object
+// is running and it's used to send a message, then it will use the invalid
+// message loop pointer to proxy it to the ipc thread.
+class SyncChannel : public ChannelProxy,
+                    public base::WaitableEventWatcher::Delegate {
+ public:
+  SyncChannel(const std::wstring& channel_id, Channel::Mode mode,
+              Channel::Listener* listener, MessageFilter* filter,
+              MessageLoop* ipc_message_loop, bool create_pipe_now,
+              base::WaitableEvent* shutdown_event);
+  ~SyncChannel();
+
+  virtual bool Send(Message* message);
+  virtual bool SendWithTimeout(Message* message, int timeout_ms);
+
+  // Whether we allow sending messages with no time-out.
+  void set_sync_messages_with_no_timeout_allowed(bool value) {
+    sync_messages_with_no_timeout_allowed_ = value;
+  }
+
+ protected:
+  class ReceivedSyncMsgQueue;
+  friend class ReceivedSyncMsgQueue;
+
+  // SyncContext holds the per object data for SyncChannel, so that SyncChannel
+  // can be deleted while it's being used in a different thread.  See
+  // ChannelProxy::Context for more information.
+  class SyncContext : public Context,
+                      public base::WaitableEventWatcher::Delegate {
+   public:
+    SyncContext(Channel::Listener* listener,
+                MessageFilter* filter,
+                MessageLoop* ipc_thread,
+                base::WaitableEvent* shutdown_event);
+
+    ~SyncContext();
+
+    // Adds information about an outgoing sync message to the context so that
+    // we know how to deserialize the reply.
+    void Push(IPC::SyncMessage* sync_msg);
+
+    // Cleanly remove the top deserializer (and throw it away).  Returns the
+    // result of the Send call for that message.
+    bool Pop();
+
+    // Returns an event that's set when the send is complete, timed out or the
+    // process shut down.
+    base::WaitableEvent* GetSendDoneEvent();
+
+    // Returns an event that's set when an incoming message that's not the reply
+    // needs to get dispatched (by calling SyncContext::DispatchMessages).
+    base::WaitableEvent* GetDispatchEvent();
+
+    void DispatchMessages();
+
+    // Checks if the given message is blocking the listener thread because of a
+    // synchronous send.  If it is, the thread is unblocked and true is
+    // returned. Otherwise the function returns false.
+    bool TryToUnblockListener(const Message* msg);
+
+    // Called on the IPC thread when a sync send that runs a nested message loop
+    // times out.
+    void OnSendTimeout(int message_id);
+
+    base::WaitableEvent* shutdown_event() { return shutdown_event_; }
+
+   private:
+    // IPC::ChannelProxy methods that we override.
+
+    // Called on the listener thread.
+   virtual void Clear();
+
+    // Called on the IPC thread.
+    virtual void OnMessageReceived(const Message& msg);
+    virtual void OnChannelError();
+    virtual void OnChannelOpened();
+    virtual void OnChannelClosed();
+
+    // Cancels all pending Send calls.
+    void CancelPendingSends();
+
+    // WaitableEventWatcher::Delegate implementation.
+    virtual void OnWaitableEventSignaled(base::WaitableEvent* arg);
+
+    // When sending a synchronous message, this structure contains an object
+    // that knows how to deserialize the response.
+    struct PendingSyncMsg {
+      PendingSyncMsg(int id, IPC::MessageReplyDeserializer* d,
+                     base::WaitableEvent* e) :
+          id(id), deserializer(d), done_event(e), send_result(false) { }
+      int id;
+      IPC::MessageReplyDeserializer* deserializer;
+      base::WaitableEvent* done_event;
+      bool send_result;
+    };
+
+    typedef std::deque<PendingSyncMsg> PendingSyncMessageQueue;
+    PendingSyncMessageQueue deserializers_;
+    Lock deserializers_lock_;
+
+    RefPtr<ReceivedSyncMsgQueue> received_sync_msgs_;
+
+    base::WaitableEvent* shutdown_event_;
+    base::WaitableEventWatcher shutdown_watcher_;
+  };
+
+ private:
+  // WaitableEventWatcher::Delegate implementation.
+  virtual void OnWaitableEventSignaled(base::WaitableEvent* arg);
+
+  SyncContext* sync_context() {
+    return reinterpret_cast<SyncContext*>(context());
+  }
+
+  // Both these functions wait for a reply, timeout or process shutdown.  The
+  // latter one also runs a nested message loop in the meantime.
+  void WaitForReply(base::WaitableEvent* pump_messages_event);
+
+  // Runs a nested message loop until a reply arrives, times out, or the process
+  // shuts down.
+  void WaitForReplyWithNestedMessageLoop();
+
+  bool sync_messages_with_no_timeout_allowed_;
+
+  // Used to signal events between the IPC and listener threads.
+  base::WaitableEventWatcher send_done_watcher_;
+  base::WaitableEventWatcher dispatch_watcher_;
+
+  DISALLOW_EVIL_CONSTRUCTORS(SyncChannel);
+};
+
+}  // namespace IPC
+
+#endif  // CHROME_COMMON_IPC_SYNC_SENDER_H__
new file mode 100644
--- /dev/null
+++ b/ipc/chromium/src/chrome/common/ipc_sync_message.cc
@@ -0,0 +1,125 @@
+// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "build/build_config.h"
+
+#if defined(OS_WIN)
+#include <windows.h>
+#endif
+#include <stack>
+
+#include "base/logging.h"
+#include "base/waitable_event.h"
+#include "chrome/common/ipc_sync_message.h"
+
+namespace IPC {
+
+uint32_t SyncMessage::next_id_ = 0;
+#define kSyncMessageHeaderSize 4
+
+SyncMessage::SyncMessage(
+    int32_t routing_id,
+    uint16_t type,
+    PriorityValue priority,
+    MessageReplyDeserializer* deserializer)
+    : Message(routing_id, type, priority),
+      deserializer_(deserializer),
+      pump_messages_event_(NULL)
+      {
+  set_sync();
+  set_unblock(true);
+
+  // Add synchronous message data before the message payload.
+  SyncHeader header;
+  header.message_id = ++next_id_;
+  WriteSyncHeader(this, header);
+}
+
+MessageReplyDeserializer* SyncMessage::GetReplyDeserializer() {
+  MessageReplyDeserializer* rv = deserializer_;
+  DCHECK(rv);
+  deserializer_ = NULL;
+  return rv;
+}
+
+void SyncMessage::EnableMessagePumping() {
+  static base::WaitableEvent* dummy_event = new base::WaitableEvent(true, true);
+  DCHECK(!pump_messages_event_);
+  set_pump_messages_event(dummy_event);
+}
+
+bool SyncMessage::IsMessageReplyTo(const Message& msg, int request_id) {
+  if (!msg.is_reply())
+    return false;
+
+  return GetMessageId(msg) == request_id;
+}
+
+void* SyncMessage::GetDataIterator(const Message* msg) {
+  void* iter = const_cast<char*>(msg->payload());
+  UpdateIter(&iter, kSyncMessageHeaderSize);
+  return iter;
+}
+
+int SyncMessage::GetMessageId(const Message& msg) {
+  if (!msg.is_sync() && !msg.is_reply())
+    return 0;
+
+  SyncHeader header;
+  if (!ReadSyncHeader(msg, &header))
+    return 0;
+
+  return header.message_id;
+}
+
+Message* SyncMessage::GenerateReply(const Message* msg) {
+  DCHECK(msg->is_sync());
+
+  Message* reply = new Message(msg->routing_id(), IPC_REPLY_ID,
+                               msg->priority());
+  reply->set_reply();
+
+  SyncHeader header;
+
+  // use the same message id, but this time reply bit is set
+  header.message_id = GetMessageId(*msg);
+  WriteSyncHeader(reply, header);
+
+  return reply;
+}
+
+bool SyncMessage::ReadSyncHeader(const Message& msg, SyncHeader* header) {
+  DCHECK(msg.is_sync() || msg.is_reply());
+
+  void* iter = NULL;
+  bool result = msg.ReadInt(&iter, &header->message_id);
+  if (!result) {
+    NOTREACHED();
+    return false;
+  }
+
+  return true;
+}
+
+bool SyncMessage::WriteSyncHeader(Message* msg, const SyncHeader& header) {
+  DCHECK(msg->is_sync() || msg->is_reply());
+  DCHECK(msg->payload_size() == 0);
+  bool result = msg->WriteInt(header.message_id);
+  if (!result) {
+    NOTREACHED();
+    return false;
+  }
+
+  // Note: if you add anything here, you need to update kSyncMessageHeaderSize.
+  DCHECK(kSyncMessageHeaderSize == msg->payload_size());
+
+  return true;
+}
+
+
+bool MessageReplyDeserializer::SerializeOutputParameters(const Message& msg) {
+  return SerializeOutputParameters(msg, SyncMessage::GetDataIterator(&msg));
+}
+
+}  // namespace IPC
new file mode 100644
--- /dev/null
+++ b/ipc/chromium/src/chrome/common/ipc_sync_message.h
@@ -0,0 +1,97 @@
+// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CHROME_COMMON_IPC_SYNC_MESSAGE_H__
+#define CHROME_COMMON_IPC_SYNC_MESSAGE_H__
+
+#if defined(OS_WIN)
+#include <windows.h>
+#endif
+#include <string>
+#include "base/basictypes.h"
+#include "chrome/common/ipc_message.h"
+
+namespace base {
+class WaitableEvent;
+}
+
+namespace IPC {
+
+class MessageReplyDeserializer;
+
+class SyncMessage : public Message {
+ public:
+  SyncMessage(int32_t routing_id, uint16_t type, PriorityValue priority,
+              MessageReplyDeserializer* deserializer);
+
+  // Call this to get a deserializer for the output parameters.
+  // Note that this can only be called once, and the caller is responsible
+  // for deleting the deserializer when they're done.
+  MessageReplyDeserializer* GetReplyDeserializer();
+
+  // If this message can cause the receiver to block while waiting for user
+  // input (i.e. by calling MessageBox), then the caller needs to pump window
+  // messages and dispatch asynchronous messages while waiting for the reply.
+  // If this event is passed in, then window messages will start being pumped
+  // when it's set.  Note that this behavior will continue even if the event is
+  // later reset.  The event must be valid until after the Send call returns.
+  void set_pump_messages_event(base::WaitableEvent* event) {
+    pump_messages_event_ = event;
+    if (event) {
+      header()->flags |= PUMPING_MSGS_BIT;
+    } else {
+      header()->flags &= ~PUMPING_MSGS_BIT;
+    }
+  }
+
+  // Call this if you always want to pump messages.  You can call this method
+  // or set_pump_messages_event but not both.
+  void EnableMessagePumping();
+
+  base::WaitableEvent* pump_messages_event() const {
+    return pump_messages_event_;
+  }
+
+  // Returns true if the message is a reply to the given request id.
+  static bool IsMessageReplyTo(const Message& msg, int request_id);
+
+  // Given a reply message, returns an iterator to the beginning of the data
+  // (i.e. skips over the synchronous specific data).
+  static void* GetDataIterator(const Message* msg);
+
+  // Given a synchronous message (or its reply), returns its id.
+  static int GetMessageId(const Message& msg);
+
+  // Generates a reply message to the given message.
+  static Message* GenerateReply(const Message* msg);
+
+ private:
+  struct SyncHeader {
+    // unique ID (unique per sender)
+    int message_id;
+  };
+
+  static bool ReadSyncHeader(const Message& msg, SyncHeader* header);
+  static bool WriteSyncHeader(Message* msg, const SyncHeader& header);
+
+  MessageReplyDeserializer* deserializer_;
+  base::WaitableEvent* pump_messages_event_;
+
+  static uint32_t next_id_;  // for generation of unique ids
+};
+
+// Used to deserialize parameters from a reply to a synchronous message
+class MessageReplyDeserializer {
+ public:
+  bool SerializeOutputParameters(const Message& msg);
+  virtual ~MessageReplyDeserializer() {}
+ private:
+  // Derived classes need to implement this, using the given iterator (which
+  // is skipped past the header for synchronous messages).
+  virtual bool SerializeOutputParameters(const Message& msg, void* iter) = 0;
+};
+
+}  // namespace IPC
+
+#endif  // CHROME_COMMON_IPC_SYNC_MESSAGE_H__
new file mode 100644
--- /dev/null
+++ b/ipc/chromium/src/chrome/common/message_router.cc
@@ -0,0 +1,42 @@
+// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "chrome/common/message_router.h"
+
+void MessageRouter::OnControlMessageReceived(const IPC::Message& msg) {
+  NOTREACHED() <<
+      "should override in subclass if you care about control messages";
+}
+
+bool MessageRouter::Send(IPC::Message* msg) {
+  NOTREACHED() <<
+      "should override in subclass if you care about sending messages";
+  return false;
+}
+
+void MessageRouter::AddRoute(int32_t routing_id,
+                             IPC::Channel::Listener* listener) {
+  routes_.AddWithID(listener, routing_id);
+}
+
+void MessageRouter::RemoveRoute(int32_t routing_id) {
+  routes_.Remove(routing_id);
+}
+
+void MessageRouter::OnMessageReceived(const IPC::Message& msg) {
+  if (msg.routing_id() == MSG_ROUTING_CONTROL) {
+    OnControlMessageReceived(msg);
+  } else {
+    RouteMessage(msg);
+  }
+}
+
+bool MessageRouter::RouteMessage(const IPC::Message& msg) {
+  IPC::Channel::Listener* listener = routes_.Lookup(msg.routing_id());
+  if (!listener)
+    return false;
+
+  listener->OnMessageReceived(msg);
+  return true;
+}
new file mode 100644
--- /dev/null
+++ b/ipc/chromium/src/chrome/common/message_router.h
@@ -0,0 +1,60 @@
+// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CHROME_COMMON_MESSAGE_ROUTER_H__
+#define CHROME_COMMON_MESSAGE_ROUTER_H__
+
+#include "base/id_map.h"
+#include "chrome/common/ipc_channel.h"
+
+// The MessageRouter handles all incoming messages sent to it by routing them
+// to the correct listener.  Routing is based on the Message's routing ID.
+// Since routing IDs are typically assigned asynchronously by the browser
+// process, the MessageRouter has the notion of pending IDs for listeners that
+// have not yet been assigned a routing ID.
+//
+// When a message arrives, the routing ID is used to index the set of routes to
+// find a listener.  If a listener is found, then the message is passed to it.
+// Otherwise, the message is ignored if its routing ID is not equal to
+// MSG_ROUTING_CONTROL.
+//
+// The MessageRouter supports the IPC::Message::Sender interface for outgoing
+// messages, but does not define a meaningful implementation of it.  The
+// subclass of MessageRouter is intended to provide that if appropriate.
+//
+// The MessageRouter can be used as a concrete class provided its Send method
+// is not called and it does not receive any control messages.
+
+class MessageRouter : public IPC::Channel::Listener,
+                      public IPC::Message::Sender {
+ public:
+  MessageRouter() {}
+  virtual ~MessageRouter() {}
+
+  // Implemented by subclasses to handle control messages
+  virtual void OnControlMessageReceived(const IPC::Message& msg);
+
+  // IPC::Channel::Listener implementation:
+  virtual void OnMessageReceived(const IPC::Message& msg);
+
+  // Like OnMessageReceived, except it only handles routed messages.  Returns
+  // true if the message was dispatched, or false if there was no listener for
+  // that route id.
+  virtual bool RouteMessage(const IPC::Message& msg);
+
+  // IPC::Message::Sender implementation:
+  virtual bool Send(IPC::Message* msg);
+
+  // Called to add/remove a listener for a particular message routing ID.
+  void AddRoute(int32_t routing_id, IPC::Channel::Listener* listener);
+  void RemoveRoute(int32_t routing_id);
+
+ private:
+  // A list of all listeners with assigned routing IDs.
+  IDMap<IPC::Channel::Listener> routes_;
+
+  DISALLOW_EVIL_CONSTRUCTORS(MessageRouter);
+};
+
+#endif  // CHROME_COMMON_MESSAGE_ROUTER_H__