Bug 1639018 - Change `TaskRunnable::dispatch` to take owned runnables. r=froydnj
authorLina Cambridge <lina@yakshaving.ninja>
Wed, 20 May 2020 20:54:49 +0000
changeset 531333 767dac9c6d2b6fdcc8ca4a8cdbf31a9b31d03d53
parent 531332 87b6bc1c13222b570d79a4228f334d8e20d794fe
child 531334 f9b54c69e77011a018985e4863108ed8f59e9e51
push id37438
push userabutkovits@mozilla.com
push dateThu, 21 May 2020 09:36:57 +0000
treeherdermozilla-central@2d00a1a6495c [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersfroydnj
bugs1639018, 44874
milestone78.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1639018 - Change `TaskRunnable::dispatch` to take owned runnables. r=froydnj This matches how the `Dispatch(already_AddRefed<nsIRunnable>)` overloads work in C++: `Dispatch` takes ownership of the runnable, and leaks it if dispatch fails—because the thread manager is shutting down, for instance. This avoids a race where a runnable can be released on either the owning or target thread. Rust doesn't allow arbitrary `Self` types yet (see rust-lang/rust#44874), so we need to change `dispatch` and `dispatch_with_options` to be associated methods. Differential Revision: https://phabricator.services.mozilla.com/D75858
security/manager/ssl/cert_storage/src/lib.rs
services/fxaccounts/rust-bridge/firefox-accounts-bridge/src/punt/task.rs
services/sync/golden_gate/src/log.rs
services/sync/golden_gate/src/task.rs
toolkit/components/bitsdownload/src/bits_interface/mod.rs
toolkit/components/extensions/storage/webext_storage_bridge/src/area.rs
toolkit/components/kvstore/src/lib.rs
toolkit/components/places/bookmark_sync/src/driver.rs
toolkit/components/places/bookmark_sync/src/merger.rs
toolkit/components/xulstore/src/persist.rs
xpcom/rust/moz_task/src/lib.rs
--- a/security/manager/ssl/cert_storage/src/lib.rs
+++ b/security/manager/ssl/cert_storage/src/lib.rs
@@ -1159,17 +1159,17 @@ impl CertStorage {
         }
         let task = Box::new(SecurityStateTask::new(
             &*callback,
             &self.security_state,
             move |ss| ss.get_has_prior_data(data_type),
         ));
         let thread = try_ns!(self.thread.lock());
         let runnable = try_ns!(TaskRunnable::new("HasPriorData", task));
-        try_ns!(runnable.dispatch(&*thread));
+        try_ns!(TaskRunnable::dispatch(runnable, &*thread));
         NS_OK
     }
 
     unsafe fn SetRevocations(
         &self,
         revocations: *const ThinVec<RefPtr<nsIRevocationState>>,
         callback: *const nsICertStorageCallback,
     ) -> nserror::nsresult {
@@ -1221,17 +1221,17 @@ impl CertStorage {
 
         let task = Box::new(SecurityStateTask::new(
             &*callback,
             &self.security_state,
             move |ss| ss.set_batch_state(&entries, nsICertStorage::DATA_TYPE_REVOCATION as u8),
         ));
         let thread = try_ns!(self.thread.lock());
         let runnable = try_ns!(TaskRunnable::new("SetRevocations", task));
-        try_ns!(runnable.dispatch(&*thread));
+        try_ns!(TaskRunnable::dispatch(runnable, &*thread));
         NS_OK
     }
 
     unsafe fn GetRevocationState(
         &self,
         issuer: *const ThinVec<u8>,
         serial: *const ThinVec<u8>,
         subject: *const ThinVec<u8>,
@@ -1300,17 +1300,17 @@ impl CertStorage {
 
         let task = Box::new(SecurityStateTask::new(
             &*callback,
             &self.security_state,
             move |ss| ss.set_batch_state(&crlite_entries, nsICertStorage::DATA_TYPE_CRLITE as u8),
         ));
         let thread = try_ns!(self.thread.lock());
         let runnable = try_ns!(TaskRunnable::new("SetCRLiteState", task));
-        try_ns!(runnable.dispatch(&*thread));
+        try_ns!(TaskRunnable::dispatch(runnable, &*thread));
         NS_OK
     }
 
     unsafe fn GetCRLiteState(
         &self,
         subject: *const ThinVec<u8>,
         pub_key: *const ThinVec<u8>,
         state: *mut i16,
@@ -1346,17 +1346,17 @@ impl CertStorage {
         let filter_owned = (*filter).to_vec();
         let task = Box::new(SecurityStateTask::new(
             &*callback,
             &self.security_state,
             move |ss| ss.set_full_crlite_filter(filter_owned, timestamp),
         ));
         let thread = try_ns!(self.thread.lock());
         let runnable = try_ns!(TaskRunnable::new("SetFullCRLiteFilter", task));
-        try_ns!(runnable.dispatch(&*thread));
+        try_ns!(TaskRunnable::dispatch(runnable, &*thread));
         NS_OK
     }
 
     unsafe fn GetCRLiteRevocationState(
         &self,
         issuer: *const ThinVec<u8>,
         issuerSPKI: *const ThinVec<u8>,
         serialNumber: *const ThinVec<u8>,
@@ -1412,17 +1412,17 @@ impl CertStorage {
         }
         let task = Box::new(SecurityStateTask::new(
             &*callback,
             &self.security_state,
             move |ss| ss.add_certs(&cert_entries),
         ));
         let thread = try_ns!(self.thread.lock());
         let runnable = try_ns!(TaskRunnable::new("AddCerts", task));
-        try_ns!(runnable.dispatch(&*thread));
+        try_ns!(TaskRunnable::dispatch(runnable, &*thread));
         NS_OK
     }
 
     unsafe fn RemoveCertsByHashes(
         &self,
         hashes: *const ThinVec<nsCString>,
         callback: *const nsICertStorageCallback,
     ) -> nserror::nsresult {
@@ -1440,17 +1440,17 @@ impl CertStorage {
         }
         let task = Box::new(SecurityStateTask::new(
             &*callback,
             &self.security_state,
             move |ss| ss.remove_certs_by_hashes(&hash_entries),
         ));
         let thread = try_ns!(self.thread.lock());
         let runnable = try_ns!(TaskRunnable::new("RemoveCertsByHashes", task));
-        try_ns!(runnable.dispatch(&*thread));
+        try_ns!(TaskRunnable::dispatch(runnable, &*thread));
         NS_OK
     }
 
     unsafe fn FindCertsBySubject(
         &self,
         subject: *const ThinVec<u8>,
         certs: *mut ThinVec<ThinVec<u8>>,
     ) -> nserror::nsresult {
--- a/services/fxaccounts/rust-bridge/firefox-accounts-bridge/src/punt/task.rs
+++ b/services/fxaccounts/rust-bridge/firefox-accounts-bridge/src/punt/task.rs
@@ -367,17 +367,21 @@ impl PuntTask {
         )
     }
 
     /// Dispatches the task to the given thread `target`.
     pub fn dispatch(self, target: &nsIEventTarget) -> Result<(), Error> {
         let runnable = TaskRunnable::new(self.name, Box::new(self))?;
         // `may_block` schedules the task on the I/O thread pool, since we
         // expect most operations to wait on I/O.
-        runnable.dispatch_with_options(target, DispatchOptions::default().may_block(true))?;
+        TaskRunnable::dispatch_with_options(
+            runnable,
+            target,
+            DispatchOptions::default().may_block(true),
+        )?;
         Ok(())
     }
 
     fn run_with_punt(&self, punt: Punt) -> Result<PuntResult, Error> {
         let fxa = self.fxa.upgrade().ok_or_else(|| Error::AlreadyTornDown)?;
         let mut fxa = fxa.lock()?;
         Ok(match punt {
             Punt::ToJson => fxa.to_json().map(PuntResult::String),
--- a/services/sync/golden_gate/src/log.rs
+++ b/services/sync/golden_gate/src/log.rs
@@ -113,17 +113,17 @@ impl Log for LogSink {
                 Ok(_) => {
                     let task = LogTask {
                         logger: logger.clone(),
                         level: record.metadata().level(),
                         message,
                     };
                     let _ =
                         TaskRunnable::new("extension_storage_sync::Logger::log", Box::new(task))
-                            .and_then(|r| r.dispatch(logger.owning_thread()));
+                            .and_then(|r| TaskRunnable::dispatch(r, logger.owning_thread()));
                 }
                 Err(_) => {}
             }
         }
     }
 
     fn flush(&self) {}
 }
--- a/services/sync/golden_gate/src/task.rs
+++ b/services/sync/golden_gate/src/task.rs
@@ -191,17 +191,21 @@ where
         })
     }
 
     /// Dispatches the task to the given thread `target`.
     pub fn dispatch(self, target: &nsIEventTarget) -> Result<()> {
         let runnable = TaskRunnable::new(self.ferry.name(), Box::new(self))?;
         // `may_block` schedules the task on the I/O thread pool, since we
         // expect most operations to wait on I/O.
-        runnable.dispatch_with_options(target, DispatchOptions::default().may_block(true))?;
+        TaskRunnable::dispatch_with_options(
+            runnable,
+            target,
+            DispatchOptions::default().may_block(true),
+        )?;
         Ok(())
     }
 }
 
 impl<N> FerryTask<N>
 where
     N: ?Sized + BridgedEngine,
     N::Error: BridgedError,
@@ -334,17 +338,21 @@ where
             )?,
             result: AtomicRefCell::new(Err(Error::DidNotRun(Self::name()).into())),
         })
     }
 
     /// Dispatches the task to the given thread `target`.
     pub fn dispatch(self, target: &nsIEventTarget) -> Result<()> {
         let runnable = TaskRunnable::new(Self::name(), Box::new(self))?;
-        runnable.dispatch_with_options(target, DispatchOptions::default().may_block(true))?;
+        TaskRunnable::dispatch_with_options(
+            runnable,
+            target,
+            DispatchOptions::default().may_block(true),
+        )?;
         Ok(())
     }
 }
 
 impl<N> Task for ApplyTask<N>
 where
     N: ?Sized + BridgedEngine,
     N::Error: BridgedError,
--- a/toolkit/components/bitsdownload/src/bits_interface/mod.rs
+++ b/toolkit/components/bitsdownload/src/bits_interface/mod.rs
@@ -125,17 +125,17 @@ impl BitsService {
         action: Action,
     ) -> Result<(), BitsTaskError> {
         let command_thread = self
             .get_command_thread()
             .map_err(|rv| BitsTaskError::from_nsresult(FailedToStartThread, action, Pretask, rv))?;
         let runnable = TaskRunnable::new(task_runnable_name, task).map_err(|rv| {
             BitsTaskError::from_nsresult(FailedToConstructTaskRunnable, action, Pretask, rv)
         })?;
-        runnable.dispatch(&command_thread).map_err(|rv| {
+        TaskRunnable::dispatch(runnable, &command_thread).map_err(|rv| {
             BitsTaskError::from_nsresult(FailedToDispatchRunnable, action, Pretask, rv)
         })
     }
 
     fn inc_request_count(&self) {
         self.request_count.set(self.request_count.get() + 1);
     }
 
--- a/toolkit/components/extensions/storage/webext_storage_bridge/src/area.rs
+++ b/toolkit/components/extensions/storage/webext_storage_bridge/src/area.rs
@@ -72,18 +72,21 @@ impl StorageSyncArea {
     }
 
     /// Dispatches a task for a storage operation to the task queue.
     fn dispatch(&self, punt: Punt, callback: &mozIExtensionStorageCallback) -> Result<()> {
         let name = punt.name();
         let task = PuntTask::new(Arc::downgrade(&*self.store()?), punt, callback)?;
         let runnable = TaskRunnable::new(name, Box::new(task))?;
         // `may_block` schedules the runnable on a dedicated I/O pool.
-        runnable
-            .dispatch_with_options(self.queue.coerce(), DispatchOptions::new().may_block(true))?;
+        TaskRunnable::dispatch_with_options(
+            runnable,
+            self.queue.coerce(),
+            DispatchOptions::new().may_block(true),
+        )?;
         Ok(())
     }
 
     xpcom_method!(
         configure => Configure(
             database_file: *const nsIFile
         )
     );
@@ -235,41 +238,38 @@ impl StorageSyncArea {
         // strong reference to the store, since all other tasks that called
         // `Weak::upgrade` will have already finished. The `TeardownTask` can
         // then consume the `Arc` and destroy the store.
         let mut maybe_store = self.store.borrow_mut();
         match mem::take(&mut *maybe_store) {
             Some(store) => {
                 // Interrupt any currently-running statements.
                 store.interrupt();
-                // If dispatching the runnable fails, we'll drop the store and
-                // close its database connection on the main thread. This is a
-                // last resort, and can also happen if the last `RefPtr` to this
-                // storage area is released without calling `teardown`. In that
-                // case, the destructor for `self.store` will run, which
-                // automatically closes its database connection. mozStorage's
-                // `Connection::Release` also falls back to closing the
-                // connection on the main thread if it can't dispatch to the
-                // background thread.
+                // If dispatching the runnable fails, we'll leak the store
+                // without closing its database connection.
                 teardown(&self.queue, store, callback)?;
             }
             None => return Err(Error::AlreadyTornDown),
         }
         Ok(())
     }
 }
 
 fn teardown(
     queue: &nsISerialEventTarget,
     store: Arc<LazyStore>,
     callback: &mozIExtensionStorageCallback,
 ) -> Result<()> {
     let task = TeardownTask::new(store, callback)?;
     let runnable = TaskRunnable::new(TeardownTask::name(), Box::new(task))?;
-    runnable.dispatch_with_options(queue.coerce(), DispatchOptions::new().may_block(true))?;
+    TaskRunnable::dispatch_with_options(
+        runnable,
+        queue.coerce(),
+        DispatchOptions::new().may_block(true),
+    )?;
     Ok(())
 }
 
 /// `mozIInterruptible` implementation.
 impl StorageSyncArea {
     xpcom_method!(
         interrupt => Interrupt()
     );
--- a/toolkit/components/kvstore/src/lib.rs
+++ b/toolkit/components/kvstore/src/lib.rs
@@ -129,17 +129,17 @@ impl KeyValueService {
 
         let task = Box::new(GetOrCreateTask::new(
             RefPtr::new(callback),
             RefPtr::new(thread),
             nsCString::from(path),
             nsCString::from(name),
         ));
 
-        TaskRunnable::new("KVService::GetOrCreate", task)?.dispatch(thread)
+        TaskRunnable::dispatch(TaskRunnable::new("KVService::GetOrCreate", task)?, thread)
     }
 }
 
 #[derive(xpcom)]
 #[xpimplements(nsIKeyValueDatabase)]
 #[refcnt = "atomic"]
 pub struct InitKeyValueDatabase {
     rkv: Arc<RwLock<Rkv>>,
@@ -177,17 +177,17 @@ impl KeyValueDatabase {
             Arc::clone(&self.rkv),
             self.store,
             nsCString::from(key),
             value,
         ));
 
         let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
 
-        TaskRunnable::new("KVDatabase::Put", task)?.dispatch(thread)
+        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Put", task)?, thread)
     }
 
     xpcom_method!(
         write_many => WriteMany(
             callback: *const nsIKeyValueVoidCallback,
             pairs: *const ThinVec<RefPtr<nsIKeyValuePair>>
         )
     );
@@ -215,17 +215,17 @@ impl KeyValueDatabase {
             RefPtr::new(callback),
             Arc::clone(&self.rkv),
             self.store,
             entries,
         ));
 
         let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
 
-        TaskRunnable::new("KVDatabase::WriteMany", task)?.dispatch(thread)
+        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::WriteMany", task)?, thread)
     }
 
     xpcom_method!(
         get => Get(
             callback: *const nsIKeyValueVariantCallback,
             key: *const nsACString,
             default_value: *const nsIVariant
         )
@@ -242,67 +242,67 @@ impl KeyValueDatabase {
             Arc::clone(&self.rkv),
             self.store,
             nsCString::from(key),
             variant_to_owned(default_value)?,
         ));
 
         let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
 
-        TaskRunnable::new("KVDatabase::Get", task)?.dispatch(thread)
+        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Get", task)?, thread)
     }
 
     xpcom_method!(
         has => Has(callback: *const nsIKeyValueVariantCallback, key: *const nsACString)
     );
 
     fn has(&self, callback: &nsIKeyValueVariantCallback, key: &nsACString) -> Result<(), nsresult> {
         let task = Box::new(HasTask::new(
             RefPtr::new(callback),
             Arc::clone(&self.rkv),
             self.store,
             nsCString::from(key),
         ));
 
         let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
 
-        TaskRunnable::new("KVDatabase::Has", task)?.dispatch(thread)
+        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Has", task)?, thread)
     }
 
     xpcom_method!(
         delete => Delete(callback: *const nsIKeyValueVoidCallback, key: *const nsACString)
     );
 
     fn delete(&self, callback: &nsIKeyValueVoidCallback, key: &nsACString) -> Result<(), nsresult> {
         let task = Box::new(DeleteTask::new(
             RefPtr::new(callback),
             Arc::clone(&self.rkv),
             self.store,
             nsCString::from(key),
         ));
 
         let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
 
-        TaskRunnable::new("KVDatabase::Delete", task)?.dispatch(thread)
+        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Delete", task)?, thread)
     }
 
     xpcom_method!(
         clear => Clear(callback: *const nsIKeyValueVoidCallback)
     );
 
     fn clear(&self, callback: &nsIKeyValueVoidCallback) -> Result<(), nsresult> {
         let task = Box::new(ClearTask::new(
             RefPtr::new(callback),
             Arc::clone(&self.rkv),
             self.store,
         ));
 
         let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
 
-        TaskRunnable::new("KVDatabase::Clear", task)?.dispatch(thread)
+        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Clear", task)?, thread)
     }
 
     xpcom_method!(
         enumerate => Enumerate(
             callback: *const nsIKeyValueEnumeratorCallback,
             from_key: *const nsACString,
             to_key: *const nsACString
         )
@@ -319,17 +319,17 @@ impl KeyValueDatabase {
             Arc::clone(&self.rkv),
             self.store,
             nsCString::from(from_key),
             nsCString::from(to_key),
         ));
 
         let thread = self.thread.get_ref().ok_or(NS_ERROR_FAILURE)?;
 
-        TaskRunnable::new("KVDatabase::Enumerate", task)?.dispatch(thread)
+        TaskRunnable::dispatch(TaskRunnable::new("KVDatabase::Enumerate", task)?, thread)
     }
 }
 
 #[derive(xpcom)]
 #[xpimplements(nsIKeyValueEnumerator)]
 #[refcnt = "atomic"]
 pub struct InitKeyValueEnumerator {
     iter: AtomicRefCell<IntoIter<KeyValuePairResult>>,
--- a/toolkit/components/places/bookmark_sync/src/driver.rs
+++ b/toolkit/components/places/bookmark_sync/src/driver.rs
@@ -95,17 +95,17 @@ impl dogear::Driver for Driver {
             let task = RecordTelemetryEventTask {
                 progress: progress.clone(),
                 event,
             };
             let _ = TaskRunnable::new(
                 "bookmark_sync::Driver::record_telemetry_event",
                 Box::new(task),
             )
-            .and_then(|r| r.dispatch(progress.owning_thread()));
+            .and_then(|r| TaskRunnable::dispatch(r, progress.owning_thread()));
         }
     }
 }
 
 pub struct Logger {
     pub max_level: LevelFilter,
     logger: Option<ThreadPtrHandle<mozIServicesLogger>>,
 }
@@ -135,17 +135,17 @@ impl Log for Logger {
             match write!(message, "{}", record.args()) {
                 Ok(_) => {
                     let task = LogTask {
                         logger: logger.clone(),
                         level: record.metadata().level(),
                         message,
                     };
                     let _ = TaskRunnable::new("bookmark_sync::Logger::log", Box::new(task))
-                        .and_then(|r| r.dispatch(logger.owning_thread()));
+                        .and_then(|r| TaskRunnable::dispatch(r, logger.owning_thread()));
                 }
                 Err(_) => {}
             }
         }
     }
 
     fn flush(&self) {}
 }
--- a/toolkit/components/places/bookmark_sync/src/merger.rs
+++ b/toolkit/components/places/bookmark_sync/src/merger.rs
@@ -103,17 +103,17 @@ impl SyncedBookmarksMerger {
                 .map(|w| w.as_slice().to_vec())
                 .unwrap_or_default(),
             callback,
         )?;
         let runnable = TaskRunnable::new(
             "bookmark_sync::SyncedBookmarksMerger::merge",
             Box::new(task),
         )?;
-        runnable.dispatch(&async_thread)?;
+        TaskRunnable::dispatch(runnable, &async_thread)?;
         let op = MergeOp::new(controller);
         Ok(RefPtr::new(op.coerce()))
     }
 
     xpcom_method!(reset => Reset());
     fn reset(&self) -> Result<(), nsresult> {
         mem::drop(self.db.borrow_mut().take());
         mem::drop(self.logger.borrow_mut().take());
--- a/toolkit/components/xulstore/src/persist.rs
+++ b/toolkit/components/xulstore/src/persist.rs
@@ -171,17 +171,17 @@ pub(crate) fn persist(key: String, value
         // the last time we persisted, so dispatch a new PersistTask.
         let task = Box::new(PersistTask::new());
         let thread_guard = THREAD.lock()?;
         let thread = thread_guard
             .as_ref()
             .ok_or(XULStoreError::Unavailable)?
             .get_ref()
             .ok_or(XULStoreError::Unavailable)?;
-        TaskRunnable::new("XULStore::Persist", task)?.dispatch(thread)?;
+        TaskRunnable::dispatch(TaskRunnable::new("XULStore::Persist", task)?, thread)?;
     }
 
     // Now insert the key/value pair into the map.  The unwrap() call here
     // should never panic, since the code above sets `writes` to a Some(HashMap)
     // if it's None.
     changes.as_mut().unwrap().insert(key, value);
 
     Ok(())
--- a/xpcom/rust/moz_task/src/lib.rs
+++ b/xpcom/rust/moz_task/src/lib.rs
@@ -178,39 +178,52 @@ impl TaskRunnable {
         Ok(TaskRunnable::allocate(InitTaskRunnable {
             name,
             original_thread: get_current_thread()?,
             task,
             has_run: AtomicBool::new(false),
         }))
     }
 
+    /// Dispatches this task runnable to an event target with the default
+    /// options.
     #[inline]
-    pub fn dispatch(&self, target_thread: &nsIEventTarget) -> Result<(), nsresult> {
-        self.dispatch_with_options(target_thread, DispatchOptions::default())
+    pub fn dispatch(this: RefPtr<Self>, target: &nsIEventTarget) -> Result<(), nsresult> {
+        Self::dispatch_with_options(this, target, DispatchOptions::default())
     }
 
+    /// Dispatches this task runnable to an event target, like a thread or a
+    /// task queue, with the given options.
+    ///
+    /// Note that this is an associated function, not a method, because it takes
+    /// an owned reference to the runnable, and must be called like
+    /// `TaskRunnable::dispatch_with_options(runnable, options)` and *not*
+    /// `runnable.dispatch_with_options(options)`.
+    ///
+    /// ### Safety
+    ///
+    /// This function leaks the runnable if dispatch fails.
     pub fn dispatch_with_options(
-        &self,
-        target_thread: &nsIEventTarget,
+        this: RefPtr<Self>,
+        target: &nsIEventTarget,
         options: DispatchOptions,
     ) -> Result<(), nsresult> {
-        unsafe { target_thread.DispatchFromScript(self.coerce(), options.flags()) }.to_result()
+        unsafe { target.DispatchFromScript(this.coerce(), options.flags()) }.to_result()
     }
 
     xpcom_method!(run => Run());
     fn run(&self) -> Result<(), nsresult> {
         match self
             .has_run
             .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
         {
             Ok(_) => {
                 assert!(!is_current_thread(&self.original_thread));
                 self.task.run();
-                self.dispatch(&self.original_thread)
+                Self::dispatch(RefPtr::new(self), &self.original_thread)
             }
             Err(_) => {
                 assert!(is_current_thread(&self.original_thread));
                 self.task.done()
             }
         }
     }