Bug 1444539: Disconnect StreamFilters on redirect. r=mixedpuppy
authorKris Maglione <maglione.k@gmail.com>
Fri, 09 Mar 2018 14:38:41 -0800
changeset 462462 a97bab3e3c901e13d2b1cd0c0a4fa12f1b941d8c
parent 462461 55c87e7ea09d4d9d6a38603360d57523b3c4c821
child 462463 1976ff94b12a14e32cac357c1ef2cf7caca8ff7e
push id1683
push usersfraser@mozilla.com
push dateThu, 26 Apr 2018 16:43:40 +0000
treeherdermozilla-release@5af6cb21869d [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersmixedpuppy
bugs1444539
milestone60.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1444539: Disconnect StreamFilters on redirect. r=mixedpuppy MozReview-Commit-ID: AuCjXTlsFSC
toolkit/components/extensions/test/xpcshell/test_ext_webRequest_filterResponseData.js
toolkit/components/extensions/test/xpcshell/xpcshell-common.ini
toolkit/components/extensions/webrequest/PStreamFilter.ipdl
toolkit/components/extensions/webrequest/StreamFilterChild.cpp
toolkit/components/extensions/webrequest/StreamFilterChild.h
toolkit/components/extensions/webrequest/StreamFilterParent.cpp
toolkit/components/extensions/webrequest/StreamFilterParent.h
new file mode 100644
--- /dev/null
+++ b/toolkit/components/extensions/test/xpcshell/test_ext_webRequest_filterResponseData.js
@@ -0,0 +1,131 @@
+"use strict";
+
+XPCOMUtils.defineLazyServiceGetter(this, "proxyService",
+                                   "@mozilla.org/network/protocol-proxy-service;1",
+                                   "nsIProtocolProxyService");
+
+const server = createHttpServer();
+const gHost = "localhost";
+const gPort = server.identity.primaryPort;
+
+const HOSTS = new Set([
+  "example.com",
+  "example.org",
+  "example.net",
+]);
+
+for (let host of HOSTS) {
+  server.identity.add("http", host, 80);
+}
+
+const proxyFilter = {
+  proxyInfo: proxyService.newProxyInfo("http", gHost, gPort, 0, 4096, null),
+
+  applyFilter(service, channel, defaultProxyInfo, callback) {
+    if (HOSTS.has(channel.URI.host)) {
+      callback.onProxyFilterResult(this.proxyInfo);
+    } else {
+      callback.onProxyFilterResult(defaultProxyInfo);
+    }
+  },
+};
+
+proxyService.registerChannelFilter(proxyFilter, 0);
+registerCleanupFunction(() => {
+  proxyService.unregisterChannelFilter(proxyFilter);
+});
+
+server.registerPathHandler("/redirect", (request, response) => {
+  let params = new URLSearchParams(request.queryString);
+  response.setStatusLine(request.httpVersion, 302, "Moved Temporarily");
+  response.setHeader("Location", params.get("redirect_uri"));
+  response.setHeader("Access-Control-Allow-Origin", "*");
+});
+
+server.registerPathHandler("/dummy", (request, response) => {
+  response.setStatusLine(request.httpVersion, 200, "OK");
+  response.setHeader("Access-Control-Allow-Origin", "*");
+  response.write("ok");
+});
+
+Cu.importGlobalProperties(["fetch"]);
+
+add_task(async function() {
+  const {fetch} = Cu.Sandbox("http://example.com/", {wantGlobalProperties: ["fetch"]});
+
+  let extension = ExtensionTestUtils.loadExtension({
+    background() {
+      let pending = [];
+
+      browser.webRequest.onBeforeRequest.addListener(
+        data => {
+          let filter = browser.webRequest.filterResponseData(data.requestId);
+
+          let url = new URL(data.url);
+
+          if (url.searchParams.get("redirect_uri")) {
+            pending.push(
+              new Promise(resolve => { filter.onerror = resolve; }).then(() => {
+                browser.test.assertEq("Channel redirected", filter.error,
+                                      "Got correct error for redirected filter");
+              }));
+          }
+
+          filter.onstart = () => {
+            filter.write(new TextEncoder().encode(data.url));
+          };
+          filter.ondata = event => {
+            let str = new TextDecoder().decode(event.data);
+            browser.test.assertEq("ok", str, `Got unfiltered data for ${data.url}`);
+          };
+          filter.onstop = () => {
+            filter.close();
+          };
+        }, {
+          urls: ["<all_urls>"],
+        },
+        ["blocking"]);
+
+      browser.test.onMessage.addListener(async msg => {
+        if (msg === "done") {
+          await Promise.all(pending);
+          browser.test.notifyPass("stream-filter");
+        }
+      });
+    },
+
+    manifest: {
+      permissions: [
+        "webRequest",
+        "webRequestBlocking",
+        "http://example.com/",
+        "http://example.org/",
+      ],
+    },
+  });
+
+  await extension.startup();
+
+  let results = [
+    ["http://example.com/dummy", "http://example.com/dummy"],
+    ["http://example.org/dummy", "http://example.org/dummy"],
+    ["http://example.net/dummy", "ok"],
+    ["http://example.com/redirect?redirect_uri=http://example.com/dummy", "http://example.com/dummy"],
+    ["http://example.com/redirect?redirect_uri=http://example.org/dummy", "http://example.org/dummy"],
+    ["http://example.com/redirect?redirect_uri=http://example.net/dummy", "ok"],
+    ["http://example.net/redirect?redirect_uri=http://example.com/dummy", "http://example.com/dummy"],
+  ].map(async ([url, expectedResponse]) => {
+    let resp = await fetch(url);
+    let text = await resp.text();
+    equal(text, expectedResponse, `Expected response for ${url}`);
+  });
+
+  await Promise.all(results);
+
+  extension.sendMessage("done");
+  await extension.awaitFinish("stream-filter");
+  await extension.unload();
+
+  ChromeUtils.import("resource://gre/modules/Timer.jsm");
+  await new Promise(resolve => setTimeout(resolve, 100));
+});
--- a/toolkit/components/extensions/test/xpcshell/xpcshell-common.ini
+++ b/toolkit/components/extensions/test/xpcshell/xpcshell-common.ini
@@ -76,16 +76,17 @@ head = head.js head_sync.js
 skip-if = os == "android"
 [test_ext_storage_sync_crypto.js]
 skip-if = os == "android"
 [test_ext_storage_telemetry.js]
 skip-if = os == "android" # checking for telemetry needs to be updated: 1384923
 [test_ext_trustworthy_origin.js]
 [test_ext_topSites.js]
 skip-if = os == "android"
+[test_ext_webRequest_filterResponseData.js]
 [test_native_manifests.js]
 subprocess = true
 skip-if = os == "android"
 [test_ext_permissions.js]
 skip-if = os == "android" # Bug 1350559
 [test_proxy_listener.js]
 [test_proxy_scripts.js]
 [test_proxy_scripts_results.js]
--- a/toolkit/components/extensions/webrequest/PStreamFilter.ipdl
+++ b/toolkit/components/extensions/webrequest/PStreamFilter.ipdl
@@ -13,21 +13,23 @@ parent:
   async Write(uint8_t[] data);
 
   async FlushedData();
 
   async Suspend();
   async Resume();
   async Close();
   async Disconnect();
+  async Destroy();
 
 child:
   async Resumed();
   async Suspended();
   async Closed();
+  async Error(nsCString error);
 
   async FlushData();
 
   async StartRequest();
   async Data(uint8_t[] data);
   async StopRequest(nsresult aStatus);
 };
 
--- a/toolkit/components/extensions/webrequest/StreamFilterChild.cpp
+++ b/toolkit/components/extensions/webrequest/StreamFilterChild.cpp
@@ -300,16 +300,28 @@ StreamFilterChild::RecvInitialized(bool 
     if (mStreamFilter) {
       mStreamFilter->FireErrorEvent(NS_LITERAL_STRING("Invalid request ID"));
       mStreamFilter = nullptr;
     }
   }
 }
 
 IPCResult
+StreamFilterChild::RecvError(const nsCString& aError)
+{
+  mState = State::Error;
+  if (mStreamFilter) {
+    mStreamFilter->FireErrorEvent(NS_ConvertUTF8toUTF16(aError));
+    mStreamFilter = nullptr;
+  }
+  SendDestroy();
+  return IPC_OK();
+}
+
+IPCResult
 StreamFilterChild::RecvClosed() {
   MOZ_DIAGNOSTIC_ASSERT(mState == State::Closing);
 
   SetNextState();
   return IPC_OK();
 }
 
 IPCResult
--- a/toolkit/components/extensions/webrequest/StreamFilterChild.h
+++ b/toolkit/components/extensions/webrequest/StreamFilterChild.h
@@ -94,16 +94,17 @@ public:
 
   void  RecvInitialized(bool aSuccess);
 
 protected:
 
   virtual IPCResult RecvStartRequest() override;
   virtual IPCResult RecvData(Data&& data) override;
   virtual IPCResult RecvStopRequest(const nsresult& aStatus) override;
+  virtual IPCResult RecvError(const nsCString& aError) override;
 
   virtual IPCResult RecvClosed() override;
   virtual IPCResult RecvSuspended() override;
   virtual IPCResult RecvResumed() override;
   virtual IPCResult RecvFlushData() override;
 
   virtual void DeallocPStreamFilterChild() override;
 
--- a/toolkit/components/extensions/webrequest/StreamFilterParent.cpp
+++ b/toolkit/components/extensions/webrequest/StreamFilterParent.cpp
@@ -220,16 +220,17 @@ StreamFilterParent::Broken()
   mState = State::Disconnecting;
 
   RefPtr<StreamFilterParent> self(this);
   RunOnIOThread(FUNC, [=] {
     self->FlushBufferedData();
 
     RunOnActorThread(FUNC, [=] {
       if (self->IPCActive()) {
+        self->mDisconnected = true;
         self->mState = State::Disconnected;
       }
     });
   });
 }
 
 /*****************************************************************************
  * State change requests
@@ -263,16 +264,25 @@ StreamFilterParent::Destroy()
   ActorThread()->Dispatch(
     NewRunnableMethod("StreamFilterParent::Close",
                       this,
                       &StreamFilterParent::Close),
     NS_DISPATCH_NORMAL);
 }
 
 IPCResult
+StreamFilterParent::RecvDestroy()
+{
+  AssertIsActorThread();
+  Destroy();
+  return IPC_OK();
+}
+
+
+IPCResult
 StreamFilterParent::RecvSuspend()
 {
   AssertIsActorThread();
 
   if (mState == State::TransferringData) {
     RefPtr<StreamFilterParent> self(this);
     RunOnMainThread(FUNC, [=] {
       self->mChannel->Suspend();
@@ -306,17 +316,16 @@ StreamFilterParent::RecvResume()
         if (self->IPCActive()) {
           self->CheckResult(self->SendResumed());
         }
       });
     });
   }
   return IPC_OK();
 }
-
 IPCResult
 StreamFilterParent::RecvDisconnect()
 {
   AssertIsActorThread();
 
   if (mState == State::Suspended) {
     RefPtr<StreamFilterParent> self(this);
     RunOnMainThread(FUNC, [=] {
@@ -341,16 +350,17 @@ StreamFilterParent::RecvFlushedData()
   Destroy();
 
   RefPtr<StreamFilterParent> self(this);
   RunOnIOThread(FUNC, [=] {
     self->FlushBufferedData();
 
     RunOnActorThread(FUNC, [=] {
       self->mState = State::Disconnected;
+      self->mDisconnected = true;
     });
   });
   return IPC_OK();
 }
 
 /*****************************************************************************
  * Data output
  *****************************************************************************/
@@ -401,17 +411,29 @@ StreamFilterParent::Write(Data& aData)
 
 NS_IMETHODIMP
 StreamFilterParent::OnStartRequest(nsIRequest* aRequest, nsISupports* aContext)
 {
   AssertIsMainThread();
 
   mContext = aContext;
 
-  if (mState != State::Disconnected) {
+  if (aRequest != mChannel) {
+    mDisconnected = true;
+
+    RefPtr<StreamFilterParent> self(this);
+    RunOnActorThread(FUNC, [=] {
+      if (self->IPCActive()) {
+        self->mState = State::Disconnected;
+        CheckResult(self->SendError(NS_LITERAL_CSTRING("Channel redirected")));
+      }
+    });
+  }
+
+  if (!mDisconnected) {
     RefPtr<StreamFilterParent> self(this);
     RunOnActorThread(FUNC, [=] {
       if (self->IPCActive()) {
         self->mState = State::TransferringData;
         self->CheckResult(self->SendStartRequest());
       }
     });
   }
@@ -434,17 +456,17 @@ StreamFilterParent::OnStartRequest(nsIRe
 NS_IMETHODIMP
 StreamFilterParent::OnStopRequest(nsIRequest* aRequest,
                                   nsISupports* aContext,
                                   nsresult aStatusCode)
 {
   AssertIsMainThread();
 
   mReceivedStop = true;
-  if (mState == State::Disconnected) {
+  if (mDisconnected) {
     return EmitStopRequest(aStatusCode);
   }
 
   RefPtr<StreamFilterParent> self(this);
   RunOnActorThread(FUNC, [=] {
     if (self->IPCActive()) {
       self->CheckResult(self->SendStopRequest(aStatusCode));
     }
@@ -480,17 +502,17 @@ NS_IMETHODIMP
 StreamFilterParent::OnDataAvailable(nsIRequest* aRequest,
                                     nsISupports* aContext,
                                     nsIInputStream* aInputStream,
                                     uint64_t aOffset,
                                     uint32_t aCount)
 {
   AssertIsIOThread();
 
-  if (mState == State::Disconnected) {
+  if (mDisconnected) {
     // If we're offloading data in a thread pool, it's possible that we'll
     // have buffered some additional data while waiting for the buffer to
     // flush. So, if there's any buffered data left, flush that before we
     // flush this incoming data.
     //
     // Note: When in the eDisconnected state, the buffer list is guaranteed
     // never to be accessed by another thread during an OnDataAvailable call.
     if (!mBufferedData.isEmpty()) {
--- a/toolkit/components/extensions/webrequest/StreamFilterParent.h
+++ b/toolkit/components/extensions/webrequest/StreamFilterParent.h
@@ -88,16 +88,17 @@ protected:
   virtual ~StreamFilterParent();
 
   virtual IPCResult RecvWrite(Data&& aData) override;
   virtual IPCResult RecvFlushedData() override;
   virtual IPCResult RecvSuspend() override;
   virtual IPCResult RecvResume() override;
   virtual IPCResult RecvClose() override;
   virtual IPCResult RecvDisconnect() override;
+  virtual IPCResult RecvDestroy() override;
 
   virtual void DeallocPStreamFilterParent() override;
 
 private:
   bool IPCActive()
   {
     return (mState != State::Closed &&
             mState != State::Disconnecting &&
@@ -170,16 +171,17 @@ private:
   nsCOMPtr<nsIEventTarget> mIOThread;
 
   RefPtr<net::ChannelEventQueue> mQueue;
 
   Mutex mBufferMutex;
 
   bool mReceivedStop;
   bool mSentStop;
+  bool mDisconnected = false;
 
   nsCOMPtr<nsISupports> mContext;
   uint64_t mOffset;
 
   volatile State mState;
 };
 
 } // namespace extensions