Bug 1244227 - Add nsIThrottledInputChannel.idl and implement. r=mcmanus
☠☠ backed out by 64e2353eb554 ☠ ☠
authorTom Tromey <tom@tromey.com>
Tue, 23 Feb 2016 14:26:45 -0700
changeset 346341 5a4bb3258978f51463075e7f1c38dd254297120c
parent 346340 f83128e2d4bb8fc3f70dd99a644e94b19e20d4fc
child 346342 f680f6460f07f99ba7deaf3c1e89d3c91864617d
push id6389
push userraliiev@mozilla.com
push dateMon, 19 Sep 2016 13:38:22 +0000
treeherdermozilla-beta@01d67bfe6c81 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersmcmanus
bugs1244227
milestone50.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1244227 - Add nsIThrottledInputChannel.idl and implement. r=mcmanus MozReview-Commit-ID: JVIjxEO901W
netwerk/base/ThrottleQueue.cpp
netwerk/base/ThrottleQueue.h
netwerk/base/moz.build
netwerk/base/nsIThrottledInputChannel.idl
netwerk/build/nsNetCID.h
netwerk/build/nsNetModule.cpp
netwerk/protocol/http/HttpBaseChannel.cpp
netwerk/protocol/http/HttpBaseChannel.h
netwerk/protocol/http/nsHttpTransaction.cpp
netwerk/test/unit/test_throttlechannel.js
netwerk/test/unit/test_throttlequeue.js
netwerk/test/unit/test_throttling.js
netwerk/test/unit/xpcshell.ini
new file mode 100644
--- /dev/null
+++ b/netwerk/base/ThrottleQueue.cpp
@@ -0,0 +1,392 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "ThrottleQueue.h"
+#include "nsISeekableStream.h"
+#include "nsIAsyncInputStream.h"
+#include "nsStreamUtils.h"
+#include "nsNetUtil.h"
+
+namespace mozilla {
+namespace net {
+
+//-----------------------------------------------------------------------------
+
+class ThrottleInputStream final
+  : public nsIAsyncInputStream
+  , public nsISeekableStream
+{
+public:
+
+  ThrottleInputStream(nsIInputStream* aStream, ThrottleQueue* aQueue);
+
+  NS_DECL_THREADSAFE_ISUPPORTS
+  NS_DECL_NSIINPUTSTREAM
+  NS_DECL_NSISEEKABLESTREAM
+  NS_DECL_NSIASYNCINPUTSTREAM
+
+  void AllowInput();
+
+private:
+
+  ~ThrottleInputStream();
+
+  nsCOMPtr<nsIInputStream> mStream;
+  RefPtr<ThrottleQueue> mQueue;
+  nsresult mClosedStatus;
+
+  nsCOMPtr<nsIInputStreamCallback> mCallback;
+  nsCOMPtr<nsIEventTarget> mEventTarget;
+};
+
+NS_IMPL_ISUPPORTS(ThrottleInputStream, nsIAsyncInputStream, nsIInputStream, nsISeekableStream)
+
+ThrottleInputStream::ThrottleInputStream(nsIInputStream *aStream, ThrottleQueue* aQueue)
+  : mStream(aStream)
+  , mQueue(aQueue)
+  , mClosedStatus(NS_OK)
+{
+  MOZ_ASSERT(aQueue != nullptr);
+}
+
+ThrottleInputStream::~ThrottleInputStream()
+{
+  Close();
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::Close()
+{
+  if (NS_FAILED(mClosedStatus)) {
+    return mClosedStatus;
+  }
+
+  if (mQueue) {
+    mQueue->DequeueStream(this);
+    mQueue = nullptr;
+    mClosedStatus = NS_BASE_STREAM_CLOSED;
+  }
+  return mStream->Close();
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::Available(uint64_t* aResult)
+{
+  if (NS_FAILED(mClosedStatus)) {
+    return mClosedStatus;
+  }
+
+  return mStream->Available(aResult);
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult)
+{
+  if (NS_FAILED(mClosedStatus)) {
+    return mClosedStatus;
+  }
+
+  uint32_t realCount;
+  nsresult rv = mQueue->Available(aCount, &realCount);
+  if (NS_FAILED(rv)) {
+    return rv;
+  }
+
+  if (realCount == 0) {
+    return NS_BASE_STREAM_WOULD_BLOCK;
+  }
+
+  rv = mStream->Read(aBuf, realCount, aResult);
+  if (NS_SUCCEEDED(rv) && *aResult > 0) {
+    mQueue->RecordRead(*aResult);
+  }
+  return rv;
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
+                                  uint32_t aCount, uint32_t* aResult)
+{
+  if (NS_FAILED(mClosedStatus)) {
+    return mClosedStatus;
+  }
+
+  uint32_t realCount;
+  nsresult rv = mQueue->Available(aCount, &realCount);
+  if (NS_FAILED(rv)) {
+    return rv;
+  }
+
+  if (realCount == 0) {
+    return NS_BASE_STREAM_WOULD_BLOCK;
+  }
+
+  rv = mStream->ReadSegments(aWriter, aClosure, realCount, aResult);
+  if (NS_SUCCEEDED(rv) && *aResult > 0) {
+    mQueue->RecordRead(*aResult);
+  }
+  return rv;
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::IsNonBlocking(bool* aNonBlocking)
+{
+  *aNonBlocking = true;
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::Seek(int32_t aWhence, int64_t aOffset)
+{
+  if (NS_FAILED(mClosedStatus)) {
+    return mClosedStatus;
+  }
+
+  nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
+  if (!sstream) {
+    return NS_ERROR_FAILURE;
+  }
+
+  return sstream->Seek(aWhence, aOffset);
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::Tell(int64_t* aResult)
+{
+  if (NS_FAILED(mClosedStatus)) {
+    return mClosedStatus;
+  }
+
+  nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
+  if (!sstream) {
+    return NS_ERROR_FAILURE;
+  }
+
+  return sstream->Tell(aResult);
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::SetEOF()
+{
+  if (NS_FAILED(mClosedStatus)) {
+    return mClosedStatus;
+  }
+
+  nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
+  if (!sstream) {
+    return NS_ERROR_FAILURE;
+  }
+
+  return sstream->SetEOF();
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::CloseWithStatus(nsresult aStatus)
+{
+  if (NS_FAILED(mClosedStatus)) {
+    // Already closed, ignore.
+    return NS_OK;
+  }
+  if (NS_SUCCEEDED(aStatus)) {
+    aStatus = NS_BASE_STREAM_CLOSED;
+  }
+
+  mClosedStatus = Close();
+  if (NS_SUCCEEDED(mClosedStatus)) {
+    mClosedStatus = aStatus;
+  }
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleInputStream::AsyncWait(nsIInputStreamCallback *aCallback,
+                               uint32_t aFlags,
+                               uint32_t aRequestedCount,
+                               nsIEventTarget *aEventTarget)
+{
+  if (aFlags != 0) {
+    return NS_ERROR_ILLEGAL_VALUE;
+  }
+
+  mCallback = aCallback;
+  mEventTarget = aEventTarget;
+  if (mCallback) {
+    mQueue->QueueStream(this);
+  } else {
+    mQueue->DequeueStream(this);
+  }
+  return NS_OK;
+}
+
+void
+ThrottleInputStream::AllowInput()
+{
+  MOZ_ASSERT(mCallback);
+  nsCOMPtr<nsIInputStreamCallback> callbackEvent =
+    NS_NewInputStreamReadyEvent(mCallback, mEventTarget);
+  mCallback = nullptr;
+  mEventTarget = nullptr;
+  callbackEvent->OnInputStreamReady(this);
+}
+
+//-----------------------------------------------------------------------------
+
+NS_IMPL_ISUPPORTS(ThrottleQueue, nsIInputChannelThrottleQueue, nsITimerCallback)
+
+ThrottleQueue::ThrottleQueue()
+  : mMeanBytesPerSecond(0)
+  , mMaxBytesPerSecond(0)
+  , mBytesProcessed(0)
+  , mTimerArmed(false)
+{
+  nsresult rv;
+  nsCOMPtr<nsIEventTarget> sts;
+  nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
+  if (NS_SUCCEEDED(rv))
+    sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
+  if (NS_SUCCEEDED(rv))
+    mTimer = do_CreateInstance("@mozilla.org/timer;1");
+  if (mTimer)
+    mTimer->SetTarget(sts);
+}
+
+ThrottleQueue::~ThrottleQueue()
+{
+  if (mTimer && mTimerArmed) {
+    mTimer->Cancel();
+  }
+  mTimer = nullptr;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::RecordRead(uint32_t aBytesRead)
+{
+  MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
+  ThrottleEntry entry;
+  entry.mTime = TimeStamp::Now();
+  entry.mBytesRead = aBytesRead;
+  mReadEvents.AppendElement(entry);
+  mBytesProcessed += aBytesRead;
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::Available(uint32_t aRemaining, uint32_t* aAvailable)
+{
+  MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
+  TimeStamp now = TimeStamp::Now();
+  TimeStamp oneSecondAgo = now - TimeDuration::FromSeconds(1);
+  size_t i;
+
+  // Remove all stale events.
+  for (i = 0; i < mReadEvents.Length(); ++i) {
+    if (mReadEvents[i].mTime >= oneSecondAgo) {
+      break;
+    }
+  }
+  mReadEvents.RemoveElementsAt(0, i);
+
+  uint32_t totalBytes = 0;
+  for (i = 0; i < mReadEvents.Length(); ++i) {
+    totalBytes += mReadEvents[i].mBytesRead;
+  }
+
+  uint32_t spread = mMaxBytesPerSecond - mMeanBytesPerSecond;
+  double prob = static_cast<double>(rand()) / RAND_MAX;
+  uint32_t thisSliceBytes = mMeanBytesPerSecond - spread +
+    static_cast<uint32_t>(2 * spread * prob);
+
+  if (totalBytes >= thisSliceBytes) {
+    *aAvailable = 0;
+  } else {
+    *aAvailable = thisSliceBytes;
+  }
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::Init(uint32_t aMeanBytesPerSecond, uint32_t aMaxBytesPerSecond)
+{
+  // Can be called on any thread.
+  if (aMeanBytesPerSecond == 0 || aMaxBytesPerSecond == 0 || aMaxBytesPerSecond < aMeanBytesPerSecond) {
+    return NS_ERROR_ILLEGAL_VALUE;
+  }
+
+  mMeanBytesPerSecond = aMeanBytesPerSecond;
+  mMaxBytesPerSecond = aMaxBytesPerSecond;
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::BytesProcessed(uint64_t* aResult)
+{
+  *aResult = mBytesProcessed;
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::WrapStream(nsIInputStream* aInputStream, nsIAsyncInputStream** aResult)
+{
+  nsCOMPtr<nsIAsyncInputStream> result = new ThrottleInputStream(aInputStream, this);
+  result.forget(aResult);
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+ThrottleQueue::Notify(nsITimer* aTimer)
+{
+  MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
+  // A notified reader may need to push itself back on the queue.
+  // Swap out the list of readers so that this works properly.
+  nsTArray<RefPtr<ThrottleInputStream>> events;
+  events.SwapElements(mAsyncEvents);
+
+  // Optimistically notify all the waiting readers, and then let them
+  // requeue if there isn't enough bandwidth.
+  for (size_t i = 0; i < events.Length(); ++i) {
+    events[i]->AllowInput();
+  }
+
+  mTimerArmed = false;
+  return NS_OK;
+}
+
+void
+ThrottleQueue::QueueStream(ThrottleInputStream* aStream)
+{
+  MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
+  if (mAsyncEvents.IndexOf(aStream) == mAsyncEvents.NoIndex) {
+    mAsyncEvents.AppendElement(aStream);
+
+    if (!mTimerArmed) {
+      uint32_t ms = 1000;
+      if (mReadEvents.Length() > 0) {
+        TimeStamp t = mReadEvents[0].mTime + TimeDuration::FromSeconds(1);
+        TimeStamp now = TimeStamp::Now();
+
+        if (t > now) {
+          ms = static_cast<uint32_t>((t - now).ToMilliseconds());
+        } else {
+          ms = 1;
+        }
+      }
+
+      if (NS_SUCCEEDED(mTimer->InitWithCallback(this, ms, nsITimer::TYPE_ONE_SHOT))) {
+        mTimerArmed = true;
+      }
+    }
+  }
+}
+
+void
+ThrottleQueue::DequeueStream(ThrottleInputStream* aStream)
+{
+  MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
+  mAsyncEvents.RemoveElement(aStream);
+}
+
+}
+}
new file mode 100644
--- /dev/null
+++ b/netwerk/base/ThrottleQueue.h
@@ -0,0 +1,65 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#ifndef mozilla_net_ThrottleQueue_h
+#define mozilla_net_ThrottleQueue_h
+
+#include "mozilla/TimeStamp.h"
+#include "nsIThrottledInputChannel.h"
+#include "nsITimer.h"
+
+namespace mozilla {
+namespace net {
+
+class ThrottleInputStream;
+
+/**
+ * An implementation of nsIInputChannelThrottleQueue that can be used
+ * to throttle uploads.  This class is not thread-safe.
+ * Initialization and calls to WrapStream may be done on any thread;
+ * but otherwise, after creation, it can only be used on the socket
+ * thread.  It currently throttles with a one second granularity, so
+ * may be a bit choppy.
+ */
+
+class ThrottleQueue final
+  : public nsIInputChannelThrottleQueue
+  , public nsITimerCallback
+{
+public:
+
+  ThrottleQueue();
+
+  NS_DECL_THREADSAFE_ISUPPORTS
+  NS_DECL_NSIINPUTCHANNELTHROTTLEQUEUE
+  NS_DECL_NSITIMERCALLBACK
+
+  void QueueStream(ThrottleInputStream* aStream);
+  void DequeueStream(ThrottleInputStream* aStream);
+
+private:
+
+  ~ThrottleQueue();
+
+  struct ThrottleEntry {
+    TimeStamp mTime;
+    uint32_t mBytesRead;
+  };
+
+  nsTArray<ThrottleEntry> mReadEvents;
+  uint32_t mMeanBytesPerSecond;
+  uint32_t mMaxBytesPerSecond;
+  uint64_t mBytesProcessed;
+
+  nsTArray<RefPtr<ThrottleInputStream>> mAsyncEvents;
+  nsCOMPtr<nsITimer> mTimer;
+  bool mTimerArmed;
+};
+
+}
+}
+
+#endif //  mozilla_net_ThrottleQueue_h
--- a/netwerk/base/moz.build
+++ b/netwerk/base/moz.build
@@ -120,16 +120,17 @@ XPIDL_SOURCES += [
     'nsIStreamListener.idl',
     'nsIStreamListenerTee.idl',
     'nsIStreamLoader.idl',
     'nsIStreamTransportService.idl',
     'nsISyncStreamListener.idl',
     'nsISystemProxySettings.idl',
     'nsIThreadRetargetableRequest.idl',
     'nsIThreadRetargetableStreamListener.idl',
+    'nsIThrottledInputChannel.idl',
     'nsITimedChannel.idl',
     'nsITLSServerSocket.idl',
     'nsITraceableChannel.idl',
     'nsITransport.idl',
     'nsIUDPSocket.idl',
     'nsIUnicharStreamLoader.idl',
     'nsIUploadChannel.idl',
     'nsIUploadChannel2.idl',
@@ -255,16 +256,17 @@ UNIFIED_SOURCES += [
     'OfflineObserver.cpp',
     'PollableEvent.cpp',
     'Predictor.cpp',
     'ProxyAutoConfig.cpp',
     'RedirectChannelRegistrar.cpp',
     'RequestContextService.cpp',
     'SimpleBuffer.cpp',
     'StreamingProtocolService.cpp',
+    'ThrottleQueue.cpp',
     'Tickler.cpp',
     'TLSServerSocket.cpp',
 ]
 
 if CONFIG['MOZ_WIDGET_TOOLKIT'] == 'windows':
     SOURCES += [
         'nsURLHelperWin.cpp',
         'ShutdownLayer.cpp',
new file mode 100644
--- /dev/null
+++ b/netwerk/base/nsIThrottledInputChannel.idl
@@ -0,0 +1,80 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "nsISupports.idl"
+
+interface nsIInputStream;
+interface nsIAsyncInputStream;
+
+/**
+ * An instance of this interface can be used to throttle the uploads
+ * of a group of associated channels.
+ */
+[scriptable, uuid(6b4b96fe-3c67-4587-af7b-58b6b17da411)]
+interface nsIInputChannelThrottleQueue : nsISupports
+{
+    /**
+     * Initialize this object with the mean and maximum bytes per
+     * second that will be allowed.  Neither value may be zero, and
+     * the maximum must not be less than the mean.
+     *
+     * @param aMeanBytesPerSecond
+     *        Mean number of bytes per second.
+     * @param aMaxBytesPerSecond
+     *        Maximum number of bytes per second.
+     */
+    void init(in unsigned long aMeanBytesPerSecond, in unsigned long aMaxBytesPerSecond);
+
+    /**
+     * Return the number of bytes that are available to the caller in
+     * this time slice.
+     *
+     * @param aRemaining
+     *        The number of bytes available to be processed
+     * @return the number of bytes allowed to be processed during this
+     *        time slice; this will never be greater than aRemaining.
+     */
+    unsigned long available(in unsigned long aRemaining);
+
+    /**
+     * Record a successful read.
+     *
+     * @param aBytesRead
+     *        The number of bytes actually read.
+     */
+    void recordRead(in unsigned long aBytesRead);
+
+    /**
+     * Return the number of bytes allowed through this queue.  This is
+     * the sum of all the values passed to recordRead.  This method is
+     * primarily useful for testing.
+     */
+    unsigned long long bytesProcessed();
+
+    /**
+     * Wrap the given input stream in a new input stream which
+     * throttles the incoming data.
+     *
+     * @param aInputStream the input stream to wrap
+     * @return a new input stream that throttles the data.
+     */
+    nsIAsyncInputStream wrapStream(in nsIInputStream aInputStream);
+};
+
+/**
+ * A throttled input channel can be managed by an
+ * nsIInputChannelThrottleQueue to limit how much data is sent during
+ * a given time slice.
+ */
+[scriptable, uuid(0a32a100-c031-45b6-9e8b-0444c7d4a143)]
+interface nsIThrottledInputChannel : nsISupports
+{
+    /**
+     * The queue that manages this channel.  Multiple channels can
+     * share a single queue.  A null value means that no throttling
+     * will be done.
+     */
+    attribute nsIInputChannelThrottleQueue throttleQueue;
+};
--- a/netwerk/build/nsNetCID.h
+++ b/netwerk/build/nsNetCID.h
@@ -620,16 +620,26 @@
 #define NS_HTTPACTIVITYDISTRIBUTOR_CID \
 { /* 15629ada-a41c-4a09-961f-6553cd60b1a2 */         \
     0x15629ada,                                      \
     0xa41c,                                          \
     0x4a09,                                          \
     {0x96, 0x1f, 0x65, 0x53, 0xcd, 0x60, 0xb1, 0xa2} \
 }
 
+#define NS_THROTTLEQUEUE_CONTRACTID \
+    "@mozilla.org/network/throttlequeue;1"
+#define NS_THROTTLEQUEUE_CID                            \
+{ /* 4c39159c-cd90-4dd3-97a7-06af5e6d84c4 */            \
+    0x4c39159c,                                         \
+    0xcd90,                                             \
+    0x4dd3,                                             \
+    {0x97, 0xa7, 0x06, 0xaf, 0x5e, 0x6d, 0x84, 0xc4}    \
+}
+
 /******************************************************************************
  * netwerk/protocol/ftp/ classes
  */
 
 #define NS_FTPPROTOCOLHANDLER_CID \
 { /* 25029490-F132-11d2-9588-00805F369F95 */         \
     0x25029490,                                      \
     0xf132,                                          \
--- a/netwerk/build/nsNetModule.cpp
+++ b/netwerk/build/nsNetModule.cpp
@@ -264,28 +264,30 @@ NS_GENERIC_FACTORY_CONSTRUCTOR_INIT(nsFt
 #undef LOG
 #undef LOG_ENABLED
 #include "nsHttpAuthManager.h"
 #include "nsHttpChannelAuthProvider.h"
 #include "nsHttpBasicAuth.h"
 #include "nsHttpDigestAuth.h"
 #include "nsHttpNTLMAuth.h"
 #include "nsHttpActivityDistributor.h"
+#include "ThrottleQueue.h"
 #undef LOG
 #undef LOG_ENABLED
 namespace mozilla {
 namespace net {
 NS_GENERIC_FACTORY_CONSTRUCTOR(nsHttpNTLMAuth)
 NS_GENERIC_FACTORY_CONSTRUCTOR_INIT(nsHttpHandler, Init)
 NS_GENERIC_FACTORY_CONSTRUCTOR_INIT(nsHttpsHandler, Init)
 NS_GENERIC_FACTORY_CONSTRUCTOR_INIT(nsHttpAuthManager, Init)
 NS_GENERIC_FACTORY_CONSTRUCTOR(nsHttpChannelAuthProvider)
 NS_GENERIC_FACTORY_CONSTRUCTOR(nsHttpActivityDistributor)
 NS_GENERIC_FACTORY_CONSTRUCTOR(nsHttpBasicAuth)
 NS_GENERIC_FACTORY_CONSTRUCTOR(nsHttpDigestAuth)
+NS_GENERIC_FACTORY_CONSTRUCTOR(ThrottleQueue)
 } // namespace net
 } // namespace mozilla
 #endif // !NECKO_PROTOCOL_http
 
 #include "mozilla/net/Dashboard.h"
 #include "mozilla/net/PackagedAppService.h"
 #include "mozilla/net/PackagedAppVerifier.h"
 namespace mozilla {
@@ -789,16 +791,17 @@ NS_DEFINE_NAMED_CID(NS_FILEPROTOCOLHANDL
 NS_DEFINE_NAMED_CID(NS_HTTPPROTOCOLHANDLER_CID);
 NS_DEFINE_NAMED_CID(NS_HTTPSPROTOCOLHANDLER_CID);
 NS_DEFINE_NAMED_CID(NS_HTTPBASICAUTH_CID);
 NS_DEFINE_NAMED_CID(NS_HTTPDIGESTAUTH_CID);
 NS_DEFINE_NAMED_CID(NS_HTTPNTLMAUTH_CID);
 NS_DEFINE_NAMED_CID(NS_HTTPAUTHMANAGER_CID);
 NS_DEFINE_NAMED_CID(NS_HTTPCHANNELAUTHPROVIDER_CID);
 NS_DEFINE_NAMED_CID(NS_HTTPACTIVITYDISTRIBUTOR_CID);
+NS_DEFINE_NAMED_CID(NS_THROTTLEQUEUE_CID);
 #endif // !NECKO_PROTOCOL_http
 #ifdef NECKO_PROTOCOL_ftp
 NS_DEFINE_NAMED_CID(NS_FTPPROTOCOLHANDLER_CID);
 #endif
 #ifdef NECKO_PROTOCOL_res
 NS_DEFINE_NAMED_CID(NS_RESPROTOCOLHANDLER_CID);
 NS_DEFINE_NAMED_CID(NS_EXTENSIONPROTOCOLHANDLER_CID);
 NS_DEFINE_NAMED_CID(NS_SUBSTITUTINGURL_CID);
@@ -939,16 +942,17 @@ static const mozilla::Module::CIDEntry k
     { &kNS_HTTPPROTOCOLHANDLER_CID, false, nullptr, mozilla::net::nsHttpHandlerConstructor },
     { &kNS_HTTPSPROTOCOLHANDLER_CID, false, nullptr, mozilla::net::nsHttpsHandlerConstructor },
     { &kNS_HTTPBASICAUTH_CID, false, nullptr, mozilla::net::nsHttpBasicAuthConstructor },
     { &kNS_HTTPDIGESTAUTH_CID, false, nullptr, mozilla::net::nsHttpDigestAuthConstructor },
     { &kNS_HTTPNTLMAUTH_CID, false, nullptr, mozilla::net::nsHttpNTLMAuthConstructor },
     { &kNS_HTTPAUTHMANAGER_CID, false, nullptr, mozilla::net::nsHttpAuthManagerConstructor },
     { &kNS_HTTPCHANNELAUTHPROVIDER_CID, false, nullptr, mozilla::net::nsHttpChannelAuthProviderConstructor },
     { &kNS_HTTPACTIVITYDISTRIBUTOR_CID, false, nullptr, mozilla::net::nsHttpActivityDistributorConstructor },
+    { &kNS_THROTTLEQUEUE_CID, false, nullptr, mozilla::net::ThrottleQueueConstructor },
 #endif // !NECKO_PROTOCOL_http
 #ifdef NECKO_PROTOCOL_ftp
     { &kNS_FTPPROTOCOLHANDLER_CID, false, nullptr, nsFtpProtocolHandlerConstructor },
 #endif
 #ifdef NECKO_PROTOCOL_res
     { &kNS_RESPROTOCOLHANDLER_CID, false, nullptr, nsResProtocolHandlerConstructor },
     { &kNS_EXTENSIONPROTOCOLHANDLER_CID, false, nullptr, mozilla::ExtensionProtocolHandlerConstructor },
     { &kNS_SUBSTITUTINGURL_CID, false, nullptr, mozilla::SubstitutingURLConstructor },
@@ -1100,16 +1104,17 @@ static const mozilla::Module::ContractID
     { NS_NETWORK_PROTOCOL_CONTRACTID_PREFIX "http", &kNS_HTTPPROTOCOLHANDLER_CID },
     { NS_NETWORK_PROTOCOL_CONTRACTID_PREFIX "https", &kNS_HTTPSPROTOCOLHANDLER_CID },
     { NS_HTTP_AUTHENTICATOR_CONTRACTID_PREFIX "basic", &kNS_HTTPBASICAUTH_CID },
     { NS_HTTP_AUTHENTICATOR_CONTRACTID_PREFIX "digest", &kNS_HTTPDIGESTAUTH_CID },
     { NS_HTTP_AUTHENTICATOR_CONTRACTID_PREFIX "ntlm", &kNS_HTTPNTLMAUTH_CID },
     { NS_HTTPAUTHMANAGER_CONTRACTID, &kNS_HTTPAUTHMANAGER_CID },
     { NS_HTTPCHANNELAUTHPROVIDER_CONTRACTID, &kNS_HTTPCHANNELAUTHPROVIDER_CID },
     { NS_HTTPACTIVITYDISTRIBUTOR_CONTRACTID, &kNS_HTTPACTIVITYDISTRIBUTOR_CID },
+    { NS_THROTTLEQUEUE_CONTRACTID, &kNS_THROTTLEQUEUE_CID },
 #endif // !NECKO_PROTOCOL_http
 #ifdef NECKO_PROTOCOL_ftp
     { NS_NETWORK_PROTOCOL_CONTRACTID_PREFIX "ftp", &kNS_FTPPROTOCOLHANDLER_CID },
 #endif
 #ifdef NECKO_PROTOCOL_res
     { NS_NETWORK_PROTOCOL_CONTRACTID_PREFIX "resource", &kNS_RESPROTOCOLHANDLER_CID },
     { NS_NETWORK_PROTOCOL_CONTRACTID_PREFIX "moz-extension", &kNS_EXTENSIONPROTOCOLHANDLER_CID },
 #endif
--- a/netwerk/protocol/http/HttpBaseChannel.cpp
+++ b/netwerk/protocol/http/HttpBaseChannel.cpp
@@ -222,16 +222,17 @@ NS_INTERFACE_MAP_BEGIN(HttpBaseChannel)
   NS_INTERFACE_MAP_ENTRY(nsIUploadChannel)
   NS_INTERFACE_MAP_ENTRY(nsIFormPOSTActionChannel)
   NS_INTERFACE_MAP_ENTRY(nsIUploadChannel2)
   NS_INTERFACE_MAP_ENTRY(nsISupportsPriority)
   NS_INTERFACE_MAP_ENTRY(nsITraceableChannel)
   NS_INTERFACE_MAP_ENTRY(nsIPrivateBrowsingChannel)
   NS_INTERFACE_MAP_ENTRY(nsITimedChannel)
   NS_INTERFACE_MAP_ENTRY(nsIConsoleReportCollector)
+  NS_INTERFACE_MAP_ENTRY(nsIThrottledInputChannel)
 NS_INTERFACE_MAP_END_INHERITING(nsHashPropertyBag)
 
 //-----------------------------------------------------------------------------
 // HttpBaseChannel::nsIRequest
 //-----------------------------------------------------------------------------
 
 NS_IMETHODIMP
 HttpBaseChannel::GetName(nsACString& aName)
@@ -3436,16 +3437,38 @@ HttpBaseChannel::GetInnerDOMWindow()
     nsCOMPtr<nsPIDOMWindowInner> innerWindow = pDomWindow->GetCurrentInnerWindow();
     if (!innerWindow) {
       return nullptr;
     }
 
     return innerWindow;
 }
 
+//-----------------------------------------------------------------------------
+// HttpBaseChannel::nsIThrottledInputChannel
+//-----------------------------------------------------------------------------
+
+NS_IMETHODIMP
+HttpBaseChannel::SetThrottleQueue(nsIInputChannelThrottleQueue* aQueue)
+{
+  if (!XRE_IsParentProcess()) {
+    return NS_ERROR_FAILURE;
+  }
+
+  mThrottleQueue = aQueue;
+  return NS_OK;
+}
+
+NS_IMETHODIMP
+HttpBaseChannel::GetThrottleQueue(nsIInputChannelThrottleQueue** aQueue)
+{
+  *aQueue = mThrottleQueue;
+  return NS_OK;
+}
+
 //------------------------------------------------------------------------------
 
 bool
 HttpBaseChannel::EnsureRequestContextID()
 {
     nsID nullID;
     nullID.Clear();
     if (!mRequestContextID.Equals(nullID)) {
--- a/netwerk/protocol/http/HttpBaseChannel.h
+++ b/netwerk/protocol/http/HttpBaseChannel.h
@@ -38,16 +38,17 @@
 #include "nsThreadUtils.h"
 #include "PrivateBrowsingChannel.h"
 #include "mozilla/net/DNS.h"
 #include "nsITimedChannel.h"
 #include "nsIHttpChannel.h"
 #include "nsISecurityConsoleMessage.h"
 #include "nsCOMArray.h"
 #include "mozilla/net/ChannelEventQueue.h"
+#include "nsIThrottledInputChannel.h"
 
 class nsISecurityConsoleMessage;
 class nsIPrincipal;
 
 namespace mozilla {
 
 namespace dom {
 class Performance;
@@ -74,27 +75,29 @@ class HttpBaseChannel : public nsHashPro
                       , public nsISupportsPriority
                       , public nsIClassOfService
                       , public nsIResumableChannel
                       , public nsITraceableChannel
                       , public PrivateBrowsingChannel<HttpBaseChannel>
                       , public nsITimedChannel
                       , public nsIForcePendingChannel
                       , public nsIConsoleReportCollector
+                      , public nsIThrottledInputChannel
 {
 protected:
   virtual ~HttpBaseChannel();
 
 public:
   NS_DECL_ISUPPORTS_INHERITED
   NS_DECL_NSIUPLOADCHANNEL
   NS_DECL_NSIFORMPOSTACTIONCHANNEL
   NS_DECL_NSIUPLOADCHANNEL2
   NS_DECL_NSITRACEABLECHANNEL
   NS_DECL_NSITIMEDCHANNEL
+  NS_DECL_NSITHROTTLEDINPUTCHANNEL
 
   HttpBaseChannel();
 
   virtual nsresult Init(nsIURI *aURI, uint32_t aCaps, nsProxyInfo *aProxyInfo,
                         uint32_t aProxyResolveFlags,
                         nsIURI *aProxyURI,
                         const nsID& aChannelId);
 
@@ -379,16 +382,18 @@ protected:
   nsCOMPtr<nsIProgressEventSink>    mProgressSink;
   nsCOMPtr<nsIURI>                  mReferrer;
   nsCOMPtr<nsIApplicationCache>     mApplicationCache;
 
   // An instance of nsHTTPCompressConv
   nsCOMPtr<nsIStreamListener>       mCompressListener;
 
   nsHttpRequestHead                 mRequestHead;
+  // Upload throttling.
+  nsCOMPtr<nsIInputChannelThrottleQueue> mThrottleQueue;
   nsCOMPtr<nsIInputStream>          mUploadStream;
   nsCOMPtr<nsIRunnable>             mUploadCloneableCallback;
   nsAutoPtr<nsHttpResponseHead>     mResponseHead;
   RefPtr<nsHttpConnectionInfo>    mConnectionInfo;
   nsCOMPtr<nsIProxyInfo>            mProxyInfo;
   nsCOMPtr<nsISupports>             mSecurityInfo;
 
   nsCString                         mSpec; // ASCII encoded URL spec
--- a/netwerk/protocol/http/nsHttpTransaction.cpp
+++ b/netwerk/protocol/http/nsHttpTransaction.cpp
@@ -28,16 +28,17 @@
 #include "nsComponentManagerUtils.h" // do_CreateInstance
 #include "nsServiceManagerUtils.h"   // do_GetService
 #include "nsIHttpActivityObserver.h"
 #include "nsSocketTransportService2.h"
 #include "nsICancelable.h"
 #include "nsIEventTarget.h"
 #include "nsIHttpChannelInternal.h"
 #include "nsIInputStream.h"
+#include "nsIThrottledInputChannel.h"
 #include "nsITransport.h"
 #include "nsIOService.h"
 #include "nsIRequestContext.h"
 #include <algorithm>
 
 #ifdef MOZ_WIDGET_GONK
 #include "NetStatistics.h"
 #endif
@@ -227,16 +228,17 @@ nsHttpTransaction::Init(uint32_t caps,
 {
     nsresult rv;
 
     LOG(("nsHttpTransaction::Init [this=%p caps=%x]\n", this, caps));
 
     MOZ_ASSERT(cinfo);
     MOZ_ASSERT(requestHead);
     MOZ_ASSERT(target);
+    MOZ_ASSERT(NS_IsMainThread());
 
     mActivityDistributor = do_GetService(NS_HTTPACTIVITYDISTRIBUTOR_CONTRACTID, &rv);
     if (NS_FAILED(rv)) return rv;
 
     bool activityDistributorActive;
     rv = mActivityDistributor->GetIsActive(&activityDistributorActive);
     if (NS_SUCCEEDED(rv) && activityDistributorActive) {
         // there are some observers registered at activity distributor, gather
@@ -374,16 +376,35 @@ nsHttpTransaction::Init(uint32_t caps,
         // necessary to workaround some common server bugs (see bug 137155).
         rv = NS_NewBufferedInputStream(getter_AddRefs(mRequestStream), multi,
                                        nsIOService::gDefaultSegmentSize);
         if (NS_FAILED(rv)) return rv;
     }
     else
         mRequestStream = headers;
 
+    nsCOMPtr<nsIThrottledInputChannel> throttled = do_QueryInterface(mChannel);
+    nsIInputChannelThrottleQueue* queue;
+    if (throttled) {
+        rv = throttled->GetThrottleQueue(&queue);
+        // In case of failure, just carry on without throttling.
+        if (NS_SUCCEEDED(rv) && queue) {
+            nsCOMPtr<nsIAsyncInputStream> wrappedStream;
+            rv = queue->WrapStream(mRequestStream, getter_AddRefs(wrappedStream));
+            // Failure to throttle isn't sufficient reason to fail
+            // initialization
+            if (NS_SUCCEEDED(rv)) {
+                MOZ_ASSERT(wrappedStream != nullptr);
+                LOG(("nsHttpTransaction::Init %p wrapping input stream using throttle queue %p\n",
+                     this, queue));
+                mRequestStream = do_QueryInterface(wrappedStream);
+            }
+        }
+    }
+
     uint64_t size_u64;
     rv = mRequestStream->Available(&size_u64);
     if (NS_FAILED(rv)) {
         return rv;
     }
 
     // make sure it fits within js MAX_SAFE_INTEGER
     mRequestSize = InScriptableRange(size_u64) ? static_cast<int64_t>(size_u64) : -1;
new file mode 100644
--- /dev/null
+++ b/netwerk/test/unit/test_throttlechannel.js
@@ -0,0 +1,41 @@
+// Test nsIThrottledInputChannel interface.
+
+Cu.import("resource://testing-common/httpd.js");
+Cu.import("resource://gre/modules/NetUtil.jsm");
+
+function test_handler(metadata, response) {
+  const originalBody = "the response";
+  response.setHeader("Content-Type", "text/html", false);
+  response.setStatusLine(metadata.httpVersion, 200, "OK");
+  response.bodyOutputStream.write(originalBody, originalBody.length);
+}
+
+function make_channel(url) {
+  return NetUtil.newChannel({uri: url, loadUsingSystemPrincipal: true})
+                .QueryInterface(Components.interfaces.nsIHttpChannel);
+}
+
+function run_test() {
+  let httpserver = new HttpServer();
+  httpserver.start(-1);
+  const PORT = httpserver.identity.primaryPort;
+
+  httpserver.registerPathHandler("/testdir", test_handler);
+
+  let channel = make_channel("http://localhost:" + PORT + "/testdir");
+
+  let tq = Cc["@mozilla.org/network/throttlequeue;1"]
+      .createInstance(Ci.nsIInputChannelThrottleQueue);
+  tq.init(1000, 1000);
+
+  let tic = channel.QueryInterface(Ci.nsIThrottledInputChannel);
+  tic.throttleQueue = tq;
+
+  channel.asyncOpen2(new ChannelListener(() => {
+    ok(tq.bytesProcessed() > 0, "throttled queue processed some bytes");
+
+    httpserver.stop(do_test_finished);
+  }));
+
+  do_test_pending();
+}
new file mode 100644
--- /dev/null
+++ b/netwerk/test/unit/test_throttlequeue.js
@@ -0,0 +1,23 @@
+// Test ThrottleQueue initialization.
+
+function init(tq, mean, max) {
+  let threw = false;
+  try {
+    tq.init(mean, max);
+  } catch (e) {
+    threw = true;
+  }
+  return !threw;
+}
+
+function run_test() {
+  let tq = Cc["@mozilla.org/network/throttlequeue;1"]
+      .createInstance(Ci.nsIInputChannelThrottleQueue);
+
+  ok(!init(tq, 0, 50), "mean bytes cannot be 0");
+  ok(!init(tq, 50, 0), "max bytes cannot be 0");
+  ok(!init(tq, 0, 0), "mean and max bytes cannot be 0");
+  ok(!init(tq, 70, 20), "max cannot be less than mean");
+
+  ok(init(tq, 2, 2), "valid initialization");
+}
new file mode 100644
--- /dev/null
+++ b/netwerk/test/unit/test_throttling.js
@@ -0,0 +1,57 @@
+// Test nsIThrottledInputChannel interface.
+
+Cu.import("resource://testing-common/httpd.js");
+Cu.import("resource://gre/modules/NetUtil.jsm");
+
+function test_handler(metadata, response) {
+  const originalBody = "the response";
+  response.setHeader("Content-Type", "text/html", false);
+  response.setStatusLine(metadata.httpVersion, 200, "OK");
+  response.bodyOutputStream.write(originalBody, originalBody.length);
+}
+
+function make_channel(url) {
+  return NetUtil.newChannel({uri: url, loadUsingSystemPrincipal: true})
+                .QueryInterface(Ci.nsIHttpChannel);
+}
+
+function run_test() {
+  let httpserver = new HttpServer();
+  httpserver.registerPathHandler("/testdir", test_handler);
+  httpserver.start(-1);
+
+  const PORT = httpserver.identity.primaryPort;
+  const size = 4096;
+
+  let sstream = Cc["@mozilla.org/io/string-input-stream;1"].
+                  createInstance(Ci.nsIStringInputStream);
+  sstream.data = 'x'.repeat(size);
+
+  let mime = Cc["@mozilla.org/network/mime-input-stream;1"].
+               createInstance(Ci.nsIMIMEInputStream);
+  mime.addHeader("Content-Type", "multipart/form-data; boundary=zzzzz");
+  mime.setData(sstream);
+  mime.addContentLength = true;
+
+  let tq = Cc["@mozilla.org/network/throttlequeue;1"]
+      .createInstance(Ci.nsIInputChannelThrottleQueue);
+  // Make sure the request takes more than one read.
+  tq.init(100 + size / 2, 100 + size / 2);
+
+  let channel = make_channel("http://localhost:" + PORT + "/testdir");
+  channel.QueryInterface(Ci.nsIUploadChannel)
+         .setUploadStream(mime, "", mime.available());
+  channel.requestMethod = "POST";
+
+  let tic = channel.QueryInterface(Ci.nsIThrottledInputChannel);
+  tic.throttleQueue = tq;
+
+  let startTime = Date.now();
+  channel.asyncOpen2(new ChannelListener(() => {
+    ok(Date.now() - startTime > 1000, "request took more than one second");
+
+    httpserver.stop(do_test_finished);
+  }));
+
+  do_test_pending();
+}
--- a/netwerk/test/unit/xpcshell.ini
+++ b/netwerk/test/unit/xpcshell.ini
@@ -354,8 +354,11 @@ skip-if = os == "android"
 [test_bug1195415.js]
 [test_cookie_blacklist.js]
 [test_getHost.js]
 [test_packaged_app_bug1214079.js]
 [test_bug412457.js]
 [test_bug464591.js]
 [test_cache-control_request.js]
 [test_bug1279246.js]
+[test_throttlequeue.js]
+[test_throttlechannel.js]
+[test_throttling.js]