Bug 1638918 - Convert kvstore to use a background task queue instead of a one-off thread. r=KrisWright
authorLina Cambridge <lina@yakshaving.ninja>
Wed, 20 May 2020 20:55:10 +0000
changeset 531334 f9b54c69e77011a018985e4863108ed8f59e9e51
parent 531333 767dac9c6d2b6fdcc8ca4a8cdbf31a9b31d03d53
child 531335 84c51f3b6e8601e96def8f3cdb229ffe55013110
push id37438
push userabutkovits@mozilla.com
push dateThu, 21 May 2020 09:36:57 +0000
treeherdermozilla-central@2d00a1a6495c [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersKrisWright
bugs1638918
milestone78.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1638918 - Convert kvstore to use a background task queue instead of a one-off thread. r=KrisWright Differential Revision: https://phabricator.services.mozilla.com/D75863
Cargo.lock
toolkit/components/kvstore/Cargo.toml
toolkit/components/kvstore/src/lib.rs
toolkit/components/kvstore/src/task.rs
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2486,16 +2486,17 @@ source = "registry+https://github.com/ru
 checksum = "e2db585e1d738fc771bf08a151420d3ed193d9d895a36df7f6f8a9456b911ddc"
 
 [[package]]
 name = "kvstore"
 version = "0.1.0"
 dependencies = [
  "atomic_refcell",
  "crossbeam-utils 0.6.5",
+ "cstr",
  "failure",
  "lazy_static",
  "libc",
  "lmdb-rkv",
  "log",
  "moz_task",
  "nserror",
  "nsstring",
--- a/toolkit/components/kvstore/Cargo.toml
+++ b/toolkit/components/kvstore/Cargo.toml
@@ -1,16 +1,17 @@
 [package]
 name = "kvstore"
 version = "0.1.0"
 authors = ["Myk Melez <myk@mykzilla.org>"]
 
 [dependencies]
 atomic_refcell = "0.1"
 crossbeam-utils = "0.6.3"
+cstr = "0.1"
 lazy_static = "1"
 libc = "0.2"
 lmdb-rkv = "0.14"
 log = "0.4"
 moz_task = { path = "../../../xpcom/rust/moz_task" }
 nserror = { path = "../../../xpcom/rust/nserror" }
 nsstring = { path = "../../../xpcom/rust/nsstring" }
 rkv = "0.10.2"
--- a/toolkit/components/kvstore/src/lib.rs
+++ b/toolkit/components/kvstore/src/lib.rs
@@ -1,15 +1,17 @@
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 extern crate atomic_refcell;
 extern crate crossbeam_utils;
 #[macro_use]
+extern crate cstr;
+#[macro_use]
 extern crate failure;
 #[macro_use]
 extern crate lazy_static;
 extern crate libc;
 extern crate lmdb;
 extern crate log;
 extern crate moz_task;
 extern crate nserror;
@@ -23,17 +25,20 @@ extern crate xpcom;
 mod error;
 mod manager;
 mod owned_value;
 mod task;
 
 use atomic_refcell::AtomicRefCell;
 use error::KeyValueError;
 use libc::c_void;
-use moz_task::{create_thread, TaskRunnable};
+use moz_task::{
+    create_background_task_queue, dispatch_background_task_with_options, DispatchOptions,
+    TaskRunnable,
+};
 use nserror::{nsresult, NS_ERROR_FAILURE, NS_ERROR_NO_AGGREGATION, NS_OK};
 use nsstring::{nsACString, nsCString};
 use owned_value::{owned_to_variant, variant_to_owned};
 use rkv::{OwnedValue, Rkv, SingleStore};
 use std::{
     ptr,
     sync::{Arc, RwLock},
     vec::IntoIter,
@@ -41,41 +46,37 @@ use std::{
 use task::{
     ClearTask, DeleteTask, EnumerateTask, GetOrCreateTask, GetTask, HasTask, PutTask, WriteManyTask,
 };
 use thin_vec::ThinVec;
 use xpcom::{
     getter_addrefs,
     interfaces::{
         nsIKeyValueDatabaseCallback, nsIKeyValueEnumeratorCallback, nsIKeyValuePair,
-        nsIKeyValueVariantCallback, nsIKeyValueVoidCallback, nsISupports, nsIThread, nsIVariant,
+        nsIKeyValueVariantCallback, nsIKeyValueVoidCallback, nsISerialEventTarget, nsISupports,
+        nsIVariant,
     },
-    nsIID, xpcom, xpcom_method, RefPtr, ThreadBoundRefPtr,
+    nsIID, xpcom, xpcom_method, RefPtr,
 };
 
 type KeyValuePairResult = Result<(String, OwnedValue), KeyValueError>;
 
 #[no_mangle]
 pub unsafe extern "C" fn nsKeyValueServiceConstructor(
     outer: *const nsISupports,
     iid: &nsIID,
     result: *mut *mut c_void,
 ) -> nsresult {
     *result = ptr::null_mut();
 
     if !outer.is_null() {
         return NS_ERROR_NO_AGGREGATION;
     }
 
-    let thread: RefPtr<nsIThread> = match create_thread("KeyValDB") {
-        Ok(thread) => thread,
-        Err(error) => return error,
-    };
-
-    let service: RefPtr<KeyValueService> = KeyValueService::new(thread);
+    let service = KeyValueService::new();
     service.QueryInterface(iid, result)
 }
 
 // For each public XPCOM method in the nsIKeyValue* interfaces, we implement
 // a pair of Rust methods:
 //
 //   1. a method named after the XPCOM (as modified by the XPIDL parser, i.e.
 //      by capitalization of its initial letter) that returns an nsresult;
@@ -95,70 +96,70 @@ pub unsafe extern "C" fn nsKeyValueServi
 // compatible nsresult values to XPCOM callers.
 //
 // The XPCOM methods are implemented using the xpcom_method! declarative macro
 // from the xpcom crate.
 
 #[derive(xpcom)]
 #[xpimplements(nsIKeyValueService)]
 #[refcnt = "atomic"]
-pub struct InitKeyValueService {
-    thread: ThreadBoundRefPtr<nsIThread>,
-}
+pub struct InitKeyValueService {}
 
 impl KeyValueService {
-    fn new(thread: RefPtr<nsIThread>) -> RefPtr<KeyValueService> {
-        KeyValueService::allocate(InitKeyValueService {
-            thread: ThreadBoundRefPtr::new(thread),
-        })
+    fn new() -> RefPtr<KeyValueService> {
+        KeyValueService::allocate(InitKeyValueService {})
     }
 
     xpcom_method!(
         get_or_create => GetOrCreate(
             callback: *const nsIKeyValueDatabaseCallback,
             path: *const nsACString,
             name: *const nsACString
         )
     );
 
     fn get_or_create(
         &self,
         callback: &nsIKeyValueDatabaseCallback,
         path: &nsACString,
         name: &nsACString,
     ) -> Result<(), nsresult> {
-        let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
-
         let task = Box::new(GetOrCreateTask::new(
             RefPtr::new(callback),
-            RefPtr::new(thread),
             nsCString::from(path),
             nsCString::from(name),
         ));
 
-        TaskRunnable::dispatch(TaskRunnable::new("KVService::GetOrCreate", task)?, thread)
+        dispatch_background_task_with_options(
+            RefPtr::new(TaskRunnable::new("KVService::GetOrCreate", task)?.coerce()),
+            DispatchOptions::default().may_block(true),
+        )
     }
 }
 
 #[derive(xpcom)]
 #[xpimplements(nsIKeyValueDatabase)]
 #[refcnt = "atomic"]
 pub struct InitKeyValueDatabase {
     rkv: Arc<RwLock<Rkv>>,
     store: SingleStore,
-    thread: ThreadBoundRefPtr<nsIThread>,
+    queue: RefPtr<nsISerialEventTarget>,
 }
 
 impl KeyValueDatabase {
     fn new(
         rkv: Arc<RwLock<Rkv>>,
         store: SingleStore,
-        thread: ThreadBoundRefPtr<nsIThread>,
-    ) -> RefPtr<KeyValueDatabase> {
-        KeyValueDatabase::allocate(InitKeyValueDatabase { rkv, store, thread })
+    ) -> Result<RefPtr<KeyValueDatabase>, KeyValueError> {
+        let queue = create_background_task_queue(cstr!("KeyValueDatabase"))?;
+        Ok(KeyValueDatabase::allocate(InitKeyValueDatabase {
+            rkv,
+            store,
+            queue,
+        }))
     }
 
     xpcom_method!(
         put => Put(
             callback: *const nsIKeyValueVoidCallback,
             key: *const nsACString,
             value: *const nsIVariant
         )
@@ -175,19 +176,17 @@ impl KeyValueDatabase {
         let task = Box::new(PutTask::new(
             RefPtr::new(callback),
             Arc::clone(&self.rkv),
             self.store,
             nsCString::from(key),
             value,
         ));
 
-        let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
-
-        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Put", task)?, thread)
+        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Put", task)?, &self.queue)
     }
 
     xpcom_method!(
         write_many => WriteMany(
             callback: *const nsIKeyValueVoidCallback,
             pairs: *const ThinVec<RefPtr<nsIKeyValuePair>>
         )
     );
@@ -213,19 +212,20 @@ impl KeyValueDatabase {
 
         let task = Box::new(WriteManyTask::new(
             RefPtr::new(callback),
             Arc::clone(&self.rkv),
             self.store,
             entries,
         ));
 
-        let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
-
-        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::WriteMany", task)?, thread)
+        TaskRunnable::dispatch(
+            TaskRunnable::new("KVDatabase::WriteMany", task)?,
+            &self.queue,
+        )
     }
 
     xpcom_method!(
         get => Get(
             callback: *const nsIKeyValueVariantCallback,
             key: *const nsACString,
             default_value: *const nsIVariant
         )
@@ -240,69 +240,61 @@ impl KeyValueDatabase {
         let task = Box::new(GetTask::new(
             RefPtr::new(callback),
             Arc::clone(&self.rkv),
             self.store,
             nsCString::from(key),
             variant_to_owned(default_value)?,
         ));
 
-        let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
-
-        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Get", task)?, thread)
+        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Get", task)?, &self.queue)
     }
 
     xpcom_method!(
         has => Has(callback: *const nsIKeyValueVariantCallback, key: *const nsACString)
     );
 
     fn has(&self, callback: &nsIKeyValueVariantCallback, key: &nsACString) -> Result<(), nsresult> {
         let task = Box::new(HasTask::new(
             RefPtr::new(callback),
             Arc::clone(&self.rkv),
             self.store,
             nsCString::from(key),
         ));
 
-        let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
-
-        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Has", task)?, thread)
+        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Has", task)?, &self.queue)
     }
 
     xpcom_method!(
         delete => Delete(callback: *const nsIKeyValueVoidCallback, key: *const nsACString)
     );
 
     fn delete(&self, callback: &nsIKeyValueVoidCallback, key: &nsACString) -> Result<(), nsresult> {
         let task = Box::new(DeleteTask::new(
             RefPtr::new(callback),
             Arc::clone(&self.rkv),
             self.store,
             nsCString::from(key),
         ));
 
-        let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
-
-        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Delete", task)?, thread)
+        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Delete", task)?, &self.queue)
     }
 
     xpcom_method!(
         clear => Clear(callback: *const nsIKeyValueVoidCallback)
     );
 
     fn clear(&self, callback: &nsIKeyValueVoidCallback) -> Result<(), nsresult> {
         let task = Box::new(ClearTask::new(
             RefPtr::new(callback),
             Arc::clone(&self.rkv),
             self.store,
         ));
 
-        let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
-
-        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Clear", task)?, thread)
+        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Clear", task)?, &self.queue)
     }
 
     xpcom_method!(
         enumerate => Enumerate(
             callback: *const nsIKeyValueEnumeratorCallback,
             from_key: *const nsACString,
             to_key: *const nsACString
         )
@@ -317,19 +309,20 @@ impl KeyValueDatabase {
         let task = Box::new(EnumerateTask::new(
             RefPtr::new(callback),
             Arc::clone(&self.rkv),
             self.store,
             nsCString::from(from_key),
             nsCString::from(to_key),
         ));
 
-        let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
-
-        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Enumerate", task)?, thread)
+        TaskRunnable::dispatch(
+            TaskRunnable::new("KVDatabase::Enumerate", task)?,
+            &self.queue,
+        )
     }
 }
 
 #[derive(xpcom)]
 #[xpimplements(nsIKeyValueEnumerator)]
 #[refcnt = "atomic"]
 pub struct InitKeyValueEnumerator {
     iter: AtomicRefCell<IntoIter<KeyValuePairResult>>,
--- a/toolkit/components/kvstore/src/task.rs
+++ b/toolkit/components/kvstore/src/task.rs
@@ -16,17 +16,17 @@ use std::{
     path::Path,
     str,
     sync::{Arc, RwLock},
 };
 use storage_variant::VariantType;
 use xpcom::{
     interfaces::{
         nsIKeyValueDatabaseCallback, nsIKeyValueEnumeratorCallback, nsIKeyValueVariantCallback,
-        nsIKeyValueVoidCallback, nsIThread, nsIVariant,
+        nsIKeyValueVoidCallback, nsIVariant,
     },
     RefPtr, ThreadBoundRefPtr,
 };
 use KeyValueDatabase;
 use KeyValueEnumerator;
 use KeyValuePairResult;
 
 /// A macro to generate a done() implementation for a Task.
@@ -154,41 +154,37 @@ fn passive_resize(env: &Rkv, wanted: usi
     let info = env.info()?;
     let current_size = info.map_size();
     env.set_map_size(current_size + wanted)?;
     Ok(())
 }
 
 pub struct GetOrCreateTask {
     callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueDatabaseCallback>>>,
-    thread: AtomicCell<Option<ThreadBoundRefPtr<nsIThread>>>,
     path: nsCString,
     name: nsCString,
     result: AtomicCell<Option<Result<RkvStoreTuple, KeyValueError>>>,
 }
 
 impl GetOrCreateTask {
     pub fn new(
         callback: RefPtr<nsIKeyValueDatabaseCallback>,
-        thread: RefPtr<nsIThread>,
         path: nsCString,
         name: nsCString,
     ) -> GetOrCreateTask {
         GetOrCreateTask {
             callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
-            thread: AtomicCell::new(Some(ThreadBoundRefPtr::new(thread))),
             path,
             name,
             result: AtomicCell::default(),
         }
     }
 
     fn convert(&self, result: RkvStoreTuple) -> Result<RefPtr<KeyValueDatabase>, KeyValueError> {
-        let thread = self.thread.swap(None).ok_or(NS_ERROR_FAILURE)?;
-        Ok(KeyValueDatabase::new(result.0, result.1, thread))
+        Ok(KeyValueDatabase::new(result.0, result.1)?)
     }
 }
 
 impl Task for GetOrCreateTask {
     fn run(&self) {
         // We do the work within a closure that returns a Result so we can
         // use the ? operator to simplify the implementation.
         self.result