Bug 1506697 [wpt PR 14024] - [wptserve] Eliminate race condition, a=testonly
☠☠ backed out by 46ad93280b06 ☠ ☠
authorjugglinmike <mike@mikepennisi.com>
Mon, 19 Nov 2018 18:44:42 +0000
changeset 503709 378d0c633eaee8cac785d3f0aa0e24f397efa6e2
parent 503708 eed5fefba0897a4dfd2ad9ae9408a02fcc024976
child 503710 dfbc81d5e5187d2e804f8fad2f8dedd9f51734ef
push id10290
push userffxbld-merge
push dateMon, 03 Dec 2018 16:23:23 +0000
treeherdermozilla-beta@700bed2445e6 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerstestonly
bugs1506697, 14024
milestone65.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1506697 [wpt PR 14024] - [wptserve] Eliminate race condition, a=testonly Automatic update from web-platform-tests[wptserve] Eliminate race condition (#14024) This race condition was expressed during testing sessions where the first test to use the Stash feature issued did so with multiple requests made in parallel. -- wpt-commits: cbb25e2c99696956ed2a36e7bcdbdee1dca71705 wpt-pr: 14024
testing/web-platform/tests/tools/wptserve/tests/test_stash.py
testing/web-platform/tests/tools/wptserve/wptserve/stash.py
new file mode 100644
--- /dev/null
+++ b/testing/web-platform/tests/tools/wptserve/tests/test_stash.py
@@ -0,0 +1,130 @@
+import threading
+import multiprocessing
+from multiprocessing.managers import BaseManager
+
+import pytest
+
+Stash = pytest.importorskip("wptserve.stash").Stash
+
+@pytest.fixture()
+def add_cleanup():
+    fns = []
+
+    def add(fn):
+        fns.append(fn)
+
+    yield add
+
+    for fn in fns:
+        fn()
+
+def run(process_queue, request_lock, response_lock):
+    """Create two Stash instances in parallel threads. Use the provided locks
+    to ensure the first thread is actively establishing an interprocess
+    communication channel at the moment the second thread executes."""
+
+    def target(thread_queue):
+        stash = Stash("/", ("localhost", 4543), b"some key")
+
+        # The `lock` property of the Stash instance should always be set
+        # immediately following initialization. These values are asserted in
+        # the active test.
+        thread_queue.put(stash.lock is None)
+
+    thread_queue = multiprocessing.Queue()
+    first = threading.Thread(target=target, args=(thread_queue,))
+    second = threading.Thread(target=target, args=(thread_queue,))
+
+    request_lock.acquire()
+    response_lock.acquire()
+    first.start()
+
+    request_lock.acquire()
+
+    # At this moment, the `first` thread is waiting for a proxied object.
+    # Create a second thread in order to inspect the behavior of the Stash
+    # constructor at this moment.
+
+    second.start()
+
+    # Allow the `first` thread to proceed
+
+    response_lock.release()
+
+    # Wait for both threads to complete and report their stateto the test
+    process_queue.put(thread_queue.get())
+    process_queue.put(thread_queue.get())
+
+
+def test_delayed_lock(add_cleanup):
+    """Ensure that delays in proxied Lock retrieval do not interfere with
+    initialization in parallel threads."""
+
+    class SlowLock(BaseManager):
+        pass
+
+    request_lock = multiprocessing.Lock()
+    response_lock = multiprocessing.Lock()
+
+    queue = multiprocessing.Queue()
+
+    def mutex_lock_request():
+        """This request handler allows the caller to delay execution of a
+        thread which has requested a proxied representation of the `lock`
+        property, simulating a "slow" interprocess communication channel."""
+
+        request_lock.release()
+        response_lock.acquire()
+        return threading.Lock()
+
+    SlowLock.register("get_dict", callable=lambda: {})
+    SlowLock.register("Lock", callable=mutex_lock_request)
+
+    slowlock = SlowLock(("localhost", 4543), b"some key")
+    slowlock.start()
+    add_cleanup(lambda: slowlock.shutdown())
+
+    parallel = multiprocessing.Process(target=run,
+                                       args=(queue, request_lock, response_lock))
+    parallel.start()
+    add_cleanup(lambda: parallel.terminate())
+
+    assert [queue.get(), queue.get()] == [False, False], (
+        "both instances had valid locks")
+
+def test_delayed_dict(add_cleanup):
+    """Ensure that delays in proxied `dict` retrieval do not interfere with
+    initialization in parallel threads."""
+
+    class SlowDict(BaseManager):
+        pass
+
+    request_lock = multiprocessing.Lock()
+    response_lock = multiprocessing.Lock()
+
+    queue = multiprocessing.Queue()
+
+    # This request handler allows the caller to delay execution of a thread
+    # which has requested a proxied representation of the "get_dict" property.
+    def mutex_dict_request():
+        """This request handler allows the caller to delay execution of a
+        thread which has requested a proxied representation of the `get_dict`
+        property, simulating a "slow" interprocess communication channel."""
+        request_lock.release()
+        response_lock.acquire()
+        return {}
+
+    SlowDict.register("get_dict", callable=mutex_dict_request)
+    SlowDict.register("Lock", callable=lambda: threading.Lock())
+
+    slowdict = SlowDict(("localhost", 4543), b"some key")
+    slowdict.start()
+    add_cleanup(lambda: slowdict.shutdown())
+
+    parallel = multiprocessing.Process(target=run,
+                                       args=(queue, request_lock, response_lock))
+    parallel.start()
+    add_cleanup(lambda: parallel.terminate())
+
+    assert [queue.get(), queue.get()] == [False, False], (
+        "both instances had valid locks")
--- a/testing/web-platform/tests/tools/wptserve/wptserve/stash.py
+++ b/testing/web-platform/tests/tools/wptserve/wptserve/stash.py
@@ -99,28 +99,38 @@ class Stash(object):
     written and the read operation (called "take") is destructive. Taken together,
     these properties make it difficult for data to accidentally leak
     between different resources or different requests for the same
     resource.
     """
 
     _proxy = None
     lock = None
+    _initializing = threading.Lock()
 
     def __init__(self, default_path, address=None, authkey=None):
         self.default_path = default_path
         self._get_proxy(address, authkey)
         self.data = Stash._proxy
 
     def _get_proxy(self, address=None, authkey=None):
         if address is None and authkey is None:
             Stash._proxy = {}
             Stash.lock = threading.Lock()
 
-        if Stash._proxy is None:
+        # Initializing the proxy involves connecting to the remote process and
+        # retrieving two proxied objects. This process is not inherently
+        # atomic, so a lock must be used to make it so. Atomicity ensures that
+        # only one thread attempts to initialize the connection and that any
+        # threads running in parallel correctly wait for initialization to be
+        # fully complete.
+        with Stash._initializing:
+            if Stash.lock:
+                return
+
             manager = ClientDictManager(address, authkey)
             manager.connect()
             Stash._proxy = manager.get_dict()
             Stash.lock = LockWrapper(manager.Lock())
 
     def _wrap_key(self, key, path):
         if path is None:
             path = self.default_path