servo: Merge #11497 - send a reply when thread is done exiting (from ab22:11467-resource-threads-race-with-shutdown-to-write-out-data); r=Ms2ger
authorAbelardo E. Mendoza <abelardo22.9@gmail.com>
Wed, 01 Jun 2016 06:46:58 -0500
changeset 338974 97bab307c0ad6fab34b6c4b91efed7ec439c256a
parent 338973 ce3b753ff7eb73e78ad6077d77beda3b4d9e48e1
child 338975 dd747f278fc40be1e92f81225d036982fd0a5da1
push id31307
push usergszorc@mozilla.com
push dateSat, 04 Feb 2017 00:59:06 +0000
treeherdermozilla-central@94079d43835f [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersMs2ger
servo: Merge #11497 - send a reply when thread is done exiting (from ab22:11467-resource-threads-race-with-shutdown-to-write-out-data); r=Ms2ger <!-- Please describe your changes on the following line: --> --- <!-- Thank you for contributing to Servo! Please replace each `[ ]` by `[X]` when the step is complete, and replace `__` with appropriate data: --> - [x] `./mach build -d` does not report any errors - [x] `./mach test-tidy` does not report any errors - [x] These changes fix #11467 <!-- Either: --> - [x] There are tests for these changes OR - [ ] These changes do not require tests because: in this case if code compiles then it's good enough. <!-- Pull requests that do not address these steps are welcome, but they will require additional verification as part of the review process. --> Source-Repo: https://github.com/servo/servo Source-Revision: 351b851e21ab34f30280b506e48c2d61aa115f1a
servo/components/constellation/constellation.rs
servo/components/net/resource_thread.rs
servo/components/net/storage_thread.rs
servo/components/net_traits/lib.rs
servo/components/net_traits/storage_thread.rs
servo/tests/unit/net/resource_thread.rs
--- a/servo/components/constellation/constellation.rs
+++ b/servo/components/constellation/constellation.rs
@@ -812,41 +812,54 @@ impl<Message, LTF, STF> Constellation<Me
             (pipeline_id, panic_reason, backtrace) => {
                 debug!("handling panic message ({:?})", pipeline_id);
                 self.handle_panic(pipeline_id, panic_reason, backtrace);
             }
         }
     }
 
     fn handle_exit(&mut self) {
+        // Channels to recieve signals when threads are done exiting.
+        let (core_sender, core_receiver) = ipc::channel().expect("Failed to create IPC channel!");
+        let (storage_sender, storage_receiver) = ipc::channel().expect("Failed to create IPC channel!");
+
         for (_id, ref pipeline) in &self.pipelines {
             pipeline.exit();
         }
         self.image_cache_thread.exit();
-        if let Err(e) = self.resource_threads.send(net_traits::CoreResourceMsg::Exit) {
+        if let Err(e) = self.resource_threads.send(net_traits::CoreResourceMsg::Exit(core_sender)) {
             warn!("Exit resource thread failed ({})", e);
         }
         if let Some(ref chan) = self.devtools_chan {
             let msg = DevtoolsControlMsg::FromChrome(ChromeToDevtoolsControlMsg::ServerExitMsg);
             if let Err(e) = chan.send(msg) {
                 warn!("Exit devtools failed ({})", e);
             }
         }
-        if let Err(e) = self.resource_threads.send(StorageThreadMsg::Exit) {
+        if let Err(e) = self.resource_threads.send(StorageThreadMsg::Exit(storage_sender)) {
             warn!("Exit storage thread failed ({})", e);
         }
 
         if let Err(e) = self.resource_threads.send(FileManagerThreadMsg::Exit) {
             warn!("Exit storage thread failed ({})", e);
         }
 
         if let Err(e) = self.bluetooth_thread.send(BluetoothMethodMsg::Exit) {
             warn!("Exit bluetooth thread failed ({})", e);
         }
         self.font_cache_thread.exit();
+
+        // Receive exit signals from threads.
+        if let Err(e) = core_receiver.recv() {
+            warn!("Exit resource thread failed ({})", e);
+        }
+        if let Err(e) = storage_receiver.recv() {
+            warn!("Exit storage thread failed ({})", e);
+        }
+
         self.compositor_proxy.send(ToCompositorMsg::ShutdownComplete);
     }
 
     fn handle_send_error(&mut self, pipeline_id: PipelineId, err: IOError) {
         // Treat send error the same as receiving a panic message
         debug!("Pipeline {:?} send error ({}).", pipeline_id, err);
         self.handle_panic(Some(pipeline_id), format!("Send failed ({})", err), String::from("<none>"));
     }
--- a/servo/components/net/resource_thread.rs
+++ b/servo/components/net/resource_thread.rs
@@ -206,31 +206,32 @@ impl ResourceChannelManager {
                     if let Some(cancel_sender) = self.resource_manager.cancel_load_map.get(&res_id) {
                         let _ = cancel_sender.send(());
                     }
                     self.resource_manager.cancel_load_map.remove(&res_id);
                 }
                 CoreResourceMsg::Synchronize(sender) => {
                     let _ = sender.send(());
                 }
-                CoreResourceMsg::Exit => {
+                CoreResourceMsg::Exit(sender) => {
                     if let Some(ref config_dir) = opts::get().config_dir {
                         match self.resource_manager.auth_cache.read() {
                             Ok(auth_cache) => write_json_to_file(&*auth_cache, config_dir, "auth_cache.json"),
                             Err(_) => warn!("Error writing auth cache to disk"),
                         }
                         match self.resource_manager.cookie_jar.read() {
                             Ok(jar) => write_json_to_file(&*jar, config_dir, "cookie_jar.json"),
                             Err(_) => warn!("Error writing cookie jar to disk"),
                         }
                         match self.resource_manager.hsts_list.read() {
                             Ok(hsts) => write_json_to_file(&*hsts, config_dir, "hsts_list.json"),
                             Err(_) => warn!("Error writing hsts list to disk"),
                         }
                     }
+                    let _ = sender.send(());
                     break;
                 }
 
             }
         }
     }
 }
 
--- a/servo/components/net/storage_thread.rs
+++ b/servo/components/net/storage_thread.rs
@@ -69,20 +69,21 @@ impl StorageManager {
                     self.request_item(sender, url, storage_type, name)
                 }
                 StorageThreadMsg::RemoveItem(sender, url, storage_type, name) => {
                     self.remove_item(sender, url, storage_type, name)
                 }
                 StorageThreadMsg::Clear(sender, url, storage_type) => {
                     self.clear(sender, url, storage_type)
                 }
-                StorageThreadMsg::Exit => {
+                StorageThreadMsg::Exit(sender) => {
                     if let Some(ref config_dir) = opts::get().config_dir {
                         resource_thread::write_json_to_file(&self.local_data, config_dir, "local_data.json");
                     }
+                    let _ = sender.send(());
                     break
                 }
             }
         }
     }
 
     fn select_data(&self, storage_type: StorageType)
                    -> &HashMap<String, (usize, BTreeMap<String, String>)> {
--- a/servo/components/net_traits/lib.rs
+++ b/servo/components/net_traits/lib.rs
@@ -335,18 +335,19 @@ pub enum CoreResourceMsg {
     /// Store a set of cookies for a given originating URL
     SetCookiesForUrl(Url, String, CookieSource),
     /// Retrieve the stored cookies for a given URL
     GetCookiesForUrl(Url, IpcSender<Option<String>>, CookieSource),
     /// Cancel a network request corresponding to a given `ResourceId`
     Cancel(ResourceId),
     /// Synchronization message solely for knowing the state of the ResourceChannelManager loop
     Synchronize(IpcSender<()>),
-    /// Break the load handler loop and exit
-    Exit,
+    /// Break the load handler loop, send a reply when done cleaning up local resources
+    //  and exit
+    Exit(IpcSender<()>),
 }
 
 /// Initialized but unsent request. Encapsulates everything necessary to instruct
 /// the resource thread to make a new request. The `load` method *must* be called before
 /// destruction or the thread will panic.
 pub struct PendingAsyncLoad {
     core_resource_thread: CoreResourceThread,
     url: Url,
--- a/servo/components/net_traits/storage_thread.rs
+++ b/servo/components/net_traits/storage_thread.rs
@@ -30,11 +30,11 @@ pub enum StorageThreadMsg {
     SetItem(IpcSender<Result<(bool, Option<String>), ()>>, Url, StorageType, String, String),
 
     /// removes the key/value pair for the given key in the associated storage data
     RemoveItem(IpcSender<Option<String>>, Url, StorageType, String),
 
     /// clears the associated storage data by removing all the key/value pairs
     Clear(IpcSender<bool>, Url, StorageType),
 
-    /// shut down this thread
-    Exit
+    /// send a reply when done cleaning up thread resources and then shut it down
+    Exit(IpcSender<()>)
 }
--- a/servo/tests/unit/net/resource_thread.rs
+++ b/servo/tests/unit/net/resource_thread.rs
@@ -34,35 +34,39 @@ impl LoadOrigin for ResourceTest {
     fn pipeline_id(&self) -> Option<PipelineId> {
         None
     }
 }
 
 #[test]
 fn test_exit() {
     let (tx, _rx) = ipc::channel().unwrap();
+    let (sender, receiver) = ipc::channel().unwrap();
     let resource_thread = new_core_resource_thread("".to_owned(), None, ProfilerChan(tx));
-    resource_thread.send(CoreResourceMsg::Exit).unwrap();
+    resource_thread.send(CoreResourceMsg::Exit(sender)).unwrap();
+    receiver.recv().unwrap();
 }
 
 #[test]
 fn test_bad_scheme() {
     let (tx, _rx) = ipc::channel().unwrap();
+    let (sender, receiver) = ipc::channel().unwrap();
     let resource_thread = new_core_resource_thread("".to_owned(), None, ProfilerChan(tx));
     let (start_chan, start) = ipc::channel().unwrap();
     let url = Url::parse("bogus://whatever").unwrap();
     resource_thread.send(CoreResourceMsg::Load(LoadData::new(LoadContext::Browsing, url, &ResourceTest),
 
     LoadConsumer::Channel(start_chan), None)).unwrap();
     let response = start.recv().unwrap();
     match response.progress_port.recv().unwrap() {
       ProgressMsg::Done(result) => { assert!(result.is_err()) }
       _ => panic!("bleh")
     }
-    resource_thread.send(CoreResourceMsg::Exit).unwrap();
+    resource_thread.send(CoreResourceMsg::Exit(sender)).unwrap();
+    receiver.recv().unwrap();
 }
 
 #[test]
 fn test_parse_hostsfile() {
     let mock_hosts_file_content = "127.0.0.1 foo.bar.com\n127.0.0.2 servo.test.server";
     let hosts_table = parse_hostsfile(mock_hosts_file_content);
     assert_eq!(2, hosts_table.len());
     assert_eq!(ip("127.0.0.1"), *hosts_table.get("foo.bar.com").unwrap());
@@ -218,16 +222,17 @@ fn test_cancelled_listener() {
             // wait for the main thread to send the body, so as to ensure that we're
             // doing everything sequentially
             let body_vec: Vec<&str> = body_receiver.recv().unwrap();
             let _ = stream.write(body_vec.join("\r\n").as_bytes());
         }
     });
 
     let (tx, _rx) = ipc::channel().unwrap();
+    let (exit_sender, exit_receiver) = ipc::channel().unwrap();
     let resource_thread = new_core_resource_thread("".to_owned(), None, ProfilerChan(tx));
     let (sender, receiver) = ipc::channel().unwrap();
     let (id_sender, id_receiver) = ipc::channel().unwrap();
     let (sync_sender, sync_receiver) = ipc::channel().unwrap();
     let url = Url::parse(&format!("http://127.0.0.1:{}", port)).unwrap();
 
     resource_thread.send(CoreResourceMsg::Load(LoadData::new(LoadContext::Browsing, url, &ResourceTest),
                                         LoadConsumer::Channel(sender),
@@ -239,10 +244,11 @@ fn test_cancelled_listener() {
     resource_thread.send(CoreResourceMsg::Synchronize(sync_sender)).unwrap();
     let _ = sync_receiver.recv();
     // now, let's send the body, because the connection is still active and data would be loaded
     // (but, the loading has been cancelled)
     let _ = body_sender.send(body);
     let response = receiver.recv().unwrap();
     assert_eq!(response.progress_port.recv().unwrap(),
                ProgressMsg::Done(Err(NetworkError::LoadCancelled)));
-    resource_thread.send(CoreResourceMsg::Exit).unwrap();
+    resource_thread.send(CoreResourceMsg::Exit(exit_sender)).unwrap();
+    exit_receiver.recv().unwrap();
 }