Bug 1244227 - Add network throttling. r=Honza draft
authorTom Tromey <tom@tromey.com>
Fri, 29 Jan 2016 11:56:25 -0700
changeset 399970 3b5ff580951f52d5ae0a1c699b73972c08a4dfe3
parent 399969 e3565645292ddca67a38becaf0301e0be2b480ed
child 399971 48cb737bb212864dfa6905553e19dcefe2ee6dda
push id26051
push userbmo:ttromey@mozilla.com
push dateFri, 12 Aug 2016 14:07:06 +0000
reviewersHonza
bugs1244227
milestone51.0a1
Bug 1244227 - Add network throttling. r=Honza MozReview-Commit-ID: Iy6buFxUrGg
devtools/shared/webconsole/moz.build
devtools/shared/webconsole/network-monitor.js
devtools/shared/webconsole/test/unit/test_throttle.js
devtools/shared/webconsole/test/unit/xpcshell.ini
devtools/shared/webconsole/throttle.js
--- a/devtools/shared/webconsole/moz.build
+++ b/devtools/shared/webconsole/moz.build
@@ -10,9 +10,10 @@ if CONFIG['OS_TARGET'] != 'Android':
 
 DevToolsModules(
     'client.js',
     'js-property-provider.js',
     'network-helper.js',
     'network-monitor.js',
     'server-logger-monitor.js',
     'server-logger.js',
+    'throttle.js',
 )
--- a/devtools/shared/webconsole/network-monitor.js
+++ b/devtools/shared/webconsole/network-monitor.js
@@ -15,16 +15,17 @@ loader.lazyRequireGetter(this, "NetworkH
 loader.lazyRequireGetter(this, "DevToolsUtils",
                          "devtools/shared/DevToolsUtils");
 loader.lazyRequireGetter(this, "flags",
                          "devtools/shared/flags");
 loader.lazyImporter(this, "NetUtil", "resource://gre/modules/NetUtil.jsm");
 loader.lazyServiceGetter(this, "gActivityDistributor",
                          "@mozilla.org/network/http-activity-distributor;1",
                          "nsIHttpActivityDistributor");
+const {NetworkThrottleManager} = require("devtools/shared/webconsole/throttle");
 
 // /////////////////////////////////////////////////////////////////////////////
 // Network logging
 // /////////////////////////////////////////////////////////////////////////////
 
 // The maximum uint32 value.
 const PR_UINT32_MAX = 4294967295;
 
@@ -664,17 +665,21 @@ NetworkResponseListener.prototype = {
  */
 function NetworkMonitor(filters, owner) {
   this.filters = filters;
   this.owner = owner;
   this.openRequests = {};
   this.openResponses = {};
   this._httpResponseExaminer =
     DevToolsUtils.makeInfallible(this._httpResponseExaminer).bind(this);
+  this._httpModifyExaminer =
+    DevToolsUtils.makeInfallible(this._httpModifyExaminer).bind(this);
   this._serviceWorkerRequest = this._serviceWorkerRequest.bind(this);
+  this.throttleData = null;
+  this._throttler = null;
 }
 
 exports.NetworkMonitor = NetworkMonitor;
 
 NetworkMonitor.prototype = {
   filters: null,
 
   httpTransactionCodes: {
@@ -725,23 +730,32 @@ NetworkMonitor.prototype = {
     this.interceptedChannels = new Set();
 
     if (Services.appinfo.processType != Ci.nsIXULRuntime.PROCESS_TYPE_CONTENT) {
       gActivityDistributor.addObserver(this);
       Services.obs.addObserver(this._httpResponseExaminer,
                                "http-on-examine-response", false);
       Services.obs.addObserver(this._httpResponseExaminer,
                                "http-on-examine-cached-response", false);
+      Services.obs.addObserver(this._httpModifyExaminer,
+                               "http-on-modify-request", false);
     }
     // In child processes, only watch for service worker requests
     // everything else only happens in the parent process
     Services.obs.addObserver(this._serviceWorkerRequest,
                              "service-worker-synthesized-response", false);
   },
 
+  _getThrottler: function () {
+    if (this.throttleData !== null && this._throttler === null) {
+      this._throttler = new NetworkThrottleManager(this.throttleData);
+    }
+    return this._throttler;
+  },
+
   _serviceWorkerRequest: function (subject, topic, data) {
     let channel = subject.QueryInterface(Ci.nsIHttpChannel);
 
     if (!matchRequest(channel, this.filters)) {
       return;
     }
 
     this.interceptedChannels.add(subject);
@@ -847,16 +861,34 @@ NetworkMonitor.prototype = {
       // There also is never any timing events, so we can fire this
       // event with zeroed out values.
       let timings = this._setupHarTimings(httpActivity, true);
       httpActivity.owner.addEventTimings(timings.total, timings.timings);
     }
   },
 
   /**
+   * Observe notifications for the http-on-modify-request topic, coming from
+   * the nsIObserverService.
+   *
+   * @private
+   * @param nsIHttpChannel aSubject
+   * @returns void
+   */
+  _httpModifyExaminer: function (subject) {
+    let throttler = this._getThrottler();
+    if (throttler) {
+      let channel = subject.QueryInterface(Ci.nsIHttpChannel);
+      if (matchRequest(channel, this.filters)) {
+        throttler.manageUpload(channel);
+      }
+    }
+  },
+
+  /**
    * Begin observing HTTP traffic that originates inside the current tab.
    *
    * @see https://developer.mozilla.org/en/XPCOM_Interface_Reference/nsIHttpActivityObserver
    *
    * @param nsIHttpChannel channel
    * @param number activityType
    * @param number activitySubtype
    * @param number timestamp
@@ -1016,17 +1048,17 @@ NetworkMonitor.prototype = {
     });
 
     if (cookieHeader) {
       cookies = NetworkHelper.parseCookieHeader(cookieHeader);
     }
 
     httpActivity.owner = this.owner.onNetworkEvent(event, channel);
 
-    this._setupResponseListener(httpActivity);
+    this._setupResponseListener(httpActivity, fromCache);
 
     httpActivity.owner.addRequestHeaders(headers, extraStringData);
     httpActivity.owner.addRequestCookies(cookies);
 
     this.openRequests[httpActivity.id] = httpActivity;
     return httpActivity;
   },
 
@@ -1086,20 +1118,27 @@ NetworkMonitor.prototype = {
   /**
    * Setup the network response listener for the given HTTP activity. The
    * NetworkResponseListener is responsible for storing the response body.
    *
    * @private
    * @param object httpActivity
    *        The HTTP activity object we are tracking.
    */
-  _setupResponseListener: function (httpActivity) {
+  _setupResponseListener: function (httpActivity, fromCache) {
     let channel = httpActivity.channel;
     channel.QueryInterface(Ci.nsITraceableChannel);
 
+    if (!fromCache) {
+      let throttler = this._getThrottler();
+      if (throttler) {
+        httpActivity.downloadThrottle = throttler.manage(channel);
+      }
+    }
+
     // The response will be written into the outputStream of this pipe.
     // This allows us to buffer the data we are receiving and read it
     // asynchronously.
     // Both ends of the pipe must be blocking.
     let sink = Cc["@mozilla.org/pipe;1"].createInstance(Ci.nsIPipe);
 
     // The streams need to be blocking because this is required by the
     // stream tee.
@@ -1323,26 +1362,29 @@ NetworkMonitor.prototype = {
    */
   destroy: function () {
     if (Services.appinfo.processType != Ci.nsIXULRuntime.PROCESS_TYPE_CONTENT) {
       gActivityDistributor.removeObserver(this);
       Services.obs.removeObserver(this._httpResponseExaminer,
                                   "http-on-examine-response");
       Services.obs.removeObserver(this._httpResponseExaminer,
                                   "http-on-examine-cached-response");
+      Services.obs.removeObserver(this._httpModifyExaminer,
+                                  "http-on-modify-request", false);
     }
 
     Services.obs.removeObserver(this._serviceWorkerRequest,
                                 "service-worker-synthesized-response");
 
     this.interceptedChannels.clear();
     this.openRequests = {};
     this.openResponses = {};
     this.owner = null;
     this.filters = null;
+    this._throttler = null;
   },
 };
 
 /**
  * The NetworkMonitorChild is used to proxy all of the network activity of the
  * child app process from the main process. The child WebConsoleActor creates an
  * instance of this object.
  *
new file mode 100644
--- /dev/null
+++ b/devtools/shared/webconsole/test/unit/test_throttle.js
@@ -0,0 +1,140 @@
+/* Any copyright is dedicated to the Public Domain.
+   http://creativecommons.org/publicdomain/zero/1.0/ */
+
+"use strict";
+
+const Cu = Components.utils;
+const Cc = Components.classes;
+const Ci = Components.interfaces;
+const { require } = Cu.import("resource://devtools/shared/Loader.jsm", {});
+const promise = require("promise");
+const { NetworkThrottleManager } =
+      require("devtools/shared/webconsole/throttle");
+const nsIScriptableInputStream = Ci.nsIScriptableInputStream;
+
+function TestStreamListener() {
+  this.state = "initial";
+}
+TestStreamListener.prototype = {
+  onStartRequest: function() {
+    this.setState("start");
+  },
+
+  onStopRequest: function() {
+    this.setState("stop");
+  },
+
+  onDataAvailable: function(request, context, inputStream, offset, count) {
+    const sin = Components.classes["@mozilla.org/scriptableinputstream;1"]
+          .createInstance(nsIScriptableInputStream);
+    sin.init(inputStream);
+    this.data = sin.read(count);
+    this.setState("data");
+  },
+
+  setState: function(state) {
+    this.state = state;
+    if (this._deferred) {
+      this._deferred.resolve(state);
+      this._deferred = null;
+    }
+  },
+
+  onStateChanged: function() {
+    if (!this._deferred) {
+      this._deferred = promise.defer();
+    }
+    return this._deferred.promise;
+  }
+};
+
+function TestChannel() {
+  this.state = "initial";
+  this.testListener = new TestStreamListener();
+  this._throttleQueue = null;
+}
+TestChannel.prototype = {
+  QueryInterface: function() {
+    return this;
+  },
+
+  get throttleQueue() {
+    return this._throttleQueue;
+  },
+
+  set throttleQueue(q) {
+    this._throttleQueue = q;
+    this.state = "throttled";
+  },
+
+  setNewListener: function(listener) {
+    this.listener = listener;
+    this.state = "listener";
+    return this.testListener;
+  },
+};
+
+add_task(function*() {
+  let throttler = new NetworkThrottleManager({
+    roundTripTimeMean: 1,
+    roundTripTimeMax: 1,
+    downloadBPSMean: 500,
+    downloadBPSMax: 500,
+    uploadBPSMean: 500,
+    uploadBPSMax: 500,
+  });
+
+  let uploadChannel = new TestChannel();
+  throttler.manageUpload(uploadChannel);
+  equal(uploadChannel.state, "throttled",
+        "NetworkThrottleManager set throttleQueue");
+
+  let downloadChannel = new TestChannel();
+  let testListener = downloadChannel.testListener;
+
+  let listener = throttler.manage(downloadChannel);
+  equal(downloadChannel.state, "listener",
+     "NetworkThrottleManager called setNewListener");
+
+  equal(testListener.state, "initial", "test listener in initial state");
+
+  // This method must be passed through immediately.
+  listener.onStartRequest(null, null);
+  equal(testListener.state, "start", "test listener started");
+
+  const TEST_INPUT = "hi bob";
+
+  let testStream = Cc["@mozilla.org/storagestream;1"]
+      .createInstance(Ci.nsIStorageStream);
+  testStream.init(512, 512);
+  let out = testStream.getOutputStream(0);
+  out.write(TEST_INPUT, TEST_INPUT.length);
+  out.close();
+  let testInputStream = testStream.newInputStream(0);
+
+  let activityDistributor =
+      Cc["@mozilla.org/network/http-activity-distributor;1"]
+      .getService(Ci.nsIHttpActivityDistributor);
+  let activitySeen = false;
+  listener.addActivityCallback(() => activitySeen = true, null, null, null,
+                               activityDistributor
+                               .ACTIVITY_SUBTYPE_RESPONSE_COMPLETE,
+                               null, TEST_INPUT.length, null);
+
+  // onDataAvailable is required to immediately read the data.
+  listener.onDataAvailable(null, null, testInputStream, 0, 6);
+  equal(testInputStream.available(), 0, "no more data should be available");
+  equal(testListener.state, "start",
+     "test listener should not have received data");
+  equal(activitySeen, false, "activity not distributed yet");
+
+  let newState = yield testListener.onStateChanged();
+  equal(newState, "data", "test listener received data");
+  equal(testListener.data, TEST_INPUT, "test listener received all the data");
+  equal(activitySeen, true, "activity has been distributed");
+
+  let onChange = testListener.onStateChanged();
+  listener.onStopRequest(null, null, null);
+  newState = yield onChange;
+  equal(newState, "stop", "onStateChanged reported");
+});
--- a/devtools/shared/webconsole/test/unit/xpcshell.ini
+++ b/devtools/shared/webconsole/test/unit/xpcshell.ini
@@ -9,8 +9,9 @@ support-files =
 [test_js_property_provider.js]
 [test_network_helper.js]
 [test_security-info-certificate.js]
 [test_security-info-parser.js]
 [test_security-info-protocol-version.js]
 [test_security-info-state.js]
 [test_security-info-static-hpkp.js]
 [test_security-info-weakness-reasons.js]
+[test_throttle.js]
new file mode 100644
--- /dev/null
+++ b/devtools/shared/webconsole/throttle.js
@@ -0,0 +1,325 @@
+/* -*- indent-tabs-mode: nil; js-indent-level: 2 -*- */
+/* vim: set ft= javascript ts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+const {CC, Ci, Cu, Cc} = require("chrome");
+
+const ArrayBufferInputStream = CC("@mozilla.org/io/arraybuffer-input-stream;1",
+                                  "nsIArrayBufferInputStream");
+const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1",
+                             "nsIBinaryInputStream", "setInputStream");
+
+const {XPCOMUtils} = require("resource://gre/modules/XPCOMUtils.jsm");
+const {setTimeout} = Cu.import("resource://gre/modules/Timer.jsm", {});
+
+/**
+ * Construct a new nsIStreamListener that buffers data and provides a
+ * method to notify another listener when data is available.  This is
+ * used to throttle network data on a per-channel basis.
+ *
+ * After construction, @see setOriginalListener must be called on the
+ * new object.
+ *
+ * @param {NetworkThrottleQueue} queue the NetworkThrottleQueue to
+ *        which status changes should be reported
+ */
+function NetworkThrottleListener(queue) {
+  this.queue = queue;
+  this.pendingData = [];
+  this.pendingException = null;
+  this.offset = 0;
+}
+
+NetworkThrottleListener.prototype = {
+  QueryInterface:
+    XPCOMUtils.generateQI([Ci.nsIStreamListener, Ci.nsIInterfaceRequestor,
+                           Ci.nsISupports]),
+
+  /**
+   * Set the original listener for this object.  The original listener
+   * will receive requests from this object when the queue allows data
+   * through.
+   *
+   * @param {nsIStreamListener} originalListener the original listener
+   *        for the channel, to which all requests will be sent
+   */
+  setOriginalListener: function (originalListener) {
+    this.originalListener = originalListener;
+  },
+
+  /**
+   * @see nsIStreamListener.onStartRequest.
+   */
+  onStartRequest: function (request, context) {
+    this.originalListener.onStartRequest(request, context);
+    this.queue.start(this);
+  },
+
+  /**
+   * @see nsIStreamListener.onStopRequest.
+   */
+  onStopRequest: function (request, context, statusCode) {
+    this.pendingData.push({request, context, statusCode});
+    this.queue.dataAvailable(this);
+  },
+
+  /**
+   * @see nsIStreamListener.onDataAvailable.
+   */
+  onDataAvailable: function (request, context, inputStream, offset, count) {
+    if (this.pendingException) {
+      throw this.pendingException;
+    }
+
+    const bin = new BinaryInputStream(inputStream);
+    const bytes = new ArrayBuffer(count);
+    bin.readArrayBuffer(count, bytes);
+
+    const stream = new ArrayBufferInputStream();
+    stream.setData(bytes, 0, count);
+
+    this.pendingData.push({request, context, stream, count});
+    this.queue.dataAvailable(this);
+  },
+
+  /**
+   * Allow some buffered data from this object to be forwarded to this
+   * object's originalListener.
+   *
+   * @param {Number} bytesPermitted The maximum number of bytes
+   *        permitted to be sent.
+   * @return {Object} an object of the form {length, done}, where
+   *         |length| is the number of bytes actually forwarded, and
+   *         |done| is a boolean indicating whether this particular
+   *         request has been completed.  (A NetworkThrottleListener
+   *         may be queued multiple times, so this does not mean that
+   *         all available data has been sent.)
+   */
+  sendSomeData: function (bytesPermitted) {
+    if (this.pendingData.length === 0) {
+      // Shouldn't happen.
+      return {length: 0, done: true};
+    }
+
+    const {request, context, stream, count, statusCode} = this.pendingData[0];
+
+    if (statusCode !== undefined) {
+      this.pendingData.shift();
+      this.originalListener.onStopRequest(request, context, statusCode);
+      return {length: 0, done: true};
+    }
+
+    if (bytesPermitted > count) {
+      bytesPermitted = count;
+    }
+
+    try {
+      this.originalListener.onDataAvailable(request, context, stream,
+                                            this.offset, bytesPermitted);
+    } catch (e) {
+      this.pendingException = e;
+    }
+
+    let done = false;
+    if (bytesPermitted === count) {
+      this.pendingData.shift();
+      done = true;
+    } else {
+      this.pendingData[0].count -= bytesPermitted;
+    }
+
+    this.offset += bytesPermitted;
+    return {length: bytesPermitted, done};
+  },
+
+  /**
+   * Return the number of pending data requests available for this
+   * listener.
+   */
+  pendingCount: function () {
+    return this.pendingData.length;
+  },
+};
+
+/**
+ * Construct a new queue that can be used to throttle the network for
+ * a group of related network requests.
+ *
+ * meanBPS {Number} Mean bytes per second.
+ * maxBPS {Number} Maximum bytes per second.
+ * roundTripTimeMean {Number} Mean round trip time in milliseconds.
+ * roundTripTimeMax {Number} Maximum round trip time in milliseconds.
+ */
+function NetworkThrottleQueue(meanBPS, maxBPS,
+                              roundTripTimeMean, roundTripTimeMax) {
+  this.meanBPS = meanBPS;
+  this.maxBPS = maxBPS;
+  this.roundTripTimeMean = roundTripTimeMean;
+  this.roundTripTimeMax = roundTripTimeMax;
+
+  this.pendingRequests = new Set();
+  this.downloadQueue = [];
+  this.previousReads = [];
+
+  this.pumping = false;
+}
+
+NetworkThrottleQueue.prototype = {
+  /**
+   * A helper function that, given a mean and a maximum, returns a
+   * random integer between (mean - (max - mean)) and max.
+   */
+  random: function (mean, max) {
+    return mean - (max - mean) + Math.floor(2 * (max - mean) * Math.random());
+  },
+
+  /**
+   * A helper function that lets the indicating listener start sending
+   * data.  This is called after the initial round trip time for the
+   * listener has elapsed.
+   */
+  allowDataFrom: function (throttleListener) {
+    this.pendingRequests.delete(throttleListener);
+    const count = throttleListener.pendingCount();
+    for (let i = 0; i < count; ++i) {
+      this.downloadQueue.push(throttleListener);
+    }
+    this.pump();
+  },
+
+  /**
+   * Notice a new listener object.  This is called by the
+   * NetworkThrottleListener when the request has started.  Initially
+   * a new listener object is put into a "pending" state, until the
+   * round-trip time has elapsed.  This is used to simulate latency.
+   *
+   * @param {NetworkThrottleListener} throttleListener the new listener
+   */
+  start: function (throttleListener) {
+    this.pendingRequests.add(throttleListener);
+    let delay = this.random(this.roundTripTimeMean, this.roundTripTimeMax);
+    if (delay > 0) {
+      setTimeout(() => this.allowDataFrom(throttleListener), delay);
+    } else {
+      this.allowDataFrom(throttleListener);
+    }
+  },
+
+  /**
+   * Note that new data is available for a given listener.  Each time
+   * data is available, the listener will be re-queued.
+   *
+   * @param {NetworkThrottleListener} throttleListener the listener
+   *        which has data available.
+   */
+  dataAvailable: function (throttleListener) {
+    if (!this.pendingRequests.has(throttleListener)) {
+      this.downloadQueue.push(throttleListener);
+      this.pump();
+    }
+  },
+
+  /**
+   * An internal function that permits individual listeners to send
+   * data.
+   */
+  pump: function () {
+    // A redirect will cause two NetworkThrottleListeners to be on a
+    // listener chain.  In this case, we might recursively call into
+    // this method.  Avoid infinite recursion here.
+    if (this.pumping) {
+      return;
+    }
+    this.pumping = true;
+
+    const now = Date.now();
+    const oneSecondAgo = now - 1000;
+
+    while (this.previousReads.length &&
+           this.previousReads[0].when < oneSecondAgo) {
+      this.previousReads.shift();
+    }
+
+    const totalBytes = this.previousReads.reduce((sum, elt) => {
+      return sum + elt.numBytes;
+    }, 0);
+
+    let thisSliceBytes = this.random(this.meanBPS, this.maxBPS);
+    if (totalBytes < thisSliceBytes) {
+      thisSliceBytes -= totalBytes;
+      let readThisTime = 0;
+      while (thisSliceBytes > 0 && this.downloadQueue.length) {
+        let {length, done} = this.downloadQueue[0].sendSomeData(thisSliceBytes);
+        thisSliceBytes -= length;
+        readThisTime += length;
+        if (done) {
+          this.downloadQueue.shift();
+        }
+      }
+      this.previousReads.push({when: now, numBytes: readThisTime});
+    }
+
+    // If there is more data to download, then schedule ourselves for
+    // one second after the oldest previous read.
+    if (this.downloadQueue.length) {
+      const when = this.previousReads[0].when + 1000;
+      setTimeout(this.pump.bind(this), when - now);
+    }
+
+    this.pumping = false;
+  },
+};
+
+/**
+ * Construct a new object that can be used to throttle the network for
+ * a group of related network requests.
+ *
+ * @param {Object} An object with the following attributes:
+ * roundTripTimeMean {Number} Mean round trip time in milliseconds.
+ * roundTripTimeMax {Number} Maximum round trip time in milliseconds.
+ * downloadBPSMean {Number} Mean bytes per second for downloads.
+ * downloadBPSMax {Number} Maximum bytes per second for downloads.
+ * uploadBPSMean {Number} Mean bytes per second for uploads.
+ * uploadBPSMax {Number} Maximum bytes per second for uploads.
+ */
+function NetworkThrottleManager({roundTripTimeMean, roundTripTimeMax,
+                                 downloadBPSMean, downloadBPSMax,
+                                 uploadBPSMean, uploadBPSMax}) {
+  this.downloadQueue =
+    new NetworkThrottleQueue(downloadBPSMean, downloadBPSMax,
+                             roundTripTimeMean, roundTripTimeMax);
+  this.uploadQueue = Cc["@mozilla.org/network/throttlequeue;1"]
+    .createInstance(Ci.nsIInputChannelThrottleQueue);
+  this.uploadQueue.init(uploadBPSMean, uploadBPSMax);
+}
+exports.NetworkThrottleManager = NetworkThrottleManager;
+
+NetworkThrottleManager.prototype = {
+  /**
+   * Create a new NetworkThrottleListener for a given channel and
+   * install it using |setNewListener|.
+   *
+   * @param {nsITraceableChannel} channel the channel to manage
+   * @return {NetworkThrottleListener} the new listener
+   */
+  manage: function (channel) {
+    let listener = new NetworkThrottleListener(this.downloadQueue);
+    let originalListener = channel.setNewListener(listener);
+    listener.setOriginalListener(originalListener);
+    return listener;
+  },
+
+  /**
+   * Throttle uploads taking place on the given channel.
+   *
+   * @param {nsITraceableChannel} channel the channel to manage
+   */
+  manageUpload: function (channel) {
+    channel = channel.QueryInterface(Ci.nsIThrottledInputChannel);
+    channel.throttleQueue = this.uploadQueue;
+  },
+};