servo: Merge #13774 - Finish up the implementation of EventSource (from KiChjang:event-source-constructor); r=jdm
authorKeith Yeung <kungfukeith11@gmail.com>
Sat, 12 Nov 2016 08:08:38 -0600
changeset 340139 e7c357e94e3065c10c28efea382865eaa2d4ca41
parent 340138 77c07e9b9dadb6ed1abea9281daa645f733331b3
child 340140 a2c4c7bb733cd999af7945c3b4cc210fcc1a89aa
push id31307
push usergszorc@mozilla.com
push dateSat, 04 Feb 2017 00:59:06 +0000
treeherdermozilla-central@94079d43835f [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersjdm
servo: Merge #13774 - Finish up the implementation of EventSource (from KiChjang:event-source-constructor); r=jdm Full implementation of EventSource, complete with closing and reopening streams. Fixes #8925. Source-Repo: https://github.com/servo/servo Source-Revision: 579ab2d99cd8c07a31c3b68a4659c484d5189ada
servo/components/net_traits/request.rs
servo/components/script/dom/bindings/trace.rs
servo/components/script/dom/bluetooth.rs
servo/components/script/dom/eventsource.rs
servo/components/script/dom/globalscope.rs
servo/components/script/dom/htmlimageelement.rs
servo/components/script/dom/htmllinkelement.rs
servo/components/script/dom/htmlmediaelement.rs
servo/components/script/dom/htmlscriptelement.rs
servo/components/script/dom/webidls/EventSource.webidl
servo/components/script/dom/websocket.rs
servo/components/script/dom/window.rs
servo/components/script/dom/workerglobalscope.rs
servo/components/script/dom/xmlhttprequest.rs
servo/components/script/fetch.rs
servo/components/script/lib.rs
servo/components/script/network_listener.rs
servo/components/script/script_thread.rs
servo/components/script/task_source/networking.rs
servo/components/script/timers.rs
--- a/servo/components/net_traits/request.rs
+++ b/servo/components/net_traits/request.rs
@@ -65,17 +65,17 @@ pub enum RequestMode {
 #[derive(Copy, Clone, PartialEq, Serialize, Deserialize, HeapSizeOf)]
 pub enum CredentialsMode {
     Omit,
     CredentialsSameOrigin,
     Include
 }
 
 /// [Cache mode](https://fetch.spec.whatwg.org/#concept-request-cache-mode)
-#[derive(Copy, Clone, PartialEq, HeapSizeOf)]
+#[derive(Copy, Clone, PartialEq, Serialize, Deserialize, HeapSizeOf)]
 pub enum CacheMode {
     Default,
     NoStore,
     Reload,
     NoCache,
     ForceCache,
     OnlyIfCached
 }
@@ -106,32 +106,35 @@ pub enum Window {
 
 /// [CORS settings attribute](https://html.spec.whatwg.org/multipage/#attr-crossorigin-anonymous)
 #[derive(Copy, Clone, PartialEq, Serialize, Deserialize)]
 pub enum CorsSettings {
     Anonymous,
     UseCredentials
 }
 
-#[derive(Serialize, Deserialize, Clone)]
+#[derive(Serialize, Deserialize, Clone, HeapSizeOf)]
 pub struct RequestInit {
     #[serde(deserialize_with = "::hyper_serde::deserialize",
             serialize_with = "::hyper_serde::serialize")]
+    #[ignore_heap_size_of = "Defined in hyper"]
     pub method: Method,
     pub url: Url,
     #[serde(deserialize_with = "::hyper_serde::deserialize",
             serialize_with = "::hyper_serde::serialize")]
+    #[ignore_heap_size_of = "Defined in hyper"]
     pub headers: Headers,
     pub unsafe_request: bool,
     pub body: Option<Vec<u8>>,
     // TODO: client object
     pub type_: Type,
     pub destination: Destination,
     pub synchronous: bool,
     pub mode: RequestMode,
+    pub cache_mode: CacheMode,
     pub use_cors_preflight: bool,
     pub credentials_mode: CredentialsMode,
     pub use_url_credentials: bool,
     // this should actually be set by fetch, but fetch
     // doesn't have info about the client right now
     pub origin: Url,
     // XXXManishearth these should be part of the client object
     pub referrer_url: Option<Url>,
@@ -147,16 +150,17 @@ impl Default for RequestInit {
             url: Url::parse("about:blank").unwrap(),
             headers: Headers::new(),
             unsafe_request: false,
             body: None,
             type_: Type::None,
             destination: Destination::None,
             synchronous: false,
             mode: RequestMode::NoCors,
+            cache_mode: CacheMode::Default,
             use_cors_preflight: false,
             credentials_mode: CredentialsMode::Omit,
             use_url_credentials: false,
             origin: Url::parse("about:blank").unwrap(),
             referrer_url: None,
             referrer_policy: None,
             pipeline_id: None,
             redirect_mode: RedirectMode::Follow,
@@ -256,16 +260,17 @@ impl Request {
         *req.body.borrow_mut() = init.body;
         req.type_ = init.type_;
         req.destination = init.destination;
         req.synchronous = init.synchronous;
         req.mode = init.mode;
         req.use_cors_preflight = init.use_cors_preflight;
         req.credentials_mode = init.credentials_mode;
         req.use_url_credentials = init.use_url_credentials;
+        req.cache_mode.set(init.cache_mode);
         *req.referrer.borrow_mut() = if let Some(url) = init.referrer_url {
             Referrer::ReferrerUrl(url)
         } else {
             Referrer::NoReferrer
         };
         req.referrer_policy.set(init.referrer_policy);
         req.pipeline_id.set(init.pipeline_id);
         req.redirect_mode.set(init.redirect_mode);
--- a/servo/components/script/dom/bindings/trace.rs
+++ b/servo/components/script/dom/bindings/trace.rs
@@ -58,17 +58,17 @@ use js::jsapi::{GCTraceKindToAscii, Heap
 use js::jsval::JSVal;
 use js::rust::Runtime;
 use libc;
 use msg::constellation_msg::{FrameId, FrameType, PipelineId};
 use net_traits::{Metadata, NetworkError, ReferrerPolicy, ResourceThreads};
 use net_traits::filemanager_thread::RelativePos;
 use net_traits::image::base::{Image, ImageMetadata};
 use net_traits::image_cache_thread::{ImageCacheChan, ImageCacheThread};
-use net_traits::request::Request;
+use net_traits::request::{Request, RequestInit};
 use net_traits::response::{Response, ResponseBody};
 use net_traits::response::HttpsState;
 use net_traits::storage_thread::StorageType;
 use offscreen_gl_context::GLLimits;
 use profile_traits::mem::ProfilerChan as MemProfilerChan;
 use profile_traits::time::ProfilerChan as TimeProfilerChan;
 use script_layout_interface::OpaqueStyleAndLayoutData;
 use script_layout_interface::reporter::CSSErrorReporter;
@@ -344,16 +344,17 @@ no_jsmanaged_fields!(Length);
 no_jsmanaged_fields!(ElementState);
 no_jsmanaged_fields!(DOMString);
 no_jsmanaged_fields!(Mime);
 no_jsmanaged_fields!(AttrIdentifier);
 no_jsmanaged_fields!(AttrValue);
 no_jsmanaged_fields!(Snapshot);
 no_jsmanaged_fields!(HttpsState);
 no_jsmanaged_fields!(Request);
+no_jsmanaged_fields!(RequestInit);
 no_jsmanaged_fields!(SharedRt);
 no_jsmanaged_fields!(TouchpadPressurePhase);
 no_jsmanaged_fields!(USVString);
 no_jsmanaged_fields!(ReferrerPolicy);
 no_jsmanaged_fields!(Response);
 no_jsmanaged_fields!(ResponseBody);
 no_jsmanaged_fields!(ResourceThreads);
 no_jsmanaged_fields!(StatusCode);
--- a/servo/components/script/dom/bluetooth.rs
+++ b/servo/components/script/dom/bluetooth.rs
@@ -143,24 +143,24 @@ impl Bluetooth {
         self.get_bluetooth_thread().send(BluetoothRequest::RequestDevice(option, sender)).unwrap();
     }
 }
 
 pub fn response_async<T: AsyncBluetoothListener + Reflectable + 'static>(
         promise: &Rc<Promise>,
         receiver: &T) -> IpcSender<BluetoothResponseResult> {
     let (action_sender, action_receiver) = ipc::channel().unwrap();
-    let chan = receiver.global().networking_task_source();
+    let task_source = receiver.global().networking_task_source();
     let context = Arc::new(Mutex::new(BluetoothContext {
         promise: Some(TrustedPromise::new(promise.clone())),
         receiver: Trusted::new(receiver),
     }));
     let listener = NetworkListener {
         context: context,
-        script_chan: chan,
+        task_source: task_source,
         wrapper: None,
     };
     ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
         listener.notify_response(message.to().unwrap());
     });
     action_sender
 }
 
--- a/servo/components/script/dom/eventsource.rs
+++ b/servo/components/script/dom/eventsource.rs
@@ -1,79 +1,411 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 use dom::bindings::cell::DOMRefCell;
 use dom::bindings::codegen::Bindings::EventHandlerBinding::EventHandlerNonNull;
 use dom::bindings::codegen::Bindings::EventSourceBinding::{EventSourceInit, EventSourceMethods, Wrap};
 use dom::bindings::error::{Error, Fallible};
+use dom::bindings::inheritance::Castable;
 use dom::bindings::js::Root;
-use dom::bindings::reflector::reflect_dom_object;
+use dom::bindings::refcounted::Trusted;
+use dom::bindings::reflector::{Reflectable, reflect_dom_object};
 use dom::bindings::str::DOMString;
+use dom::event::Event;
 use dom::eventtarget::EventTarget;
 use dom::globalscope::GlobalScope;
+use dom::messageevent::MessageEvent;
+use encoding::Encoding;
+use encoding::all::UTF_8;
+use euclid::length::Length;
+use hyper::header::{Accept, qitem};
+use ipc_channel::ipc;
+use ipc_channel::router::ROUTER;
+use js::conversions::ToJSValConvertible;
+use js::jsapi::JSAutoCompartment;
+use js::jsval::UndefinedValue;
+use mime::{Mime, TopLevel, SubLevel};
+use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseMsg, FetchResponseListener, NetworkError};
+use net_traits::request::{CacheMode, CorsSettings, CredentialsMode};
+use net_traits::request::{RequestInit, RequestMode};
+use network_listener::{NetworkListener, PreInvoke};
+use script_thread::Runnable;
+use servo_atoms::Atom;
 use std::cell::Cell;
+use std::mem;
+use std::str::{Chars, FromStr};
+use std::sync::{Arc, Mutex};
+use task_source::TaskSource;
+use timers::OneshotTimerCallback;
 use url::Url;
 
+header! { (LastEventId, "Last-Event-ID") => [String] }
+
+const DEFAULT_RECONNECTION_TIME: u64 = 5000;
+
 #[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)]
-enum EventSourceReadyState {
+struct GenerationId(u32);
+
+#[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)]
+/// https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate
+enum ReadyState {
     Connecting = 0,
-    #[allow(dead_code)]
     Open = 1,
     Closed = 2
 }
 
 #[dom_struct]
 pub struct EventSource {
     eventtarget: EventTarget,
     url: Url,
-    ready_state: Cell<EventSourceReadyState>,
+    request: DOMRefCell<Option<RequestInit>>,
+    last_event_id: DOMRefCell<DOMString>,
+    reconnection_time: Cell<u64>,
+    generation_id: Cell<GenerationId>,
+
+    ready_state: Cell<ReadyState>,
     with_credentials: bool,
-    last_event_id: DOMRefCell<DOMString>
+}
+
+enum ParserState {
+    Field,
+    Comment,
+    Value,
+    Eol
+}
+
+struct EventSourceContext {
+    event_source: Trusted<EventSource>,
+    gen_id: GenerationId,
+    action_sender: ipc::IpcSender<FetchResponseMsg>,
+
+    parser_state: ParserState,
+    field: String,
+    value: String,
+    origin: String,
+
+    event_type: String,
+    data: String,
+    last_event_id: String,
+}
+
+impl EventSourceContext {
+    fn announce_the_connection(&self) {
+        let event_source = self.event_source.root();
+        if self.gen_id != event_source.generation_id.get() {
+            return;
+        }
+        let runnable = box AnnounceConnectionRunnable {
+            event_source: self.event_source.clone()
+        };
+        let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
+    }
+
+    fn fail_the_connection(&self) {
+        let event_source = self.event_source.root();
+        if self.gen_id != event_source.generation_id.get() {
+            return;
+        }
+        let runnable = box FailConnectionRunnable {
+            event_source: self.event_source.clone()
+        };
+        let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
+    }
+
+    // https://html.spec.whatwg.org/multipage/#reestablish-the-connection
+    fn reestablish_the_connection(&self) {
+        let event_source = self.event_source.root();
+
+        if self.gen_id != event_source.generation_id.get() {
+            return;
+        }
+
+        // Step 1
+        let runnable = box ReestablishConnectionRunnable {
+            event_source: self.event_source.clone(),
+            action_sender: self.action_sender.clone()
+        };
+        let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
+    }
+
+    // https://html.spec.whatwg.org/multipage/#processField
+    fn process_field(&mut self) {
+        match &*self.field {
+            "event" => mem::swap(&mut self.event_type, &mut self.value),
+            "data" => {
+                self.data.push_str(&self.value);
+                self.data.push('\n');
+            }
+            "id" => mem::swap(&mut self.last_event_id, &mut self.value),
+            "retry" => if let Ok(time) = u64::from_str(&self.value) {
+                self.event_source.root().reconnection_time.set(time);
+            },
+            _ => ()
+        }
+
+        self.field.clear();
+        self.value.clear();
+    }
+
+    // https://html.spec.whatwg.org/multipage/#dispatchMessage
+    #[allow(unsafe_code)]
+    fn dispatch_event(&mut self) {
+        let event_source = self.event_source.root();
+        // Step 1
+        *event_source.last_event_id.borrow_mut() = DOMString::from(self.last_event_id.clone());
+        // Step 2
+        if self.data.is_empty() {
+            self.data.clear();
+            self.event_type.clear();
+            return;
+        }
+        // Step 3
+        if let Some(last) = self.data.pop() {
+            if last != '\n' {
+                self.data.push(last);
+            }
+        }
+        // Step 6
+        let type_ = if !self.event_type.is_empty() {
+            Atom::from(self.event_type.clone())
+        } else {
+            atom!("message")
+        };
+        // Steps 4-5
+        let event = {
+            let _ac = JSAutoCompartment::new(event_source.global().get_cx(),
+                                             event_source.reflector().get_jsobject().get());
+            rooted!(in(event_source.global().get_cx()) let mut data = UndefinedValue());
+            unsafe { self.data.to_jsval(event_source.global().get_cx(), data.handle_mut()) };
+            MessageEvent::new(&*event_source.global(), type_, false, false, data.handle(),
+                              DOMString::from(self.origin.clone()),
+                              event_source.last_event_id.borrow().clone())
+        };
+        // Step 7
+        self.event_type.clear();
+        self.data.clear();
+        // Step 8
+        let runnable = box DispatchEventRunnable {
+            event_source: self.event_source.clone(),
+            event: Trusted::new(&event)
+        };
+        let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
+    }
+
+    // https://html.spec.whatwg.org/multipage/#event-stream-interpretation
+    fn parse(&mut self, stream: Chars) {
+        let mut stream = stream.peekable();
+
+        while let Some(ch) = stream.next() {
+            match (ch, &self.parser_state) {
+                (':', &ParserState::Eol) => self.parser_state = ParserState::Comment,
+                (':', &ParserState::Field) => {
+                    self.parser_state = ParserState::Value;
+                    if let Some(&' ') = stream.peek() {
+                        stream.next();
+                    }
+                }
+
+                ('\n', &ParserState::Value) => {
+                    self.parser_state = ParserState::Eol;
+                    self.process_field();
+                }
+                ('\r', &ParserState::Value) => {
+                    if let Some(&'\n') = stream.peek() {
+                        continue;
+                    }
+                    self.parser_state = ParserState::Eol;
+                    self.process_field();
+                }
+
+                ('\n', &ParserState::Field) => {
+                    self.parser_state = ParserState::Eol;
+                    self.process_field();
+                }
+                ('\r', &ParserState::Field) => {
+                    if let Some(&'\n') = stream.peek() {
+                        continue;
+                    }
+                    self.parser_state = ParserState::Eol;
+                    self.process_field();
+                }
+
+                ('\n', &ParserState::Eol) => self.dispatch_event(),
+                ('\r', &ParserState::Eol) => {
+                    if let Some(&'\n') = stream.peek() {
+                        continue;
+                    }
+                    self.dispatch_event();
+                }
+
+                ('\n', &ParserState::Comment) => self.parser_state = ParserState::Eol,
+                ('\r', &ParserState::Comment) => {
+                    if let Some(&'\n') = stream.peek() {
+                        continue;
+                    }
+                    self.parser_state = ParserState::Eol;
+                }
+
+                (_, &ParserState::Field) => self.field.push(ch),
+                (_, &ParserState::Value) => self.value.push(ch),
+                (_, &ParserState::Eol) => {
+                    self.parser_state = ParserState::Field;
+                    self.field.push(ch);
+                }
+                (_, &ParserState::Comment) => (),
+            }
+        }
+    }
+}
+
+impl FetchResponseListener for EventSourceContext {
+    fn process_request_body(&mut self) {
+        // TODO
+    }
+
+    fn process_request_eof(&mut self) {
+        // TODO
+    }
+
+    fn process_response(&mut self, metadata: Result<FetchMetadata, NetworkError>) {
+        match metadata {
+            Ok(fm) => {
+                let meta = match fm {
+                    FetchMetadata::Unfiltered(m) => m,
+                    FetchMetadata::Filtered { unsafe_, .. } => unsafe_
+                };
+                match meta.content_type {
+                    None => self.fail_the_connection(),
+                    Some(ct) => match ct.into_inner().0 {
+                        Mime(TopLevel::Text, SubLevel::EventStream, _) => {
+                            self.origin = meta.final_url.origin().unicode_serialization();
+                            self.announce_the_connection();
+                        }
+                        _ => self.fail_the_connection()
+                    }
+                }
+            }
+            Err(_) => {
+                self.reestablish_the_connection();
+            }
+        }
+    }
+
+    fn process_response_chunk(&mut self, chunk: Vec<u8>) {
+        let mut stream = String::new();
+        UTF_8.raw_decoder().raw_feed(&chunk, &mut stream);
+        self.parse(stream.chars())
+    }
+
+    fn process_response_eof(&mut self, _response: Result<(), NetworkError>) {
+        self.reestablish_the_connection();
+    }
+}
+
+impl PreInvoke for EventSourceContext {
+    fn should_invoke(&self) -> bool {
+        self.event_source.root().generation_id.get() == self.gen_id
+    }
 }
 
 impl EventSource {
     fn new_inherited(url: Url, with_credentials: bool) -> EventSource {
         EventSource {
             eventtarget: EventTarget::new_inherited(),
             url: url,
-            ready_state: Cell::new(EventSourceReadyState::Connecting),
+            request: DOMRefCell::new(None),
+            last_event_id: DOMRefCell::new(DOMString::from("")),
+            reconnection_time: Cell::new(DEFAULT_RECONNECTION_TIME),
+            generation_id: Cell::new(GenerationId(0)),
+
+            ready_state: Cell::new(ReadyState::Connecting),
             with_credentials: with_credentials,
-            last_event_id: DOMRefCell::new(DOMString::from(""))
         }
     }
 
     fn new(global: &GlobalScope, url: Url, with_credentials: bool) -> Root<EventSource> {
         reflect_dom_object(box EventSource::new_inherited(url, with_credentials),
                            global,
                            Wrap)
     }
 
+    pub fn request(&self) -> RequestInit {
+        self.request.borrow().clone().unwrap()
+    }
+
     pub fn Constructor(global: &GlobalScope,
-                       url_str: DOMString,
+                       url: DOMString,
                        event_source_init: &EventSourceInit) -> Fallible<Root<EventSource>> {
-        // Steps 1-2
-        let base_url = global.get_url();
-        let url = match base_url.join(&*url_str) {
+        // TODO: Step 2 relevant settings object
+        // Step 3
+        let base_url = global.api_base_url();
+        let url_record = match base_url.join(&*url) {
             Ok(u) => u,
+            //  Step 4
             Err(_) => return Err(Error::Syntax)
         };
-        // Step 3
-        let event_source = EventSource::new(global, url, event_source_init.withCredentials);
-        // Step 4
-        // Step 5
-        // Step 6
-        // Step 7
+        // Step 1, 5
+        let ev = EventSource::new(global, url_record.clone(), event_source_init.withCredentials);
+        // Steps 6-7
+        let cors_attribute_state = if event_source_init.withCredentials {
+            CorsSettings::UseCredentials
+        } else {
+            CorsSettings::Anonymous
+        };
         // Step 8
-        // Step 9
+        // TODO: Step 9 set request's client settings
+        let mut request = RequestInit {
+            url: url_record,
+            origin: global.get_url(),
+            pipeline_id: Some(global.pipeline_id()),
+            // https://html.spec.whatwg.org/multipage/#create-a-potential-cors-request
+            use_url_credentials: true,
+            mode: RequestMode::CorsMode,
+            credentials_mode: if cors_attribute_state == CorsSettings::Anonymous {
+                CredentialsMode::CredentialsSameOrigin
+            } else {
+                CredentialsMode::Include
+            },
+            ..RequestInit::default()
+        };
         // Step 10
+        request.headers.set(Accept(vec![qitem(mime!(Text / EventStream))]));
         // Step 11
-        Ok(event_source)
+        request.cache_mode = CacheMode::NoStore;
         // Step 12
+        *ev.request.borrow_mut() = Some(request.clone());
+        // Step 14
+        let (action_sender, action_receiver) = ipc::channel().unwrap();
+        let context = EventSourceContext {
+            event_source: Trusted::new(&ev),
+            gen_id: ev.generation_id.get(),
+            action_sender: action_sender.clone(),
+
+            parser_state: ParserState::Eol,
+            field: String::new(),
+            value: String::new(),
+            origin: String::new(),
+
+            event_type: String::new(),
+            data: String::new(),
+            last_event_id: String::new(),
+        };
+        let listener = NetworkListener {
+            context: Arc::new(Mutex::new(context)),
+            task_source: global.networking_task_source(),
+            wrapper: Some(global.get_runnable_wrapper())
+        };
+        ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
+            listener.notify_fetch(message.to().unwrap());
+        });
+        global.core_resource_thread().send(CoreResourceMsg::Fetch(request, action_sender)).unwrap();
+        // Step 13
+        Ok(ev)
     }
 }
 
 impl EventSourceMethods for EventSource {
     // https://html.spec.whatwg.org/multipage/#handler-eventsource-onopen
     event_handler!(open, GetOnopen, SetOnopen);
 
     // https://html.spec.whatwg.org/multipage/#handler-eventsource-onmessage
@@ -94,12 +426,124 @@ impl EventSourceMethods for EventSource 
 
     // https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate
     fn ReadyState(&self) -> u16 {
         self.ready_state.get() as u16
     }
 
     // https://html.spec.whatwg.org/multipage/#dom-eventsource-close
     fn Close(&self) {
-        self.ready_state.set(EventSourceReadyState::Closed);
-        // TODO: Terminate ongoing fetch
+        let GenerationId(prev_id) = self.generation_id.get();
+        self.generation_id.set(GenerationId(prev_id + 1));
+        self.ready_state.set(ReadyState::Closed);
+    }
+}
+
+pub struct AnnounceConnectionRunnable {
+    event_source: Trusted<EventSource>,
+}
+
+impl Runnable for AnnounceConnectionRunnable {
+    fn name(&self) -> &'static str { "EventSource AnnounceConnectionRunnable" }
+
+    // https://html.spec.whatwg.org/multipage/#announce-the-connection
+    fn handler(self: Box<AnnounceConnectionRunnable>) {
+        let event_source = self.event_source.root();
+        if event_source.ready_state.get() != ReadyState::Closed {
+            event_source.ready_state.set(ReadyState::Open);
+            event_source.upcast::<EventTarget>().fire_event(atom!("open"));
+        }
+    }
+}
+
+pub struct FailConnectionRunnable {
+    event_source: Trusted<EventSource>,
+}
+
+impl Runnable for FailConnectionRunnable {
+    fn name(&self) -> &'static str { "EventSource FailConnectionRunnable" }
+
+    // https://html.spec.whatwg.org/multipage/#fail-the-connection
+    fn handler(self: Box<FailConnectionRunnable>) {
+        let event_source = self.event_source.root();
+        if event_source.ready_state.get() != ReadyState::Closed {
+            event_source.ready_state.set(ReadyState::Closed);
+            event_source.upcast::<EventTarget>().fire_event(atom!("error"));
+        }
     }
 }
+
+pub struct ReestablishConnectionRunnable {
+    event_source: Trusted<EventSource>,
+    action_sender: ipc::IpcSender<FetchResponseMsg>,
+}
+
+impl Runnable for ReestablishConnectionRunnable {
+    fn name(&self) -> &'static str { "EventSource ReestablishConnectionRunnable" }
+
+    // https://html.spec.whatwg.org/multipage/#reestablish-the-connection
+    fn handler(self: Box<ReestablishConnectionRunnable>) {
+        let event_source = self.event_source.root();
+        // Step 1.1
+        if event_source.ready_state.get() == ReadyState::Closed {
+            return;
+        }
+        // Step 1.2
+        event_source.ready_state.set(ReadyState::Connecting);
+        // Step 1.3
+        event_source.upcast::<EventTarget>().fire_event(atom!("error"));
+        // Step 2
+        let duration = Length::new(event_source.reconnection_time.get());
+        // TODO Step 3: Optionally wait some more
+        // Steps 4-5
+        let callback = OneshotTimerCallback::EventSourceTimeout(EventSourceTimeoutCallback {
+            event_source: self.event_source.clone(),
+            action_sender: self.action_sender.clone()
+        });
+        let _ = event_source.global().schedule_callback(callback, duration);
+    }
+}
+
+#[derive(JSTraceable, HeapSizeOf)]
+pub struct EventSourceTimeoutCallback {
+    #[ignore_heap_size_of = "Because it is non-owning"]
+    event_source: Trusted<EventSource>,
+    #[ignore_heap_size_of = "Because it is non-owning"]
+    action_sender: ipc::IpcSender<FetchResponseMsg>,
+}
+
+impl EventSourceTimeoutCallback {
+    // https://html.spec.whatwg.org/multipage/#reestablish-the-connection
+    pub fn invoke(self) {
+        let event_source = self.event_source.root();
+        let global = event_source.global();
+        // Step 5.1
+        if event_source.ready_state.get() != ReadyState::Connecting {
+            return;
+        }
+        // Step 5.2
+        let mut request = event_source.request();
+        // Step 5.3
+        if !event_source.last_event_id.borrow().is_empty() {
+            request.headers.set(LastEventId(String::from(event_source.last_event_id.borrow().clone())));
+        }
+        // Step 5.4
+        global.core_resource_thread().send(CoreResourceMsg::Fetch(request, self.action_sender)).unwrap();
+    }
+}
+
+pub struct DispatchEventRunnable {
+    event_source: Trusted<EventSource>,
+    event: Trusted<MessageEvent>,
+}
+
+impl Runnable for DispatchEventRunnable {
+    fn name(&self) -> &'static str { "EventSource DispatchEventRunnable" }
+
+    // https://html.spec.whatwg.org/multipage/#dispatchMessage
+    fn handler(self: Box<DispatchEventRunnable>) {
+        let event_source = self.event_source.root();
+        // Step 8
+        if event_source.ready_state.get() != ReadyState::Closed {
+            self.event.root().upcast::<Event>().fire(&event_source.upcast());
+        }
+    }
+}
--- a/servo/components/script/dom/globalscope.rs
+++ b/servo/components/script/dom/globalscope.rs
@@ -37,16 +37,17 @@ use script_thread::{MainThreadScriptChan
 use script_traits::{MsDuration, ScriptMsg as ConstellationMsg, TimerEvent};
 use script_traits::{TimerEventId, TimerEventRequest, TimerSource};
 use std::cell::Cell;
 use std::collections::HashMap;
 use std::collections::hash_map::Entry;
 use std::ffi::CString;
 use std::panic;
 use task_source::file_reading::FileReadingTaskSource;
+use task_source::networking::NetworkingTaskSource;
 use time::{Timespec, get_time};
 use timers::{IsInterval, OneshotTimerCallback, OneshotTimerHandle};
 use timers::{OneshotTimers, TimerCallback};
 use url::Url;
 
 #[dom_struct]
 pub struct GlobalScope {
     eventtarget: EventTarget,
@@ -320,22 +321,22 @@ impl GlobalScope {
         if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
             return worker.script_chan();
         }
         unreachable!();
     }
 
     /// `ScriptChan` to send messages to the networking task source of
     /// this of this global scope.
-    pub fn networking_task_source(&self) -> Box<ScriptChan + Send> {
+    pub fn networking_task_source(&self) -> NetworkingTaskSource {
         if let Some(window) = self.downcast::<Window>() {
             return window.networking_task_source();
         }
         if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
-            return worker.script_chan();
+            return worker.networking_task_source();
         }
         unreachable!();
     }
 
     /// Evaluate JS code on this global scope.
     pub fn evaluate_js_on_global_with_result(
             &self, code: &str, rval: MutableHandleValue) {
         self.evaluate_script_on_global_with_result(code, "", rval)
--- a/servo/components/script/dom/htmlimageelement.rs
+++ b/servo/components/script/dom/htmlimageelement.rs
@@ -21,18 +21,16 @@ use dom::htmlelement::HTMLElement;
 use dom::node::{Node, NodeDamage, document_from_node, window_from_node};
 use dom::values::UNSIGNED_LONG_MAX;
 use dom::virtualmethods::VirtualMethods;
 use html5ever_atoms::LocalName;
 use ipc_channel::ipc;
 use ipc_channel::router::ROUTER;
 use net_traits::image::base::{Image, ImageMetadata};
 use net_traits::image_cache_thread::{ImageResponder, ImageResponse};
-use script_runtime::CommonScriptMsg;
-use script_runtime::ScriptThreadEventCategory::UpdateReplacedElement;
 use script_thread::Runnable;
 use std::i32;
 use std::sync::Arc;
 use style::attr::{AttrValue, LengthOrPercentageOrAuto};
 use task_source::TaskSource;
 use url::Url;
 
 #[derive(JSTraceable, HeapSizeOf)]
@@ -135,27 +133,25 @@ impl HTMLImageElement {
             Some((src, base_url)) => {
                 let img_url = base_url.join(&src);
                 if let Ok(img_url) = img_url {
                     self.current_request.borrow_mut().parsed_url = Some(img_url.clone());
                     self.current_request.borrow_mut().source_url = Some(src);
 
                     let trusted_node = Trusted::new(self);
                     let (responder_sender, responder_receiver) = ipc::channel().unwrap();
-                    let script_chan = window.networking_task_source();
+                    let task_source = window.networking_task_source();
                     let wrapper = window.get_runnable_wrapper();
                     ROUTER.add_route(responder_receiver.to_opaque(), box move |message| {
                         // Return the image via a message to the script thread, which marks the element
                         // as dirty and triggers a reflow.
                         let image_response = message.to().unwrap();
                         let runnable = box ImageResponseHandlerRunnable::new(
                             trusted_node.clone(), image_response);
-                        let runnable = wrapper.wrap_runnable(runnable);
-                        let _ = script_chan.send(CommonScriptMsg::RunnableMsg(
-                            UpdateReplacedElement, runnable));
+                        let _ = task_source.queue_with_wrapper(runnable, &wrapper);
                     });
 
                     image_cache.request_image_and_metadata(img_url,
                                               window.image_cache_chan(),
                                               Some(ImageResponder::new(responder_sender)));
                 } else {
                     // https://html.spec.whatwg.org/multipage/#update-the-image-data
                     // Step 11 (error substeps)
--- a/servo/components/script/dom/htmllinkelement.rs
+++ b/servo/components/script/dom/htmllinkelement.rs
@@ -238,18 +238,18 @@ impl HTMLLinkElement {
             data: vec!(),
             metadata: None,
             url: url.clone(),
         }));
 
         let (action_sender, action_receiver) = ipc::channel().unwrap();
         let listener = NetworkListener {
             context: context,
-            script_chan: document.window().networking_task_source(),
-            wrapper: Some(document.window().get_runnable_wrapper()),
+            task_source: document.window().networking_task_source(),
+            wrapper: Some(document.window().get_runnable_wrapper())
         };
         ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
             listener.notify_fetch(message.to().unwrap());
         });
 
         if self.parser_inserted.get() {
             document.increment_script_blocking_stylesheet_count();
         }
--- a/servo/components/script/dom/htmlmediaelement.rs
+++ b/servo/components/script/dom/htmlmediaelement.rs
@@ -516,21 +516,20 @@ impl HTMLMediaElement {
                 // TODO 4.1.5-7 (state for load that initiates later)
                 return;
             }
 
             // 4.2
             let context = Arc::new(Mutex::new(HTMLMediaElementContext::new(self, url.clone())));
             let (action_sender, action_receiver) = ipc::channel().unwrap();
             let window = window_from_node(self);
-            let script_chan = window.networking_task_source();
             let listener = NetworkListener {
                 context: context,
-                script_chan: script_chan,
-                wrapper: Some(window.get_runnable_wrapper()),
+                task_source: window.networking_task_source(),
+                wrapper: Some(window.get_runnable_wrapper())
             };
 
             ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
                 listener.notify_fetch(message.to().unwrap());
             });
 
             // FIXME: we're supposed to block the load event much earlier than now
             let document = document_from_node(self);
--- a/servo/components/script/dom/htmlscriptelement.rs
+++ b/servo/components/script/dom/htmlscriptelement.rs
@@ -257,18 +257,18 @@ fn fetch_a_classic_script(script: &HTMLS
         metadata: None,
         url: url.clone(),
         status: Ok(())
     }));
 
     let (action_sender, action_receiver) = ipc::channel().unwrap();
     let listener = NetworkListener {
         context: context,
-        script_chan: doc.window().networking_task_source(),
-        wrapper: Some(doc.window().get_runnable_wrapper()),
+        task_source: doc.window().networking_task_source(),
+        wrapper: Some(doc.window().get_runnable_wrapper())
     };
 
     ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
         listener.notify_fetch(message.to().unwrap());
     });
     doc.fetch_async(LoadType::Script(url), request, action_sender);
 }
 
--- a/servo/components/script/dom/webidls/EventSource.webidl
+++ b/servo/components/script/dom/webidls/EventSource.webidl
@@ -2,18 +2,17 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 /*
  * The origin of this IDL file is:
  * https://html.spec.whatwg.org/multipage/#eventsource
  */
 
 [Constructor(DOMString url, optional EventSourceInit eventSourceInitDict),
- Exposed=(Window,Worker),
- Pref="dom.eventsource.enabled"]
+ Exposed=(Window,Worker)]
 interface EventSource : EventTarget {
   readonly attribute DOMString url;
   readonly attribute boolean withCredentials;
 
   // ready state
   const unsigned short CONNECTING = 0;
   const unsigned short OPEN = 1;
   const unsigned short CLOSED = 2;
--- a/servo/components/script/dom/websocket.rs
+++ b/servo/components/script/dom/websocket.rs
@@ -28,24 +28,26 @@ use js::jsapi::JSAutoCompartment;
 use js::jsval::UndefinedValue;
 use libc::{uint32_t, uint8_t};
 use net_traits::{WebSocketCommunicate, WebSocketConnectData, WebSocketDomAction, WebSocketNetworkEvent};
 use net_traits::CookieSource::HTTP;
 use net_traits::CoreResourceMsg::{SetCookiesForUrl, WebsocketConnect};
 use net_traits::MessageData;
 use net_traits::hosts::replace_hosts;
 use net_traits::unwrap_websocket_protocol;
-use script_runtime::{CommonScriptMsg, ScriptChan};
+use script_runtime::CommonScriptMsg;
 use script_runtime::ScriptThreadEventCategory::WebSocketEvent;
-use script_thread::Runnable;
+use script_thread::{Runnable, RunnableWrapper};
 use std::ascii::AsciiExt;
 use std::borrow::ToOwned;
 use std::cell::Cell;
 use std::ptr;
 use std::thread;
+use task_source::TaskSource;
+use task_source::networking::NetworkingTaskSource;
 use websocket::client::request::Url;
 use websocket::header::{Headers, WebSocketProtocol};
 use websocket::ws::util::url::parse_url;
 
 #[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)]
 enum WebSocketRequestState {
     Connecting = 0,
     Open = 1,
@@ -136,36 +138,39 @@ mod close_code {
     pub const POLICY_VIOLATION: u16 = 1008;
     pub const TOO_LARGE: u16 = 1009;
     pub const EXTENSION_MISSING: u16 = 1010;
     pub const INTERNAL_ERROR: u16 = 1011;
     pub const TLS_FAILED: u16 = 1015;
 }
 
 pub fn close_the_websocket_connection(address: Trusted<WebSocket>,
-                                      sender: Box<ScriptChan>,
+                                      task_source: &NetworkingTaskSource,
+                                      wrapper: &RunnableWrapper,
                                       code: Option<u16>,
                                       reason: String) {
     let close_task = box CloseTask {
         address: address,
         failed: false,
         code: code,
         reason: Some(reason),
     };
-    sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, close_task)).unwrap();
+    task_source.queue_with_wrapper(close_task, &wrapper).unwrap();
 }
 
-pub fn fail_the_websocket_connection(address: Trusted<WebSocket>, sender: Box<ScriptChan>) {
+pub fn fail_the_websocket_connection(address: Trusted<WebSocket>,
+                                     task_source: &NetworkingTaskSource,
+                                     wrapper: &RunnableWrapper) {
     let close_task = box CloseTask {
         address: address,
         failed: true,
         code: Some(close_code::ABNORMAL),
         reason: None,
     };
-    sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, close_task)).unwrap();
+    task_source.queue_with_wrapper(close_task, &wrapper).unwrap();
 }
 
 #[dom_struct]
 pub struct WebSocket {
     eventtarget: EventTarget,
     url: Url,
     ready_state: Cell<WebSocketRequestState>,
     buffered_amount: Cell<u64>,
@@ -263,40 +268,43 @@ impl WebSocket {
             action_receiver: resource_action_receiver,
         };
 
         let _ = global.core_resource_thread().send(WebsocketConnect(connect, connect_data));
 
         *ws.sender.borrow_mut() = Some(dom_action_sender);
 
         let moved_address = address.clone();
-        let sender = global.networking_task_source();
+        let task_source = global.networking_task_source();
+        let wrapper = global.get_runnable_wrapper();
         thread::spawn(move || {
             while let Ok(event) = dom_event_receiver.recv() {
                 match event {
                     WebSocketNetworkEvent::ConnectionEstablished(headers, protocols) => {
                         let open_thread = box ConnectionEstablishedTask {
                             address: moved_address.clone(),
                             headers: headers,
                             protocols: protocols,
                         };
-                        sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, open_thread)).unwrap();
+                        task_source.queue_with_wrapper(open_thread, &wrapper).unwrap();
                     },
                     WebSocketNetworkEvent::MessageReceived(message) => {
                         let message_thread = box MessageReceivedTask {
                             address: moved_address.clone(),
                             message: message,
                         };
-                        sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, message_thread)).unwrap();
+                        task_source.queue_with_wrapper(message_thread, &wrapper).unwrap();
                     },
                     WebSocketNetworkEvent::Fail => {
-                        fail_the_websocket_connection(moved_address.clone(), sender.clone());
+                        fail_the_websocket_connection(moved_address.clone(),
+                            &task_source, &wrapper);
                     },
                     WebSocketNetworkEvent::Close(code, reason) => {
-                        close_the_websocket_connection(moved_address.clone(), sender.clone(), code, reason);
+                        close_the_websocket_connection(moved_address.clone(),
+                            &task_source, &wrapper, code, reason);
                     },
                 }
             }
         });
         // Step 7.
         Ok(ws)
     }
 
@@ -431,18 +439,18 @@ impl WebSocketMethods for WebSocket {
         match self.ready_state.get() {
             WebSocketRequestState::Closing | WebSocketRequestState::Closed  => {} //Do nothing
             WebSocketRequestState::Connecting => { //Connection is not yet established
                 /*By setting the state to closing, the open function
                   will abort connecting the websocket*/
                 self.ready_state.set(WebSocketRequestState::Closing);
 
                 let address = Trusted::new(self);
-                let sender = self.global().networking_task_source();
-                fail_the_websocket_connection(address, sender);
+                let task_source = self.global().networking_task_source();
+                fail_the_websocket_connection(address, &task_source, &self.global().get_runnable_wrapper());
             }
             WebSocketRequestState::Open => {
                 self.ready_state.set(WebSocketRequestState::Closing);
 
                 // Kick off _Start the WebSocket Closing Handshake_
                 // https://tools.ietf.org/html/rfc6455#section-7.1.2
                 let reason = reason.map(|reason| reason.0);
                 let mut other_sender = self.sender.borrow_mut();
@@ -465,18 +473,18 @@ struct ConnectionEstablishedTask {
 impl Runnable for ConnectionEstablishedTask {
     fn name(&self) -> &'static str { "ConnectionEstablishedTask" }
 
     fn handler(self: Box<Self>) {
         let ws = self.address.root();
 
         // Step 1: Protocols.
         if !self.protocols.is_empty() && self.headers.get::<WebSocketProtocol>().is_none() {
-            let sender = ws.global().networking_task_source();
-            fail_the_websocket_connection(self.address, sender);
+            let task_source = ws.global().networking_task_source();
+            fail_the_websocket_connection(self.address, &task_source, &ws.global().get_runnable_wrapper());
             return;
         }
 
         // Step 2.
         ws.ready_state.set(WebSocketRequestState::Open);
 
         // Step 3: Extensions.
         //TODO: Set extensions to extensions in use
--- a/servo/components/script/dom/window.rs
+++ b/servo/components/script/dom/window.rs
@@ -262,17 +262,17 @@ impl Window {
     pub fn dom_manipulation_task_source(&self) -> DOMManipulationTaskSource {
         self.dom_manipulation_task_source.clone()
     }
 
     pub fn user_interaction_task_source(&self) -> UserInteractionTaskSource {
         self.user_interaction_task_source.clone()
     }
 
-    pub fn networking_task_source(&self) -> Box<ScriptChan + Send> {
+    pub fn networking_task_source(&self) -> NetworkingTaskSource {
         self.networking_task_source.clone()
     }
 
     pub fn history_traversal_task_source(&self) -> Box<ScriptChan + Send> {
         self.history_traversal_task_source.clone()
     }
 
     pub fn file_reading_task_source(&self) -> FileReadingTaskSource {
--- a/servo/components/script/dom/workerglobalscope.rs
+++ b/servo/components/script/dom/workerglobalscope.rs
@@ -36,16 +36,17 @@ use script_traits::{TimerEvent, TimerEve
 use script_traits::WorkerGlobalScopeInit;
 use std::default::Default;
 use std::panic;
 use std::rc::Rc;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::Receiver;
 use task_source::file_reading::FileReadingTaskSource;
+use task_source::networking::NetworkingTaskSource;
 use timers::{IsInterval, TimerCallback};
 use url::Url;
 
 pub fn prepare_workerscope_init(global: &GlobalScope,
                                 devtools_sender: Option<IpcSender<DevtoolScriptControlMsg>>) -> WorkerGlobalScopeInit {
     let init = WorkerGlobalScopeInit {
             resource_threads: global.resource_threads().clone(),
             mem_profiler_chan: global.mem_profiler_chan().clone(),
@@ -356,16 +357,20 @@ impl WorkerGlobalScope {
             panic!("need to implement a sender for SharedWorker")
         }
     }
 
     pub fn file_reading_task_source(&self) -> FileReadingTaskSource {
         FileReadingTaskSource(self.script_chan())
     }
 
+    pub fn networking_task_source(&self) -> NetworkingTaskSource {
+        NetworkingTaskSource(self.script_chan())
+    }
+
     pub fn new_script_pair(&self) -> (Box<ScriptChan + Send>, Box<ScriptPort + Send>) {
         let dedicated = self.downcast::<DedicatedWorkerGlobalScope>();
         if let Some(dedicated) = dedicated {
             return dedicated.new_script_pair();
         } else {
             panic!("need to implement a sender for SharedWorker/ServiceWorker")
         }
     }
--- a/servo/components/script/dom/xmlhttprequest.rs
+++ b/servo/components/script/dom/xmlhttprequest.rs
@@ -44,30 +44,30 @@ use hyper::method::Method;
 use hyper::mime::{self, Attr as MimeAttr, Mime, Value as MimeValue};
 use hyper_serde::Serde;
 use ipc_channel::ipc;
 use ipc_channel::router::ROUTER;
 use js::jsapi::{JSContext, JS_ParseJSON};
 use js::jsapi::JS_ClearPendingException;
 use js::jsval::{JSVal, NullValue, UndefinedValue};
 use msg::constellation_msg::PipelineId;
-use net_traits::{CoreResourceThread, FetchMetadata, FilteredMetadata};
+use net_traits::{FetchMetadata, FilteredMetadata};
 use net_traits::{FetchResponseListener, LoadOrigin, NetworkError, ReferrerPolicy};
 use net_traits::CoreResourceMsg::Fetch;
 use net_traits::request::{CredentialsMode, Destination, RequestInit, RequestMode};
 use net_traits::trim_http_whitespace;
 use network_listener::{NetworkListener, PreInvoke};
-use script_runtime::ScriptChan;
 use servo_atoms::Atom;
 use std::ascii::AsciiExt;
 use std::borrow::ToOwned;
 use std::cell::Cell;
 use std::default::Default;
 use std::str;
 use std::sync::{Arc, Mutex};
+use task_source::networking::NetworkingTaskSource;
 use time;
 use timers::{OneshotTimerCallback, OneshotTimerHandle};
 use url::{Position, Url};
 use util::prefs::PREFS;
 
 #[derive(JSTraceable, PartialEq, Copy, Clone, HeapSizeOf)]
 enum XMLHttpRequestState {
     Unsent = 0,
@@ -209,18 +209,18 @@ impl XMLHttpRequest {
         Ok(XMLHttpRequest::new(global))
     }
 
     fn sync_in_window(&self) -> bool {
         self.sync.get() && self.global().is::<Window>()
     }
 
     fn initiate_async_xhr(context: Arc<Mutex<XHRContext>>,
-                          script_chan: Box<ScriptChan + Send>,
-                          core_resource_thread: CoreResourceThread,
+                          task_source: NetworkingTaskSource,
+                          global: &GlobalScope,
                           init: RequestInit) {
         impl FetchResponseListener for XHRContext {
             fn process_request_body(&mut self) {
                 // todo
             }
 
             fn process_request_eof(&mut self) {
                 // todo
@@ -257,23 +257,23 @@ impl XMLHttpRequest {
             fn should_invoke(&self) -> bool {
                 self.xhr.root().generation_id.get() == self.gen_id
             }
         }
 
         let (action_sender, action_receiver) = ipc::channel().unwrap();
         let listener = NetworkListener {
             context: context,
-            script_chan: script_chan,
-            wrapper: None,
+            task_source: task_source,
+            wrapper: Some(global.get_runnable_wrapper())
         };
         ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
             listener.notify_fetch(message.to().unwrap());
         });
-        core_resource_thread.send(Fetch(init, action_sender)).unwrap();
+        global.core_resource_thread().send(Fetch(init, action_sender)).unwrap();
     }
 }
 
 impl LoadOrigin for XMLHttpRequest {
     fn referrer_url(&self) -> Option<Url> {
         return self.referrer_url.clone();
     }
 
@@ -1288,26 +1288,25 @@ impl XMLHttpRequest {
 
         let context = Arc::new(Mutex::new(XHRContext {
             xhr: xhr,
             gen_id: self.generation_id.get(),
             buf: DOMRefCell::new(vec!()),
             sync_status: DOMRefCell::new(None),
         }));
 
-        let (script_chan, script_port) = if self.sync.get() {
+        let (task_source, script_port) = if self.sync.get() {
             let (tx, rx) = global.new_script_pair();
-            (tx, Some(rx))
+            (NetworkingTaskSource(tx), Some(rx))
         } else {
             (global.networking_task_source(), None)
         };
 
-        let core_resource_thread = global.core_resource_thread();
-        XMLHttpRequest::initiate_async_xhr(context.clone(), script_chan,
-                                           core_resource_thread, init);
+        XMLHttpRequest::initiate_async_xhr(context.clone(), task_source,
+                                           global, init);
 
         if let Some(script_port) = script_port {
             loop {
                 global.process_event(script_port.recv().unwrap());
                 let context = context.lock().unwrap();
                 let sync_status = context.sync_status.borrow();
                 if let Some(ref status) = *sync_status {
                     return status.clone();
--- a/servo/components/script/fetch.rs
+++ b/servo/components/script/fetch.rs
@@ -1,16 +1,16 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
+use dom::bindings::codegen::Bindings::RequestBinding::RequestInfo;
 use dom::bindings::codegen::Bindings::RequestBinding::RequestInit;
 use dom::bindings::codegen::Bindings::ResponseBinding::ResponseBinding::ResponseMethods;
 use dom::bindings::codegen::Bindings::ResponseBinding::ResponseType as DOMResponseType;
-use dom::bindings::codegen::UnionTypes::RequestOrUSVString;
 use dom::bindings::error::Error;
 use dom::bindings::js::Root;
 use dom::bindings::refcounted::{Trusted, TrustedPromise};
 use dom::bindings::reflector::Reflectable;
 use dom::globalscope::GlobalScope;
 use dom::headers::Guard;
 use dom::promise::Promise;
 use dom::request::Request;
@@ -57,22 +57,23 @@ fn request_init_from_request(request: Ne
         // TODO: NetTraitsRequestInit and NetTraitsRequest have different "origin"
         // ... NetTraitsRequestInit.origin: Url
         // ... NetTraitsRequest.origin: RefCell<Origin>
         origin: request.url(),
         referrer_url: from_referrer_to_referrer_url(&request),
         referrer_policy: request.referrer_policy.get(),
         pipeline_id: request.pipeline_id.get(),
         redirect_mode: request.redirect_mode.get(),
+        ..NetTraitsRequestInit::default()
     }
 }
 
 // https://fetch.spec.whatwg.org/#fetch-method
 #[allow(unrooted_must_root)]
-pub fn Fetch(global: &GlobalScope, input: RequestOrUSVString, init: &RequestInit) -> Rc<Promise> {
+pub fn Fetch(global: &GlobalScope, input: RequestInfo, init: &RequestInit) -> Rc<Promise> {
     let core_resource_thread = global.core_resource_thread();
 
     // Step 1
     let promise = Promise::new(global);
     let response = Response::new(global);
 
     // Step 2
     let request = match Request::Constructor(global, input, init) {
@@ -91,18 +92,18 @@ pub fn Fetch(global: &GlobalScope, input
     let (action_sender, action_receiver) = ipc::channel().unwrap();
     let fetch_context = Arc::new(Mutex::new(FetchContext {
         fetch_promise: Some(TrustedPromise::new(promise.clone())),
         response_object: Trusted::new(&*response),
         body: vec![],
     }));
     let listener = NetworkListener {
         context: fetch_context,
-        script_chan: global.networking_task_source(),
-        wrapper: None,
+        task_source: global.networking_task_source(),
+        wrapper: Some(global.get_runnable_wrapper())
     };
 
     ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
         listener.notify_fetch(message.to().unwrap());
     });
     core_resource_thread.send(NetTraitsFetch(request_init, action_sender)).unwrap();
 
     promise
--- a/servo/components/script/lib.rs
+++ b/servo/components/script/lib.rs
@@ -43,16 +43,17 @@ extern crate devtools_traits;
 extern crate encoding;
 extern crate euclid;
 extern crate fnv;
 extern crate gfx_traits;
 extern crate heapsize;
 #[macro_use] extern crate heapsize_derive;
 extern crate html5ever;
 #[macro_use] extern crate html5ever_atoms;
+#[macro_use]
 extern crate hyper;
 extern crate hyper_serde;
 extern crate image;
 extern crate ipc_channel;
 #[macro_use]
 extern crate js;
 #[macro_use]
 extern crate jstraceable_derive;
--- a/servo/components/script/network_listener.rs
+++ b/servo/components/script/network_listener.rs
@@ -1,37 +1,37 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 use bluetooth_traits::{BluetoothResponseListener, BluetoothResponseResult};
 use net_traits::{Action, FetchResponseListener, FetchResponseMsg};
-use script_runtime::{CommonScriptMsg, ScriptChan};
-use script_runtime::ScriptThreadEventCategory::NetworkEvent;
 use script_thread::{Runnable, RunnableWrapper};
 use std::sync::{Arc, Mutex};
+use task_source::TaskSource;
+use task_source::networking::NetworkingTaskSource;
 
 /// An off-thread sink for async network event runnables. All such events are forwarded to
 /// a target thread, where they are invoked on the provided context object.
 pub struct NetworkListener<Listener: PreInvoke + Send + 'static> {
     pub context: Arc<Mutex<Listener>>,
-    pub script_chan: Box<ScriptChan + Send>,
+    pub task_source: NetworkingTaskSource,
     pub wrapper: Option<RunnableWrapper>,
 }
 
 impl<Listener: PreInvoke + Send + 'static> NetworkListener<Listener> {
     pub fn notify<A: Action<Listener> + Send + 'static>(&self, action: A) {
         let runnable = box ListenerRunnable {
             context: self.context.clone(),
             action: action,
         };
         let result = if let Some(ref wrapper) = self.wrapper {
-            self.script_chan.send(CommonScriptMsg::RunnableMsg(NetworkEvent, wrapper.wrap_runnable(runnable)))
+            self.task_source.queue_with_wrapper(runnable, wrapper)
         } else {
-            self.script_chan.send(CommonScriptMsg::RunnableMsg(NetworkEvent, runnable))
+            self.task_source.queue_wrapperless(runnable)
         };
         if let Err(err) = result {
             warn!("failed to deliver network data: {:?}", err);
         }
     }
 }
 
 // helps type inference
--- a/servo/components/script/script_thread.rs
+++ b/servo/components/script/script_thread.rs
@@ -654,17 +654,17 @@ impl ScriptThread {
             resource_threads: state.resource_threads,
             bluetooth_thread: state.bluetooth_thread,
 
             port: port,
 
             chan: MainThreadScriptChan(chan.clone()),
             dom_manipulation_task_source: DOMManipulationTaskSource(chan.clone()),
             user_interaction_task_source: UserInteractionTaskSource(chan.clone()),
-            networking_task_source: NetworkingTaskSource(chan.clone()),
+            networking_task_source: NetworkingTaskSource(boxed_script_sender.clone()),
             history_traversal_task_source: HistoryTraversalTaskSource(chan),
             file_reading_task_source: FileReadingTaskSource(boxed_script_sender),
 
             control_chan: state.control_chan,
             control_port: control_port,
             constellation_chan: state.constellation_chan,
             time_profiler_chan: state.time_profiler_chan,
             mem_profiler_chan: state.mem_profiler_chan,
@@ -1618,29 +1618,28 @@ impl ScriptThread {
             // denies access to most properties (per
             // https://github.com/servo/servo/issues/3939#issuecomment-62287025).
             self.documents.borrow().find_iframe(parent_id, incomplete.frame_id)
         );
 
         let MainThreadScriptChan(ref sender) = self.chan;
         let DOMManipulationTaskSource(ref dom_sender) = self.dom_manipulation_task_source;
         let UserInteractionTaskSource(ref user_sender) = self.user_interaction_task_source;
-        let NetworkingTaskSource(ref network_sender) = self.networking_task_source;
         let HistoryTraversalTaskSource(ref history_sender) = self.history_traversal_task_source;
 
         let (ipc_timer_event_chan, ipc_timer_event_port) = ipc::channel().unwrap();
         ROUTER.route_ipc_receiver_to_mpsc_sender(ipc_timer_event_port,
                                                  self.timer_event_chan.clone());
 
         // Create the window and document objects.
         let window = Window::new(self.js_runtime.clone(),
                                  MainThreadScriptChan(sender.clone()),
                                  DOMManipulationTaskSource(dom_sender.clone()),
                                  UserInteractionTaskSource(user_sender.clone()),
-                                 NetworkingTaskSource(network_sender.clone()),
+                                 self.networking_task_source.clone(),
                                  HistoryTraversalTaskSource(history_sender.clone()),
                                  self.file_reading_task_source.clone(),
                                  self.image_cache_channel.clone(),
                                  self.image_cache_thread.clone(),
                                  self.resource_threads.clone(),
                                  self.bluetooth_thread.clone(),
                                  self.mem_profiler_chan.clone(),
                                  self.time_profiler_chan.clone(),
@@ -2045,17 +2044,17 @@ impl ScriptThread {
     /// argument until a notification is received that the fetch is complete.
     fn start_page_load(&self, incomplete: InProgressLoad, mut load_data: LoadData) {
         let id = incomplete.pipeline_id.clone();
 
         let context = Arc::new(Mutex::new(ParserContext::new(id, load_data.url.clone())));
         let (action_sender, action_receiver) = ipc::channel().unwrap();
         let listener = NetworkListener {
             context: context,
-            script_chan: self.chan.clone(),
+            task_source: self.networking_task_source.clone(),
             wrapper: None,
         };
         ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
             listener.notify_fetch(message.to().unwrap());
         });
 
         if load_data.url.scheme() == "javascript" {
             load_data.url = Url::parse("about:blank").unwrap();
--- a/servo/components/script/task_source/networking.rs
+++ b/servo/components/script/task_source/networking.rs
@@ -1,20 +1,33 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
-use script_runtime::{CommonScriptMsg, ScriptChan};
-use script_thread::MainThreadScriptMsg;
-use std::sync::mpsc::Sender;
+use script_runtime::{CommonScriptMsg, ScriptChan, ScriptThreadEventCategory};
+use script_thread::{Runnable, RunnableWrapper};
+use task_source::TaskSource;
 
 #[derive(JSTraceable)]
-pub struct NetworkingTaskSource(pub Sender<MainThreadScriptMsg>);
+pub struct NetworkingTaskSource(pub Box<ScriptChan + Send + 'static>);
 
-impl ScriptChan for NetworkingTaskSource {
-    fn send(&self, msg: CommonScriptMsg) -> Result<(), ()> {
-        self.0.send(MainThreadScriptMsg::Common(msg)).map_err(|_| ())
-    }
-
-    fn clone(&self) -> Box<ScriptChan + Send> {
-        box NetworkingTaskSource((&self.0).clone())
+impl Clone for NetworkingTaskSource {
+    fn clone(&self) -> NetworkingTaskSource {
+        NetworkingTaskSource(self.0.clone())
     }
 }
+
+impl TaskSource for NetworkingTaskSource {
+    fn queue_with_wrapper<T>(&self,
+                             msg: Box<T>,
+                             wrapper: &RunnableWrapper)
+                             -> Result<(), ()>
+                             where T: Runnable + Send + 'static {
+        self.0.send(CommonScriptMsg::RunnableMsg(ScriptThreadEventCategory::NetworkEvent,
+                                                 wrapper.wrap_runnable(msg)))
+    }
+}
+
+impl NetworkingTaskSource {
+    pub fn queue_wrapperless<T: Runnable + Send + 'static>(&self, msg: Box<T>) -> Result<(), ()> {
+        self.0.send(CommonScriptMsg::RunnableMsg(ScriptThreadEventCategory::NetworkEvent, msg))
+    }
+}
--- a/servo/components/script/timers.rs
+++ b/servo/components/script/timers.rs
@@ -2,16 +2,17 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 use dom::bindings::callback::ExceptionHandling::Report;
 use dom::bindings::cell::DOMRefCell;
 use dom::bindings::codegen::Bindings::FunctionBinding::Function;
 use dom::bindings::reflector::Reflectable;
 use dom::bindings::str::DOMString;
+use dom::eventsource::EventSourceTimeoutCallback;
 use dom::globalscope::GlobalScope;
 use dom::testbinding::TestBindingCallback;
 use dom::xmlhttprequest::XHRTimeoutCallback;
 use euclid::length::Length;
 use heapsize::HeapSizeOf;
 use ipc_channel::ipc::IpcSender;
 use js::jsapi::{HandleValue, Heap};
 use js::jsval::{JSVal, UndefinedValue};
@@ -62,24 +63,26 @@ struct OneshotTimer {
 }
 
 // This enum is required to work around the fact that trait objects do not support generic methods.
 // A replacement trait would have a method such as
 //     `invoke<T: Reflectable>(self: Box<Self>, this: &T, js_timers: &JsTimers);`.
 #[derive(JSTraceable, HeapSizeOf)]
 pub enum OneshotTimerCallback {
     XhrTimeout(XHRTimeoutCallback),
+    EventSourceTimeout(EventSourceTimeoutCallback),
     JsTimer(JsTimerTask),
     TestBindingCallback(TestBindingCallback),
 }
 
 impl OneshotTimerCallback {
     fn invoke<T: Reflectable>(self, this: &T, js_timers: &JsTimers) {
         match self {
             OneshotTimerCallback::XhrTimeout(callback) => callback.invoke(),
+            OneshotTimerCallback::EventSourceTimeout(callback) => callback.invoke(),
             OneshotTimerCallback::JsTimer(task) => task.invoke(this, js_timers),
             OneshotTimerCallback::TestBindingCallback(callback) => callback.invoke(),
         }
     }
 }
 
 impl Ord for OneshotTimer {
     fn cmp(&self, other: &OneshotTimer) -> Ordering {