Bug 1235633 - IPC OOM mitigation by eliminating buffer copying (r=jld,a=ritu)
authorBill McCloskey <wmccloskey@mozilla.com>
Wed, 20 Apr 2016 12:01:57 -0700
changeset 324021 5649dee23169c659655bfd2f9bae78b5b1bed073
parent 324020 c678a3c4169ecdb9bb761e98978307b008659246
child 324022 803140bf65ab3dd48fdd9bfe01e4142c874277c0
push id5913
push userjlund@mozilla.com
push dateMon, 25 Apr 2016 16:57:49 +0000
treeherdermozilla-beta@dcaf0a6fa115 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersjld, ritu
bugs1235633
milestone47.0a2
Bug 1235633 - IPC OOM mitigation by eliminating buffer copying (r=jld,a=ritu)
ipc/chromium/moz.build
ipc/chromium/src/base/buffer.cc
ipc/chromium/src/base/buffer.h
ipc/chromium/src/base/pickle.cc
ipc/chromium/src/base/pickle.h
ipc/chromium/src/chrome/common/child_process_host.cc
ipc/chromium/src/chrome/common/child_process_host.h
ipc/chromium/src/chrome/common/child_thread.cc
ipc/chromium/src/chrome/common/child_thread.h
ipc/chromium/src/chrome/common/ipc_channel.h
ipc/chromium/src/chrome/common/ipc_channel_posix.cc
ipc/chromium/src/chrome/common/ipc_channel_posix.h
ipc/chromium/src/chrome/common/ipc_channel_proxy.cc
ipc/chromium/src/chrome/common/ipc_channel_proxy.h
ipc/chromium/src/chrome/common/ipc_channel_win.cc
ipc/chromium/src/chrome/common/ipc_channel_win.h
ipc/chromium/src/chrome/common/ipc_message.cc
ipc/chromium/src/chrome/common/ipc_message.h
ipc/chromium/src/chrome/common/ipc_sync_channel.cc
ipc/chromium/src/chrome/common/ipc_sync_channel.h
ipc/chromium/src/chrome/common/message_router.cc
ipc/chromium/src/chrome/common/message_router.h
ipc/glue/GeckoChildProcessHost.cpp
ipc/glue/GeckoChildProcessHost.h
ipc/glue/MessageChannel.cpp
ipc/glue/MessageChannel.h
ipc/glue/MessageLink.cpp
ipc/glue/MessageLink.h
--- a/ipc/chromium/moz.build
+++ b/ipc/chromium/moz.build
@@ -5,16 +5,17 @@
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 libevent_path_prefix = 'src/third_party'
 include(libevent_path_prefix + '/libeventcommon.mozbuild')
 
 UNIFIED_SOURCES += [
     'src/base/at_exit.cc',
     'src/base/base_switches.cc',
+    'src/base/buffer.cc',
     'src/base/command_line.cc',
     'src/base/file_path.cc',
     'src/base/file_util.cc',
     'src/base/histogram.cc',
     'src/base/lock.cc',
     'src/base/logging.cc',
     'src/base/message_loop.cc',
     'src/base/message_pump_default.cc',
new file mode 100644
--- /dev/null
+++ b/ipc/chromium/src/base/buffer.cc
@@ -0,0 +1,128 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "buffer.h"
+#include "nsDebug.h"
+
+Buffer::Buffer()
+ : mBuffer(nullptr),
+   mSize(0),
+   mReserved(0)
+{
+}
+
+Buffer::~Buffer()
+{
+  if (mBuffer) {
+    free(mBuffer);
+  }
+}
+
+bool
+Buffer::empty() const
+{
+  return mSize == 0;
+}
+
+size_t
+Buffer::size() const
+{
+  return mSize;
+}
+
+const char*
+Buffer::data() const
+{
+  return mBuffer;
+}
+
+void
+Buffer::clear()
+{
+  free(mBuffer);
+  mBuffer = nullptr;
+  mSize = 0;
+  mReserved = 0;
+}
+
+void
+Buffer::try_realloc(size_t newlength)
+{
+  char* buffer = (char*)realloc(mBuffer, newlength);
+  if (buffer || !newlength) {
+    mBuffer = buffer;
+    mReserved = newlength;
+    return;
+  }
+
+  // If we're growing the buffer, crash. If we're shrinking, then we continue to
+  // use the old (larger) buffer.
+  if (newlength > mReserved) {
+    NS_ABORT_OOM(newlength);
+  }
+}
+
+void
+Buffer::append(const char* bytes, size_t length)
+{
+  if (mSize + length > mReserved) {
+    try_realloc(mSize + length);
+  }
+
+  memcpy(mBuffer + mSize, bytes, length);
+  mSize += length;
+}
+
+void
+Buffer::assign(const char* bytes, size_t length)
+{
+  if (bytes >= mBuffer && bytes < mBuffer + mReserved) {
+    MOZ_RELEASE_ASSERT(length <= mSize);
+    memmove(mBuffer, bytes, length);
+    mSize = length;
+    try_realloc(length);
+  } else {
+    try_realloc(length);
+    mSize = length;
+    memcpy(mBuffer, bytes, length);
+  }
+}
+
+void
+Buffer::erase(size_t start, size_t count)
+{
+  mSize -= count;
+  memmove(mBuffer + start, mBuffer + start + count, mSize);
+  try_realloc(mSize);
+}
+
+void
+Buffer::reserve(size_t size)
+{
+  if (mReserved < size) {
+    try_realloc(size);
+  }
+}
+
+char*
+Buffer::trade_bytes(size_t count)
+{
+  MOZ_RELEASE_ASSERT(count);
+
+  char* result = mBuffer;
+  mSize = mReserved = mSize - count;
+  mBuffer = mReserved ? (char*)malloc(mReserved) : nullptr;
+  MOZ_RELEASE_ASSERT(!mReserved || mBuffer);
+  memcpy(mBuffer, result + count, mSize);
+
+  // Try to resize the buffer down, but ignore failure. This can cause extra
+  // copies, but so be it.
+  char* resized = (char*)realloc(result, count);
+  if (resized) {
+    return resized;
+  }
+  return result;
+}
new file mode 100644
--- /dev/null
+++ b/ipc/chromium/src/base/buffer.h
@@ -0,0 +1,44 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#ifndef CHROME_BASE_BUFFER_H_
+#define CHROME_BASE_BUFFER_H_
+
+// Buffer is a simple std::string-like class for buffering up IPC messages. Its
+// main distinguishing characteristic is the trade_bytes function.
+class Buffer {
+public:
+  Buffer();
+  ~Buffer();
+
+  bool empty() const;
+  const char* data() const;
+  size_t size() const;
+
+  void clear();
+  void append(const char* bytes, size_t length);
+  void assign(const char* bytes, size_t length);
+  void erase(size_t start, size_t count);
+
+  void reserve(size_t size);
+
+  // This function should be used by a caller who wants to extract the first
+  // |count| bytes from the buffer. Rather than copying the bytes out, this
+  // function returns the entire buffer. The bytes in range [count, size()) are
+  // copied out to a new buffer which becomes the current buffer. The
+  // presumption is that |count| is very large and approximately equal to size()
+  // so not much needs to be copied.
+  char* trade_bytes(size_t count);
+
+private:
+  void try_realloc(size_t newlength);
+
+  char* mBuffer;
+  size_t mSize;
+  size_t mReserved;
+};
+
+#endif // CHROME_BASE_BUFFER_H_
--- a/ipc/chromium/src/base/pickle.cc
+++ b/ipc/chromium/src/base/pickle.cc
@@ -117,20 +117,20 @@ Pickle::Pickle(int header_size)
   DCHECK(header_size <= kPayloadUnit);
   Resize(kPayloadUnit);
   if (!header_) {
     NS_ABORT_OOM(kPayloadUnit);
   }
   header_->payload_size = 0;
 }
 
-Pickle::Pickle(const char* data, int data_len)
+Pickle::Pickle(const char* data, int data_len, Ownership ownership)
     : header_(reinterpret_cast<Header*>(const_cast<char*>(data))),
       header_size_(0),
-      capacity_(kCapacityReadOnly),
+      capacity_(ownership == BORROWS ? kCapacityReadOnly : data_len),
       variable_buffer_offset_(0) {
   if (data_len >= static_cast<int>(sizeof(Header)))
     header_size_ = data_len - header_->payload_size;
 
   if (header_size_ > static_cast<unsigned int>(data_len))
     header_size_ = 0;
 
   if (header_size_ != AlignInt(header_size_))
@@ -657,8 +657,28 @@ const char* Pickle::FindNext(uint32_t he
     return nullptr;
 
   const Header* hdr = reinterpret_cast<const Header*>(start);
   if (length < header_size || length - header_size < hdr->payload_size)
     return nullptr;
 
   return start + header_size + hdr->payload_size;
 }
+
+// static
+uint32_t Pickle::GetLength(uint32_t header_size,
+			   const char* start,
+			   const char* end) {
+  DCHECK(header_size == AlignInt(header_size));
+  DCHECK(header_size <= static_cast<memberAlignmentType>(kPayloadUnit));
+
+  if (end < start)
+    return 0;
+  size_t length = static_cast<size_t>(end - start);
+  if (length < sizeof(Header))
+    return 0;
+
+  const Header* hdr = reinterpret_cast<const Header*>(start);
+  if (length < header_size)
+    return 0;
+
+  return header_size + hdr->payload_size;
+}
--- a/ipc/chromium/src/base/pickle.h
+++ b/ipc/chromium/src/base/pickle.h
@@ -27,31 +27,38 @@
 //
 // The Pickle's data has a header which contains the size of the Pickle's
 // payload.  It can optionally support additional space in the header.  That
 // space is controlled by the header_size parameter passed to the Pickle
 // constructor.
 //
 class Pickle {
  public:
+  enum Ownership {
+    BORROWS,
+    OWNS,
+  };
+
   ~Pickle();
 
   // Initialize a Pickle object using the default header size.
   Pickle();
 
   // Initialize a Pickle object with the specified header size in bytes, which
   // must be greater-than-or-equal-to sizeof(Pickle::Header).  The header size
   // will be rounded up to ensure that the header size is 32bit-aligned.
   explicit Pickle(int header_size);
 
-  // Initializes a Pickle from a const block of data.  The data is not copied;
-  // instead the data is merely referenced by this Pickle.  Only const methods
-  // should be used on the Pickle when initialized this way.  The header
-  // padding size is deduced from the data length.
-  Pickle(const char* data, int data_len);
+  // Initializes a Pickle from a const block of data. If ownership == BORROWS,
+  // the data is not copied; instead the data is merely referenced by this
+  // Pickle. Only const methods should be used on the Pickle when initialized
+  // this way. The header padding size is deduced from the data length.  If
+  // ownership == OWNS, then again no copying takes place. However, the buffer
+  // is writable and will be freed when this Pickle is destroyed.
+  Pickle(const char* data, int data_len, Ownership ownership = BORROWS);
 
   // Initializes a Pickle as a deep copy of another Pickle.
   Pickle(const Pickle& other);
 
   Pickle(Pickle&& other);
 
   // Performs a deep copy.
   Pickle& operator=(const Pickle& other);
@@ -278,16 +285,22 @@ class Pickle {
   }
 
   // Find the end of the pickled data that starts at range_start.  Returns NULL
   // if the entire Pickle is not found in the given data range.
   static const char* FindNext(uint32_t header_size,
                               const char* range_start,
                               const char* range_end);
 
+  // If the given range contains at least header_size bytes, return the length
+  // of the pickled data including the header.
+  static uint32_t GetLength(uint32_t header_size,
+                            const char* range_start,
+                            const char* range_end);
+
   // The allocation granularity of the payload.
   static const int kPayloadUnit;
 
  private:
   Header* header_;
   uint32_t header_size_;
   uint32_t capacity_;
   uint32_t variable_buffer_offset_;
--- a/ipc/chromium/src/chrome/common/child_process_host.cc
+++ b/ipc/chromium/src/chrome/common/child_process_host.cc
@@ -149,23 +149,23 @@ void ChildProcessHost::OnWaitableEventSi
 #endif
 }
 
 ChildProcessHost::ListenerHook::ListenerHook(ChildProcessHost* host)
     : host_(host) {
 }
 
 void ChildProcessHost::ListenerHook::OnMessageReceived(
-    const IPC::Message& msg) {
+    IPC::Message&& msg) {
 
   bool msg_is_ok = true;
   bool handled = false;
 
   if (!handled) {
-      host_->OnMessageReceived(msg);
+      host_->OnMessageReceived(mozilla::Move(msg));
   }
 
   if (!msg_is_ok)
     base::KillProcess(host_->handle(), ResultCodes::KILLED_BAD_MESSAGE, false);
 
 }
 
 void ChildProcessHost::ListenerHook::OnChannelConnected(int32_t peer_pid) {
--- a/ipc/chromium/src/chrome/common/child_process_host.h
+++ b/ipc/chromium/src/chrome/common/child_process_host.h
@@ -70,17 +70,17 @@ class ChildProcessHost :
   // Once the subclass gets a handle to the process, it needs to tell
   // ChildProcessHost using this function.
   void SetHandle(base::ProcessHandle handle);
 
   // Notifies us that an instance has been created on this child process.
   void InstanceCreated();
 
   // IPC::Channel::Listener implementation:
-  virtual void OnMessageReceived(const IPC::Message& msg) { }
+  virtual void OnMessageReceived(IPC::Message&& msg) { }
   virtual void OnChannelConnected(int32_t peer_pid) { }
   virtual void OnChannelError() { }
 
   bool opening_channel() { return opening_channel_; }
   const std::wstring& channel_id() { return channel_id_; }
 
   base::WaitableEvent* GetProcessEvent() { return process_event_.get(); }
 
@@ -97,17 +97,17 @@ class ChildProcessHost :
 
  private:
   // By using an internal class as the IPC::Channel::Listener, we can intercept
   // OnMessageReceived/OnChannelConnected and do our own processing before
   // calling the subclass' implementation.
   class ListenerHook : public IPC::Channel::Listener {
    public:
     explicit ListenerHook(ChildProcessHost* host);
-    virtual void OnMessageReceived(const IPC::Message& msg);
+    virtual void OnMessageReceived(IPC::Message&& msg);
     virtual void OnChannelConnected(int32_t peer_pid);
     virtual void OnChannelError();
     virtual void GetQueuedMessages(std::queue<IPC::Message>& queue);
    private:
     ChildProcessHost* host_;
   };
 
   ListenerHook listener_;
--- a/ipc/chromium/src/chrome/common/child_thread.cc
+++ b/ipc/chromium/src/chrome/common/child_thread.cc
@@ -69,21 +69,21 @@ void ChildThread::AddRoute(int32_t routi
 }
 
 void ChildThread::RemoveRoute(int32_t routing_id) {
   DCHECK(MessageLoop::current() == message_loop());
 
   router_.RemoveRoute(routing_id);
 }
 
-void ChildThread::OnMessageReceived(const IPC::Message& msg) {
+void ChildThread::OnMessageReceived(IPC::Message&& msg) {
   if (msg.routing_id() == MSG_ROUTING_CONTROL) {
     OnControlMessageReceived(msg);
   } else {
-    router_.OnMessageReceived(msg);
+    router_.OnMessageReceived(mozilla::Move(msg));
   }
 }
 
 ChildThread* ChildThread::current() {
   return ChildProcess::current()->child_thread();
 }
 
 void ChildThread::Init() {
--- a/ipc/chromium/src/chrome/common/child_thread.h
+++ b/ipc/chromium/src/chrome/common/child_thread.h
@@ -54,17 +54,17 @@ class ChildThread : public IPC::Channel:
   IPC::Channel* channel() { return channel_.get(); }
 
   // Thread implementation.
   virtual void Init();
   virtual void CleanUp();
 
  private:
   // IPC::Channel::Listener implementation:
-  virtual void OnMessageReceived(const IPC::Message& msg);
+  virtual void OnMessageReceived(IPC::Message&& msg);
   virtual void OnChannelError();
 
 #ifdef MOZ_NUWA_PROCESS
   static void MarkThread();
 #endif
 
   // The message loop used to run tasks on the thread that started this thread.
   MessageLoop* owner_loop_;
--- a/ipc/chromium/src/chrome/common/ipc_channel.h
+++ b/ipc/chromium/src/chrome/common/ipc_channel.h
@@ -20,17 +20,17 @@ class Channel : public Message::Sender {
 
  public:
   // Implemented by consumers of a Channel to receive messages.
   class Listener {
    public:
     virtual ~Listener() {}
 
     // Called when a message is received.
-    virtual void OnMessageReceived(const Message& message) = 0;
+    virtual void OnMessageReceived(Message&& message) = 0;
 
     // Called when the channel is connected and we have received the internal
     // Hello message from the peer.
     virtual void OnChannelConnected(int32_t peer_pid) {}
 
     // Called when an error is detected that causes the channel to close.
     // This method is not called when a channel is closed normally.
     virtual void OnChannelError() {}
@@ -46,17 +46,20 @@ class Channel : public Message::Sender {
   };
 
   enum {
     // The maximum message size in bytes. Attempting to receive a
     // message of this size or bigger results in a channel error.
     kMaximumMessageSize = 256 * 1024 * 1024,
 
     // Ammount of data to read at once from the pipe.
-    kReadBufferSize = 4 * 1024
+    kReadBufferSize = 4 * 1024,
+
+    // Maximum size of a message that we allow to be copied (rather than moved).
+    kMaxCopySize = 32 * 1024,
   };
 
   // Initialize a Channel.
   //
   // |channel_id| identifies the communication Channel.
   // |mode| specifies whether this Channel is to operate in server mode or
   // client mode.  In server mode, the Channel is responsible for setting up the
   // IPC object, whereas in client mode, the Channel merely connects to the
--- a/ipc/chromium/src/chrome/common/ipc_channel_posix.cc
+++ b/ipc/chromium/src/chrome/common/ipc_channel_posix.cc
@@ -382,28 +382,17 @@ bool Channel::ChannelImpl::EnqueueHelloM
   }
 
   OutputQueuePush(msg.release());
   return true;
 }
 
 void Channel::ChannelImpl::ClearAndShrinkInputOverflowBuf()
 {
-  // If input_overflow_buf_ has grown, shrink it back to its normal size.
-  static size_t previousCapacityAfterClearing = 0;
-  if (input_overflow_buf_.capacity() > previousCapacityAfterClearing) {
-    // This swap trick is the closest thing C++ has to a guaranteed way
-    // to shrink the capacity of a string.
-    std::string tmp;
-    tmp.reserve(Channel::kReadBufferSize);
-    input_overflow_buf_.swap(tmp);
-    previousCapacityAfterClearing = input_overflow_buf_.capacity();
-  } else {
-    input_overflow_buf_.clear();
-  }
+  input_overflow_buf_.clear();
 }
 
 bool Channel::ChannelImpl::Connect() {
   if (mode_ == MODE_SERVER && uses_fifo_) {
     if (server_listen_pipe_ == -1) {
       return false;
     }
     MessageLoopForIO::current()->WatchFileDescriptor(
@@ -526,19 +515,33 @@ bool Channel::ChannelImpl::ProcessIncomi
       end = p + bytes_read;
     } else {
       if (input_overflow_buf_.size() >
          static_cast<size_t>(kMaximumMessageSize - bytes_read)) {
         ClearAndShrinkInputOverflowBuf();
         CHROMIUM_LOG(ERROR) << "IPC message is too big";
         return false;
       }
+
       input_overflow_buf_.append(input_buf_, bytes_read);
       overflowp = p = input_overflow_buf_.data();
       end = p + input_overflow_buf_.size();
+
+      // If we've received the entire header, then we know the message
+      // length. In that case, reserve enough space to hold the entire
+      // message. This is more efficient than repeatedly enlarging the buffer as
+      // more data comes in.
+      uint32_t length = Message::GetLength(p, end);
+      if (length) {
+        input_overflow_buf_.reserve(length + kReadBufferSize);
+
+        // Recompute these pointers in case the buffer moved.
+        overflowp = p = input_overflow_buf_.data();
+        end = p + input_overflow_buf_.size();
+      }
     }
 
     // A pointer to an array of |num_fds| file descriptors which includes any
     // fds that have spilled over from a previous read.
     const int* fds;
     unsigned num_fds;
     unsigned fds_i = 0;  // the index of the first unused descriptor
 
@@ -553,17 +556,41 @@ bool Channel::ChannelImpl::ProcessIncomi
       fds = &input_overflow_fds_[0];
       num_fds = input_overflow_fds_.size();
     }
 
     while (p < end) {
       const char* message_tail = Message::FindNext(p, end);
       if (message_tail) {
         int len = static_cast<int>(message_tail - p);
-        Message m(p, len);
+        char* buf;
+
+        // The Message |m| allocated below needs to own its data. We can either
+        // copy the data out of the buffer or else steal the buffer and move the
+        // remaining data elsewhere. If len is large enough, we steal. Otherwise
+        // we copy.
+        if (len > kMaxCopySize) {
+          // Since len > kMaxCopySize > kReadBufferSize, we know that we must be
+          // using the overflow buffer. And since we always shift everything to
+          // the left at the end of a read, we must be at the start of the
+          // overflow buffer.
+          MOZ_RELEASE_ASSERT(p == overflowp);
+          buf = input_overflow_buf_.trade_bytes(len);
+
+          // At this point the remaining data is at the from of
+          // input_overflow_buf_. p will get fixed up at the end of the
+          // loop. Set it to null here to make sure no one uses it.
+          p = nullptr;
+          overflowp = message_tail = input_overflow_buf_.data();
+          end = overflowp + input_overflow_buf_.size();
+        } else {
+          buf = (char*)malloc(len);
+          memcpy(buf, p, len);
+        }
+        Message m(buf, len, Message::OWNS);
         if (m.header()->num_fds) {
           // the message has file descriptors
           const char* error = NULL;
           if (m.header()->num_fds > num_fds - fds_i) {
             // the message has been completely received, but we didn't get
             // enough file descriptors.
             error = "Message needs unreceived descriptors";
           }
@@ -622,17 +649,17 @@ bool Channel::ChannelImpl::ProcessIncomi
           listener_->OnChannelConnected(MessageIterator(m).NextInt());
 #if defined(OS_MACOSX)
         } else if (m.routing_id() == MSG_ROUTING_NONE &&
                    m.type() == RECEIVED_FDS_MESSAGE_TYPE) {
           DCHECK(m.fd_cookie() != 0);
           CloseDescriptors(m.fd_cookie());
 #endif
         } else {
-          listener_->OnMessageReceived(m);
+          listener_->OnMessageReceived(mozilla::Move(m));
         }
         p = message_tail;
       } else {
         // Last message is partial.
         break;
       }
     }
     if (end == p) {
--- a/ipc/chromium/src/chrome/common/ipc_channel_posix.h
+++ b/ipc/chromium/src/chrome/common/ipc_channel_posix.h
@@ -9,16 +9,17 @@
 
 #include <sys/socket.h>  // for CMSG macros
 
 #include <queue>
 #include <string>
 #include <vector>
 #include <list>
 
+#include "base/buffer.h"
 #include "base/message_loop.h"
 #include "chrome/common/file_descriptor_set_posix.h"
 
 #include "nsAutoPtr.h"
 
 namespace IPC {
 
 // An implementation of ChannelImpl for POSIX systems that works via
@@ -126,17 +127,17 @@ class Channel::ChannelImpl : public Mess
   // kReadBufferSize / sizeof(int) file descriptors. Since a file descriptor
   // takes sizeof(int) bytes, the control buffer must be
   // Channel::kReadBufferSize bytes. We add kControlBufferSlopBytes bytes
   // for the control header.
   char input_cmsg_buf_[Channel::kReadBufferSize + kControlBufferSlopBytes];
 
   // Large messages that span multiple pipe buffers, get built-up using
   // this buffer.
-  std::string input_overflow_buf_;
+  Buffer input_overflow_buf_;
   std::vector<int> input_overflow_fds_;
 
   // In server-mode, we have to wait for the client to connect before we
   // can begin reading.  We make use of the input_state_ when performing
   // the connect operation in overlapped mode.
   bool waiting_connect_;
 
   // This flag is set when processing incoming messages.  It is used to
--- a/ipc/chromium/src/chrome/common/ipc_channel_proxy.cc
+++ b/ipc/chromium/src/chrome/common/ipc_channel_proxy.cc
@@ -29,25 +29,25 @@ void ChannelProxy::Context::CreateChanne
   DCHECK(channel_ == NULL);
   channel_id_ = id;
   channel_ = new Channel(id, mode, this);
 }
 
 bool ChannelProxy::Context::TryFilters(const Message& message) {
 
   for (size_t i = 0; i < filters_.size(); ++i) {
-    if (filters_[i]->OnMessageReceived(message)) {
+      if (filters_[i]->OnMessageReceived(Message(message))) {
       return true;
     }
   }
   return false;
 }
 
 // Called on the IPC::Channel thread
-void ChannelProxy::Context::OnMessageReceived(const Message& message) {
+void ChannelProxy::Context::OnMessageReceived(Message&& message) {
   // First give a chance to the filters to process this message.
   if (!TryFilters(message))
     OnMessageReceivedNoFilter(message);
 }
 
 // Called on the IPC::Channel thread
 void ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
   // NOTE: This code relies on the listener's message loop not going away while
@@ -154,17 +154,17 @@ void ChannelProxy::Context::OnRemoveFilt
 // Called on the listener's thread
 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
   if (!listener_)
     return;
 
   OnDispatchConnected();
 
 
-  listener_->OnMessageReceived(message);
+  listener_->OnMessageReceived(Message(message));
 
 }
 
 // Called on the listener's thread
 void ChannelProxy::Context::OnDispatchConnected() {
   if (channel_connected_called_)
     return;
 
--- a/ipc/chromium/src/chrome/common/ipc_channel_proxy.h
+++ b/ipc/chromium/src/chrome/common/ipc_channel_proxy.h
@@ -70,17 +70,17 @@ class ChannelProxy : public Message::Sen
     virtual void OnChannelError() {}
 
     // Called to inform the filter that the IPC channel will be destroyed.
     // OnFilterRemoved is called immediately after this.
     virtual void OnChannelClosing() {}
 
     // Return true to indicate that the message was handled, or false to let
     // the message be handled in the default way.
-    virtual bool OnMessageReceived(const Message& message) {
+    virtual bool OnMessageReceived(Message&& message) {
       return false;
     }
    protected:
     virtual ~MessageFilter() {}
   };
 
   // Initializes a channel proxy.  The channel_id and mode parameters are
   // passed directly to the underlying IPC::Channel.  The listener is called on
@@ -147,17 +147,17 @@ class ChannelProxy : public Message::Sen
 
     // Dispatches a message on the listener thread.
     void OnDispatchMessage(const Message& message);
 
    protected:
     virtual ~Context() {}
 
     // IPC::Channel::Listener methods:
-    virtual void OnMessageReceived(const Message& message);
+    virtual void OnMessageReceived(Message&& message);
     virtual void OnChannelConnected(int32_t peer_pid);
     virtual void OnChannelError();
 
     // Like OnMessageReceived but doesn't try the filters.
     void OnMessageReceivedNoFilter(const Message& message);
 
     // Gives the filters a chance at processing |message|.
     // Returns true if the message was processed, false otherwise.
--- a/ipc/chromium/src/chrome/common/ipc_channel_win.cc
+++ b/ipc/chromium/src/chrome/common/ipc_channel_win.cc
@@ -345,26 +345,63 @@ bool Channel::ChannelImpl::ProcessIncomi
       p = input_buf_;
       end = p + bytes_read;
     } else {
       if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) {
         input_overflow_buf_.clear();
         CHROMIUM_LOG(ERROR) << "IPC message is too big";
         return false;
       }
+
       input_overflow_buf_.append(input_buf_, bytes_read);
       p = input_overflow_buf_.data();
       end = p + input_overflow_buf_.size();
+
+      // If we've received the entire header, then we know the message
+      // length. In that case, reserve enough space to hold the entire
+      // message. This is more efficient than repeatedly enlarging the buffer as
+      // more data comes in.
+      uint32_t length = Message::GetLength(p, end);
+      if (length) {
+        input_overflow_buf_.reserve(length + kReadBufferSize);
+
+        // Recompute these pointers in case the buffer moved.
+        p = input_overflow_buf_.data();
+        end = p + input_overflow_buf_.size();
+      }
     }
 
     while (p < end) {
       const char* message_tail = Message::FindNext(p, end);
       if (message_tail) {
         int len = static_cast<int>(message_tail - p);
-        const Message m(p, len);
+        char* buf;
+
+        // The Message |m| allocated below needs to own its data. We can either
+        // copy the data out of the buffer or else steal the buffer and move the
+        // remaining data elsewhere. If len is large enough, we steal. Otherwise
+        // we copy.
+        if (len > kMaxCopySize) {
+          // Since len > kMaxCopySize > kReadBufferSize, we know that we must be
+          // using the overflow buffer. And since we always shift everything to
+          // the left at the end of a read, we must be at the start of the
+          // overflow buffer.
+          buf = input_overflow_buf_.trade_bytes(len);
+
+          // At this point the remaining data is at the from of
+          // input_overflow_buf_. p will get fixed up at the end of the
+          // loop. Set it to null here to make sure no one uses it.
+          p = nullptr;
+          message_tail = input_overflow_buf_.data();
+          end = message_tail + input_overflow_buf_.size();
+        } else {
+          buf = (char*)malloc(len);
+          memcpy(buf, p, len);
+        }
+        Message m(buf, len, Message::OWNS);
 #ifdef IPC_MESSAGE_DEBUG_EXTRA
         DLOG(INFO) << "received message on channel @" << this <<
                       " with type " << m.type();
 #endif
         if (m.routing_id() == MSG_ROUTING_NONE &&
             m.type() == HELLO_MESSAGE_TYPE) {
           // The Hello message contains the process id and must include the
           // shared secret, if we are waiting for it.
@@ -375,25 +412,29 @@ bool Channel::ChannelImpl::ProcessIncomi
             // Something went wrong. Abort connection.
             Close();
             listener_->OnChannelError();
             return false;
           }
           waiting_for_shared_secret_ = false;
           listener_->OnChannelConnected(claimed_pid);
         } else {
-          listener_->OnMessageReceived(m);
+          listener_->OnMessageReceived(mozilla::Move(m));
         }
         p = message_tail;
       } else {
         // Last message is partial.
         break;
       }
     }
-    input_overflow_buf_.assign(p, end - p);
+    if (p != input_overflow_buf_.data()) {
+      // Don't assign unless we have to since this will throw away any memory we
+      // might have reserved.
+      input_overflow_buf_.assign(p, end - p);
+    }
 
     bytes_read = 0;  // Get more data.
   }
 
   return true;
 }
 
 bool Channel::ChannelImpl::ProcessOutgoingMessages(
--- a/ipc/chromium/src/chrome/common/ipc_channel_win.h
+++ b/ipc/chromium/src/chrome/common/ipc_channel_win.h
@@ -5,16 +5,17 @@
 #ifndef CHROME_COMMON_IPC_CHANNEL_WIN_H_
 #define CHROME_COMMON_IPC_CHANNEL_WIN_H_
 
 #include "chrome/common/ipc_channel.h"
 
 #include <queue>
 #include <string>
 
+#include "base/buffer.h"
 #include "base/message_loop.h"
 #include "mozilla/UniquePtr.h"
 
 class NonThreadSafe;
 
 namespace IPC {
 
 class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
@@ -81,17 +82,17 @@ class Channel::ChannelImpl : public Mess
   // Messages to be sent are queued here.
   std::queue<Message*> output_queue_;
 
   // We read from the pipe into this buffer
   char input_buf_[Channel::kReadBufferSize];
 
   // Large messages that span multiple pipe buffers, get built-up using
   // this buffer.
-  std::string input_overflow_buf_;
+  Buffer input_overflow_buf_;
 
   // In server-mode, we have to wait for the client to connect before we
   // can begin reading.  We make use of the input_state_ when performing
   // the connect operation in overlapped mode.
   bool waiting_connect_;
 
   // This flag is set when processing incoming messages.  It is used to
   // avoid recursing through ProcessIncomingMessages, which could cause
--- a/ipc/chromium/src/chrome/common/ipc_message.cc
+++ b/ipc/chromium/src/chrome/common/ipc_message.cc
@@ -66,17 +66,19 @@ Message::Message(int32_t routing_id, msg
 #ifdef MOZ_TASK_TRACER
   header()->source_event_id = 0;
   header()->parent_task_id = 0;
   header()->source_event_type = SourceEventType::Unknown;
 #endif
   InitLoggingVariables(aName);
 }
 
-Message::Message(const char* data, int data_len) : Pickle(data, data_len) {
+Message::Message(const char* data, int data_len, Ownership ownership)
+  : Pickle(data, data_len, ownership)
+{
   MOZ_COUNT_CTOR(IPC::Message);
   InitLoggingVariables();
 }
 
 Message::Message(const Message& other) : Pickle(other) {
   MOZ_COUNT_CTOR(IPC::Message);
   InitLoggingVariables(other.name_);
 #if defined(OS_POSIX)
--- a/ipc/chromium/src/chrome/common/ipc_message.h
+++ b/ipc/chromium/src/chrome/common/ipc_message.h
@@ -65,20 +65,22 @@ class Message : public Pickle {
   Message();
 
   // Initialize a message with a user-defined type, priority value, and
   // destination WebView ID.
   Message(int32_t routing_id, msgid_t type, PriorityValue priority,
           MessageCompression compression = COMPRESSION_NONE,
           const char* const name="???");
 
-  // Initializes a message from a const block of data.  The data is not copied;
-  // instead the data is merely referenced by this message.  Only const methods
-  // should be used on the message when initialized this way.
-  Message(const char* data, int data_len);
+  // Initializes a message from a const block of data. If ownership == BORROWS,
+  // the data is not copied; instead the data is merely referenced by this
+  // message. Only const methods should be used on the message when initialized
+  // this way. If ownership == OWNS, then again no copying takes place. However,
+  // the buffer is writable and will be freed when the message is destroyed.
+  Message(const char* data, int data_len, Ownership ownership = BORROWS);
 
   Message(const Message& other);
   Message(Message&& other);
   Message& operator=(const Message& other);
   Message& operator=(Message&& other);
 
   PriorityValue priority() const {
     return static_cast<PriorityValue>(header()->flags & PRIORITY_MASK);
@@ -237,16 +239,22 @@ class Message : public Pickle {
   }
 
   // Find the end of the message data that starts at range_start.  Returns NULL
   // if the entire message is not found in the given data range.
   static const char* FindNext(const char* range_start, const char* range_end) {
     return Pickle::FindNext(sizeof(Header), range_start, range_end);
   }
 
+  // If the given range contains at least header_size bytes, return the length
+  // of the message including the header.
+  static uint32_t GetLength(const char* range_start, const char* range_end) {
+    return Pickle::GetLength(sizeof(Header), range_start, range_end);
+  }
+
 #if defined(OS_POSIX)
   // On POSIX, a message supports reading / writing FileDescriptor objects.
   // This is used to pass a file descriptor to the peer of an IPC channel.
 
   // Add a descriptor to the end of the set. Returns false iff the set is full.
   bool WriteFileDescriptor(const base::FileDescriptor& descriptor);
   // Get a file descriptor from the message. Returns false on error.
   //   iter: a Pickle iterator to the current location in the message.
--- a/ipc/chromium/src/chrome/common/ipc_sync_channel.cc
+++ b/ipc/chromium/src/chrome/common/ipc_sync_channel.cc
@@ -266,17 +266,17 @@ bool SyncChannel::SyncContext::TryToUnbl
 
 void SyncChannel::SyncContext::Clear() {
   CancelPendingSends();
   received_sync_msgs_->RemoveContext(this);
 
   Context::Clear();
 }
 
-void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
+void SyncChannel::SyncContext::OnMessageReceived(Message&& msg) {
   // Give the filters a chance at processing this message.
   if (TryFilters(msg))
     return;
 
   if (TryToUnblockListener(&msg))
     return;
 
   if (msg.should_unblock()) {
--- a/ipc/chromium/src/chrome/common/ipc_sync_channel.h
+++ b/ipc/chromium/src/chrome/common/ipc_sync_channel.h
@@ -92,17 +92,17 @@ class SyncChannel : public ChannelProxy,
 
    private:
     // IPC::ChannelProxy methods that we override.
 
     // Called on the listener thread.
    virtual void Clear();
 
     // Called on the IPC thread.
-    virtual void OnMessageReceived(const Message& msg);
+    virtual void OnMessageReceived(Message&& msg);
     virtual void OnChannelError();
     virtual void OnChannelOpened();
     virtual void OnChannelClosed();
 
     // Cancels all pending Send calls.
     void CancelPendingSends();
 
     // WaitableEventWatcher::Delegate implementation.
--- a/ipc/chromium/src/chrome/common/message_router.cc
+++ b/ipc/chromium/src/chrome/common/message_router.cc
@@ -19,24 +19,24 @@ void MessageRouter::AddRoute(int32_t rou
                              IPC::Channel::Listener* listener) {
   routes_.AddWithID(listener, routing_id);
 }
 
 void MessageRouter::RemoveRoute(int32_t routing_id) {
   routes_.Remove(routing_id);
 }
 
-void MessageRouter::OnMessageReceived(const IPC::Message& msg) {
+void MessageRouter::OnMessageReceived(IPC::Message&& msg) {
   if (msg.routing_id() == MSG_ROUTING_CONTROL) {
     OnControlMessageReceived(msg);
   } else {
     RouteMessage(msg);
   }
 }
 
 bool MessageRouter::RouteMessage(const IPC::Message& msg) {
   IPC::Channel::Listener* listener = routes_.Lookup(msg.routing_id());
   if (!listener)
     return false;
 
-  listener->OnMessageReceived(msg);
+  listener->OnMessageReceived(IPC::Message(msg));
   return true;
 }
--- a/ipc/chromium/src/chrome/common/message_router.h
+++ b/ipc/chromium/src/chrome/common/message_router.h
@@ -31,17 +31,17 @@ class MessageRouter : public IPC::Channe
  public:
   MessageRouter() {}
   virtual ~MessageRouter() {}
 
   // Implemented by subclasses to handle control messages
   virtual void OnControlMessageReceived(const IPC::Message& msg);
 
   // IPC::Channel::Listener implementation:
-  virtual void OnMessageReceived(const IPC::Message& msg);
+  virtual void OnMessageReceived(IPC::Message&& msg);
 
   // Like OnMessageReceived, except it only handles routed messages.  Returns
   // true if the message was dispatched, or false if there was no listener for
   // that route id.
   virtual bool RouteMessage(const IPC::Message& msg);
 
   // IPC::Message::Sender implementation:
   virtual bool Send(IPC::Message* msg);
--- a/ipc/glue/GeckoChildProcessHost.cpp
+++ b/ipc/glue/GeckoChildProcessHost.cpp
@@ -1099,21 +1099,21 @@ GeckoChildProcessHost::OnChannelConnecte
   {
     MonitorAutoLock lock(mMonitor);
     mProcessState = PROCESS_CONNECTED;
     lock.Notify();
   }
 }
 
 void
-GeckoChildProcessHost::OnMessageReceived(const IPC::Message& aMsg)
+GeckoChildProcessHost::OnMessageReceived(IPC::Message&& aMsg)
 {
   // We never process messages ourself, just save them up for the next
   // listener.
-  mQueue.push(aMsg);
+  mQueue.push(Move(aMsg));
 }
 
 void
 GeckoChildProcessHost::OnChannelError()
 {
   // Update the process state to an error state if we have a channel
   // error before we're connected. This fixes certain failures,
   // but does not address the full range of possible issues described
--- a/ipc/glue/GeckoChildProcessHost.h
+++ b/ipc/glue/GeckoChildProcessHost.h
@@ -77,17 +77,17 @@ public:
   bool SyncLaunch(StringVector aExtraOpts=StringVector(),
                   int32_t timeoutMs=0,
                   base::ProcessArchitecture arch=base::GetCurrentProcessArchitecture());
 
   virtual bool PerformAsyncLaunch(StringVector aExtraOpts=StringVector(),
                                   base::ProcessArchitecture aArch=base::GetCurrentProcessArchitecture());
 
   virtual void OnChannelConnected(int32_t peer_pid);
-  virtual void OnMessageReceived(const IPC::Message& aMsg);
+  virtual void OnMessageReceived(IPC::Message&& aMsg);
   virtual void OnChannelError();
   virtual void GetQueuedMessages(std::queue<IPC::Message>& queue);
 
   virtual void InitializeChannel();
 
   virtual bool CanShutdown() { return true; }
 
   virtual void OnWaitableEventSignaled(base::WaitableEvent *event);
--- a/ipc/glue/MessageChannel.cpp
+++ b/ipc/glue/MessageChannel.cpp
@@ -865,17 +865,17 @@ public:
     MatchingKinds(Message::msgid_t aType, int32_t aRoutingId) :
         mType(aType), mRoutingId(aRoutingId) {}
     bool operator()(const Message &msg) {
         return msg.type() == mType && msg.routing_id() == mRoutingId;
     }
 };
 
 void
-MessageChannel::OnMessageReceivedFromLink(const Message& aMsg)
+MessageChannel::OnMessageReceivedFromLink(Message&& aMsg)
 {
     AssertLinkThread();
     mMonitor->AssertCurrentThreadOwns();
 
     if (MaybeInterceptSpecialIOMessage(aMsg))
         return;
 
     // Regardless of the Interrupt stack, if we're awaiting a sync reply,
@@ -962,17 +962,17 @@ MessageChannel::OnMessageReceivedFromLin
     //
     // (3) We are not waiting on a reply.
     //   - We post a task to the main event loop.
     //
     // Note that, we may notify the main thread even though the monitor is not
     // blocked. This is okay, since we always check for pending events before
     // blocking again.
 
-    mPending.push_back(aMsg);
+    mPending.push_back(Move(aMsg));
 
     if (shouldWakeUp) {
         NotifyWorkerThread();
     }
 
     if (shouldPostTask) {
         if (!compress) {
             // If we compressed away the previous message, we'll re-use
--- a/ipc/glue/MessageChannel.h
+++ b/ipc/glue/MessageChannel.h
@@ -420,17 +420,17 @@ class MessageChannel : HasResultCodes
 
     void OnChannelConnected(int32_t peer_id);
 
     // Tell the IO thread to close the channel and wait for it to ACK.
     void SynchronouslyClose();
 
     bool WasTransactionCanceled(int transaction);
     bool ShouldDeferMessage(const Message& aMsg);
-    void OnMessageReceivedFromLink(const Message& aMsg);
+    void OnMessageReceivedFromLink(Message&& aMsg);
     void OnChannelErrorFromLink();
 
   private:
     // Run on the not current thread.
     void NotifyChannelClosed();
     void NotifyMaybeChannelError();
 
   private:
--- a/ipc/glue/MessageLink.cpp
+++ b/ipc/glue/MessageLink.cpp
@@ -266,28 +266,28 @@ ThreadLink::~ThreadLink()
 }
 
 void
 ThreadLink::EchoMessage(Message *msg)
 {
     mChan->AssertWorkerThread();
     mChan->mMonitor->AssertCurrentThreadOwns();
 
-    mChan->OnMessageReceivedFromLink(*msg);
+    mChan->OnMessageReceivedFromLink(Move(*msg));
     delete msg;
 }
 
 void
 ThreadLink::SendMessage(Message *msg)
 {
     mChan->AssertWorkerThread();
     mChan->mMonitor->AssertCurrentThreadOwns();
 
     if (mTargetChan)
-        mTargetChan->OnMessageReceivedFromLink(*msg);
+        mTargetChan->OnMessageReceivedFromLink(Move(*msg));
     delete msg;
 }
 
 void
 ThreadLink::SendClose()
 {
     mChan->AssertWorkerThread();
     mChan->mMonitor->AssertCurrentThreadOwns();
@@ -317,29 +317,29 @@ ThreadLink::Unsound_NumQueuedMessages() 
     return 0;
 }
 
 //
 // The methods below run in the context of the IO thread
 //
 
 void
-ProcessLink::OnMessageReceived(const Message& msg)
+ProcessLink::OnMessageReceived(Message&& msg)
 {
     AssertIOThread();
     NS_ASSERTION(mChan->mChannelState != ChannelError, "Shouldn't get here!");
     MonitorAutoLock lock(*mChan->mMonitor);
-    mChan->OnMessageReceivedFromLink(msg);
+    mChan->OnMessageReceivedFromLink(Move(msg));
 }
 
 void
 ProcessLink::OnEchoMessage(Message* msg)
 {
     AssertIOThread();
-    OnMessageReceived(*msg);
+    OnMessageReceived(Move(*msg));
     delete msg;
 }
 
 void
 ProcessLink::OnChannelOpened()
 {
     AssertIOThread();
 
@@ -376,17 +376,17 @@ ProcessLink::OnTakeConnectedChannel()
         if (mExistingListener) {
             mExistingListener->GetQueuedMessages(pending);
         }
         lock.Notify();
     }
 
     // Dispatch whatever messages the previous listener had queued up.
     while (!pending.empty()) {
-        OnMessageReceived(pending.front());
+        OnMessageReceived(Move(pending.front()));
         pending.pop();
     }
 }
 
 void
 ProcessLink::OnChannelConnected(int32_t peer_pid)
 {
     AssertIOThread();
--- a/ipc/glue/MessageLink.h
+++ b/ipc/glue/MessageLink.h
@@ -199,17 +199,17 @@ class ProcessLink
     // or a pipe error) the chain will be destroyed and the original listener
     // will again be registered.
     void Open(Transport* aTransport, MessageLoop *aIOLoop, Side aSide);
     
     // Run on the I/O thread, only when using inter-process link.
     // These methods acquire the monitor and forward to the
     // similarly named methods in AsyncChannel below
     // (OnMessageReceivedFromLink(), etc)
-    virtual void OnMessageReceived(const Message& msg) override;
+    virtual void OnMessageReceived(Message&& msg) override;
     virtual void OnChannelConnected(int32_t peer_pid) override;
     virtual void OnChannelError() override;
 
     virtual void EchoMessage(Message *msg) override;
     virtual void SendMessage(Message *msg) override;
     virtual void SendClose() override;
 
     virtual bool Unsound_IsClosed() const override;