Bug 1454385 - Add a single producer single consumer lock and wait free queue to mfbt/. r=froydnj
author Paul Adenot <paul@paul.cx>
Fri, 13 Apr 2018 17:14:05 +0200
changeset 474336 977451308538306aa7ce69e2e33824a57f05b9dc
parent 474335 e9b1a091e83b62b7803436f1f9811fe39b390277
child 474337 06788dddc02d16e56281a5e35091aad168db0d22
push id 9374
push user jlund@mozilla.com
push date Mon, 18 Jun 2018 21:43:20 +0000
treeherder mozilla-beta@160e085dfb0b
reviewers froydnj
bugs 1454385
milestone 62.0a1
Bug 1454385 - Add a single producer single consumer lock and wait free queue to mfbt/. r=froydnj MozReview-Commit-ID: 6Dq0GQtYgv2
mfbt/SPSCQueue.h
mfbt/moz.build
mfbt/tests/TestSPSCQueue.cpp
mfbt/tests/moz.build
testing/cppunittest.ini
new file mode 100644
--- /dev/null
+++ b/mfbt/SPSCQueue.h
@@ -0,0 +1,428 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/* Single producer single consumer lock-free and wait-free queue. */
+
+#ifndef mozilla_LockFreeQueue_h
+#define mozilla_LockFreeQueue_h
+
+#include "mozilla/Assertions.h"
+#include "mozilla/Attributes.h"
+#include "mozilla/PodOperations.h"
+#include <algorithm>
+#include <atomic>
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <thread>
+#include <type_traits>
+
+namespace mozilla {
+
+namespace details {
+template<typename T, bool IsPod = std::is_trivial<T>::value>
+struct MemoryOperations {
+  /**
+   * This allows zeroing (using memset) or default-constructing a number of
+   * elements, calling the constructors if necessary.
+   */
+  static void ConstructDefault(T* aDestination, size_t aCount);
+  /**
+   * This allows either moving (if T supports it) or copying a number of
+   * elements from an `aSource` pointer to an `aDestination` pointer.
+   * If it is safe to do so, this copies using PodCopy. Otherwise, move
+   * assignment is performed element by element in a loop.
+   */
+  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount);
+};
+
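+/* For example (illustrative only): MemoryOperations<int> selects the
+ * PodZero/PodCopy specialization below, because std::is_trivial<int>::value
+ * is true, while MemoryOperations<std::string> falls back to the
+ * element-wise default-construction and std::move loops. */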
+template<typename T>
+struct MemoryOperations<T, true>
+{
+  static void ConstructDefault(T* aDestination, size_t aCount)
+  {
+    PodZero(aDestination, aCount);
+  }
+  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount)
+  {
+    PodCopy(aDestination, aSource, aCount);
+  }
+};
+
+template<typename T>
+struct MemoryOperations<T, false>
+{
+  static void ConstructDefault(T* aDestination, size_t aCount)
+  {
+    for (size_t i = 0; i < aCount; i++) {
+      aDestination[i] = T();
+    }
+  }
+  static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount)
+  {
+    std::move(aSource, aSource + aCount, aDestination);
+  }
+};
+} // namespace details
+
+
+/**
+ * This data structure allows producing data from one thread, and consuming it
+ * on another thread, safely and without explicit synchronization.
+ *
+ * The roles of the producer and the consumer must be constant, i.e., the
+ * producer should always be on one thread and the consumer should always be on
+ * another thread.
+ *
+ * Some words about the inner workings of this class:
+ * - Capacity is fixed. Only one allocation is performed, in the constructor.
+ *   When reading and writing, the return value of the method allows checking if
+ *   the ring buffer is empty or full.
+ * - The storage has one more element than the requested capacity, so at
+ *   least one slot always stays empty between the write index and the read
+ *   index. This lets us distinguish between an empty and a full ring buffer:
+ *   an empty ring buffer is when the write index is at the same position as
+ *   the read index. A full buffer is when the write index is exactly one
+ *   position before the read index.
+ * - We synchronize updates to the read index after having read the data, and
+ *   the write index after having written the data. This means that each
+ *   thread can only touch a portion of the buffer that is not touched by the
+ *   other thread.
+ * - Callers are expected to provide buffers. When writing to the queue,
+ *   elements are copied into the internal storage from the buffer passed in.
+ *   When reading from the queue, the user is expected to provide a buffer.
+ *   Because this is a ring buffer, data might not be contiguous in memory;
+ *   providing an external buffer to copy into is an easy way to have linear
+ *   data for further processing.
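+ *
+ * A minimal usage sketch (illustrative only; the sizes and variable names
+ * below are hypothetical, not part of any API):
+ *
+ *   SPSCQueue<float> queue(128);
+ *   // On the producer thread:
+ *   float input[32] = {};
+ *   int written = queue.Enqueue(input, 32);  // <= 32, depending on free space
+ *   // On the consumer thread:
+ *   float output[32];
+ *   int read = queue.Dequeue(output, 32);    // <= 32, depending on available data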
+ */
+template<typename T>
+class SPSCRingBufferBase
+{
+public:
+  /**
+   * Constructor for a ring buffer.
+   *
+   * This performs an allocation on the heap, but is the only allocation that
+   * will happen for the life time of a `SPSCRingBufferBase`.
+   *
+   * @param aCapacity The maximum number of elements this ring buffer will hold.
+   */
+  explicit
+  SPSCRingBufferBase(int aCapacity)
+    : mReadIndex(0)
+    , mWriteIndex(0)
+    /* One more element to distinguish from empty and full buffer. */
+    , mCapacity(aCapacity + 1)
+  {
+    MOZ_ASSERT(StorageCapacity() < std::numeric_limits<int>::max() / 2,
+              "buffer too large for the type of index used.");
+    MOZ_ASSERT(mCapacity > 0 && aCapacity != std::numeric_limits<int>::max());
+
+    mData = std::make_unique<T[]>(StorageCapacity());
+
+    std::atomic_thread_fence(std::memory_order::memory_order_seq_cst);
+  }
+  /**
+   * Push `aCount` zero-initialized or default-constructed elements into the
+   * ring buffer.
+   *
+   * Only safely called on the producer thread.
+   *
+   * @param aCount The number of elements to enqueue.
+   * @return The number of elements enqueued.
+   */
+  MOZ_MUST_USE
+  int EnqueueDefault(int aCount) {
+    return Enqueue(nullptr, aCount);
+  }
+  /**
+   * @brief Put an element in the queue.
+   *
+   * Only safely called on the producer thread.
+   *
+   * @param aElement The element to put in the queue.
+   *
+   * @return 1 if the element was inserted, 0 otherwise.
+   */
+  MOZ_MUST_USE
+  int Enqueue(T& aElement) {
+    return Enqueue(&aElement, 1);
+  }
+  /**
+   * Push `aCount` elements into the ring buffer.
+   *
+   * Only safely called on the producer thread.
+   *
+   * @param aElements A pointer to a buffer containing at least `aCount`
+   * elements. If `aElements` is nullptr, zero-initialized or
+   * default-constructed elements are enqueued.
+   * @param aCount The number of elements to read from `aElements`.
+   * @return The number of elements successfully copied from `aElements` and
+   * inserted into the ring buffer.
+   */
+  MOZ_MUST_USE
+  int Enqueue(T* aElements, int aCount)
+  {
+#ifdef DEBUG
+    AssertCorrectThread(mProducerId);
+#endif
+
+    int rdIdx = mReadIndex.load(std::memory_order::memory_order_acquire);
+    int wrIdx = mWriteIndex.load(std::memory_order::memory_order_relaxed);
+
+    if (IsFull(rdIdx, wrIdx)) {
+      return 0;
+    }
+
+    int toWrite = std::min(AvailableWriteInternal(rdIdx, wrIdx), aCount);
+
+    /* First part, from the write index to the end of the array. */
+    int firstPart = std::min(StorageCapacity() - wrIdx, toWrite);
+    /* Second part, from the beginning of the array */
+    int secondPart = toWrite - firstPart;
+
+    if (aElements) {
+      details::MemoryOperations<T>::MoveOrCopy(mData.get() + wrIdx, aElements, firstPart);
+      details::MemoryOperations<T>::MoveOrCopy(mData.get(), aElements + firstPart, secondPart);
+    } else {
+      details::MemoryOperations<T>::ConstructDefault(mData.get() + wrIdx, firstPart);
+      details::MemoryOperations<T>::ConstructDefault(mData.get(), secondPart);
+    }
+
+    mWriteIndex.store(IncrementIndex(wrIdx, toWrite),
+                      std::memory_order::memory_order_release);
+
+    return toWrite;
+  }
+  /**
+   * Retrieve at most `aCount` elements from the ring buffer, and copy them to
+   * `aElements`, if non-null.
+   *
+   * Only safely called on the consumer thread.
+   *
+   * @param aElements A pointer to a buffer with space for at least `aCount`
+   * elements. If `aElements` is `nullptr`, up to `aCount` elements will be
+   * discarded.
+   * @param aCount The maximum number of elements to dequeue.
+   * @return The number of elements written to `aElements`.
+   */
+  MOZ_MUST_USE
+  int Dequeue(T* aElements, int aCount)
+  {
+#ifdef DEBUG
+    AssertCorrectThread(mConsumerId);
+#endif
+
+    int wrIdx = mWriteIndex.load(std::memory_order::memory_order_acquire);
+    int rdIdx = mReadIndex.load(std::memory_order::memory_order_relaxed);
+
+    if (IsEmpty(rdIdx, wrIdx)) {
+      return 0;
+    }
+
+    int toRead = std::min(AvailableReadInternal(rdIdx, wrIdx), aCount);
+
+    int firstPart = std::min(StorageCapacity() - rdIdx, toRead);
+    int secondPart = toRead - firstPart;
+
+    if (aElements) {
+      details::MemoryOperations<T>::MoveOrCopy(aElements, mData.get() + rdIdx, firstPart);
+      details::MemoryOperations<T>::MoveOrCopy(aElements + firstPart, mData.get(), secondPart);
+    }
+
+    mReadIndex.store(IncrementIndex(rdIdx, toRead),
+                     std::memory_order::memory_order_release);
+
+    return toRead;
+  }
+  /**
+   * Get the number of available elements for consuming.
+   *
+   * Only safely called on the consumer thread. This can be less than the
+   * actual number of elements in the queue, since `mWriteIndex` is only
+   * updated at the very end of the Enqueue method on the producer thread,
+   * but it always returns a number of elements such that a call to Dequeue
+   * can return this number of elements.
+   *
+   * @return The number of available elements for reading.
+   */
+  int AvailableRead() const
+  {
+#ifdef DEBUG
+    AssertCorrectThread(mConsumerId);
+#endif
+    return AvailableReadInternal(
+      mReadIndex.load(std::memory_order::memory_order_relaxed),
+      mWriteIndex.load(std::memory_order::memory_order_relaxed));
+  }
+  /**
+   * Get the number of available elements for writing.
+   *
+   * Only safely called on the producer thread. This can be less than the
+   * actual number of slots that are available, because `mReadIndex` is only
+   * updated at the very end of the Dequeue method. It always returns a
+   * number such that a call to Enqueue with this number will succeed in
+   * enqueuing this number of elements.
+   *
+   * @return The number of empty slots in the buffer, available for writing.
+   */
+  int AvailableWrite() const
+  {
+#ifdef DEBUG
+    AssertCorrectThread(mProducerId);
+#endif
+    return AvailableWriteInternal(
+      mReadIndex.load(std::memory_order::memory_order_relaxed),
+      mWriteIndex.load(std::memory_order::memory_order_relaxed));
+  }
+  /**
+   * Get the total capacity of this ring buffer.
+   *
+   * Can be called safely on any thread.
+   *
+   * @return The maximum capacity of this ring buffer.
+   */
+  int Capacity() const { return StorageCapacity() - 1; }
+  /**
+   * Reset the consumer and producer thread identifiers, in case the threads
+   * are being changed. This has to be externally synchronized. This is a
+   * no-op when asserts are disabled.
+   */
+  void ResetThreadIds()
+  {
+#ifdef DEBUG
+    mConsumerId = mProducerId = std::thread::id();
+#endif
+  }
+private:
+  /** Return true if the ring buffer is empty.
+   *
+   * This can be called from the consumer or the producer thread.
+   *
+   * @param aReadIndex the read index to consider
+   * @param aWriteIndex the write index to consider
+   * @return true if the ring buffer is empty, false otherwise.
+   **/
+  bool IsEmpty(int aReadIndex, int aWriteIndex) const
+  {
+    return aWriteIndex == aReadIndex;
+  }
+  /** Return true if the ring buffer is full.
+   *
+   * This happens if the write index is exactly one element behind the read
+   * index.
+   *
+   * This can be called from the consumer or the producer thread.
+   *
+   * @param aReadIndex the read index to consider
+   * @param aWriteIndex the write index to consider
+   * @return true if the ring buffer is full, false otherwise.
+   **/
+  bool IsFull(int aReadIndex, int aWriteIndex) const
+  {
+    return (aWriteIndex + 1) % StorageCapacity() == aReadIndex;
+  }
+  /**
+   * Return the size of the storage. It is one more than the number of elements
+   * that can be stored in the buffer.
+   *
+   * This can be called from any thread.
+   *
+   * @return the size of the storage (one more than the usable capacity).
+   */
+  int StorageCapacity() const { return mCapacity; }
+  /**
+   * Returns the number of elements available for reading.
+   *
+   * This can be called from the consumer or producer thread, but see the
+   * comment in `AvailableRead`.
+   *
+   * @return the number of available elements for reading.
+   */
+  int AvailableReadInternal(int aReadIndex, int aWriteIndex) const
+  {
+    if (aWriteIndex >= aReadIndex) {
+      return aWriteIndex - aReadIndex;
+    } else {
+      return aWriteIndex + StorageCapacity() - aReadIndex;
+    }
+  }
+  /**
+   * Returns the number of empty slots available for writing.
+   *
+   * This can be called from the consumer or producer thread, but see the
+   * comment in `AvailableWrite`.
+   *
+   * @return the number of elements that can be written into the array.
+   */
+  int AvailableWriteInternal(int aReadIndex, int aWriteIndex) const
+  {
+    /* We subtract one element here to always keep at least one slot
+     * free in the buffer, to distinguish between a full and an empty buffer. */
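+    /* Worked example (illustrative numbers): with StorageCapacity() == 8,
+     * aReadIndex == 2 and aWriteIndex == 5, rv starts at 2 - 5 - 1 == -4;
+     * since aWriteIndex >= aReadIndex we add 8, leaving 4 free slots
+     * (3 slots hold data and 1 is kept empty). */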
+    int rv = aReadIndex - aWriteIndex - 1;
+    if (aWriteIndex >= aReadIndex) {
+      rv += StorageCapacity();
+    }
+    return rv;
+  }
+  /**
+   * Increments an index, wrapping it around the storage.
+   *
+   * Incrementing `mWriteIndex` can be done on the producer thread.
+   * Incrementing `mReadIndex` can be done on the consumer thread.
+   *
+   * @param aIndex the index to increment.
+   * @param aIncrement the number by which `aIndex` is incremented.
+   * @return the new index.
+   */
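+  /* For example (illustrative): with StorageCapacity() == 8, incrementing
+   * index 6 by 5 yields (6 + 5) % 8 == 3, wrapping around the storage. */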
+  int IncrementIndex(int aIndex, int aIncrement) const
+  {
+    MOZ_ASSERT(aIncrement >= 0 &&
+               aIncrement < StorageCapacity() &&
+               aIndex < StorageCapacity());
+    return (aIndex + aIncrement) % StorageCapacity();
+  }
+  /**
+   * @brief This allows checking that Enqueue (resp. Dequeue) are always
+   * called by the right thread.
+   *
+   * The thread roles are assigned the first time a thread calls Enqueue or
+   * Dequeue, and cannot change, except when ResetThreadIds is called.
+   *
+   * @param aId the member that stores the id of the thread expected to call
+   * the enclosing method (producer or consumer).
+   */
+#ifdef DEBUG
+  static void AssertCorrectThread(std::thread::id& aId)
+  {
+    if (aId == std::thread::id()) {
+      aId = std::this_thread::get_id();
+      return;
+    }
+    MOZ_ASSERT(aId == std::this_thread::get_id());
+  }
+#endif
+  /** Index at which the oldest element is. */
+  std::atomic<int> mReadIndex;
+  /** Index at which to write new elements. `mWriteIndex` never advances onto
+   * `mReadIndex`: at least one slot is always kept empty, so a full buffer
+   * can be distinguished from an empty one. */
+  std::atomic<int> mWriteIndex;
+  /** Size of the internal storage: one more than the number of elements the
+   * ring buffer can hold. */
+  const int mCapacity;
+  /** Data storage, of size `mCapacity`. */
+  std::unique_ptr<T[]> mData;
+#ifdef DEBUG
+  /** The id of the only thread that is allowed to read from the queue. */
+  mutable std::thread::id mConsumerId;
+  /** The id of the only thread that is allowed to write to the queue. */
+  mutable std::thread::id mProducerId;
+#endif
+};
+
+/**
+ * Alias for the `SPSCRingBufferBase` type. This is safe to use
+ * from two threads, one producer, one consumer (that never change role),
+ * without explicit synchronization.
+ */
+template<typename T>
+using SPSCQueue = SPSCRingBufferBase<T>;
+
+} // namespace mozilla
+
+#endif // mozilla_LockFreeQueue_h
--- a/mfbt/moz.build
+++ b/mfbt/moz.build
@@ -83,16 +83,17 @@ EXPORTS.mozilla = [
     'ScopeExit.h',
     'SegmentedVector.h',
     'SHA1.h',
     'SharedLibrary.h',
     'SmallPointerArray.h',
     'Span.h',
     'SplayTree.h',
     'Sprintf.h',
+    'SPSCQueue.h',
     'StaticAnalysisFunctions.h',
     'TaggedAnonymousMemory.h',
     'TemplateLib.h',
     'TextUtils.h',
     'ThreadLocal.h',
     'ThreadSafeWeakPtr.h',
     'ToString.h',
     'Tuple.h',
new file mode 100644
--- /dev/null
+++ b/mfbt/tests/TestSPSCQueue.cpp
@@ -0,0 +1,251 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "mozilla/SPSCQueue.h"
+#include "mozilla/PodOperations.h"
+#include <vector>
+#include <iostream>
+#include <thread>
+#include <chrono>
+#include <memory>
+#include <string>
+
+using namespace mozilla;
+
+/* Generate a monotonically increasing sequence of numbers. */
+template<typename T>
+class SequenceGenerator
+{
+public:
+  SequenceGenerator()
+  { }
+  void Get(T * aElements, size_t aCount)
+  {
+    for (size_t i = 0; i < aCount; i++) {
+      aElements[i] = static_cast<T>(mIndex);
+      mIndex++;
+    }
+  }
+  void Rewind(size_t aCount)
+  {
+    mIndex -= aCount;
+  }
+private:
+  size_t mIndex = 0;
+};
+
+/* Checks that a sequence is monotonically increasing. */
+template<typename T>
+class SequenceVerifier
+{
+public:
+  SequenceVerifier()
+  { }
+  void Check(T * aElements, size_t aCount)
+  {
+    for (size_t i = 0; i < aCount; i++) {
+      if (aElements[i] != static_cast<T>(mIndex)) {
+        std::cerr << "Element " << i << " is different. Expected "
+          << static_cast<T>(mIndex) << ", got " << aElements[i]
+          << "." << std::endl;
+        MOZ_RELEASE_ASSERT(false);
+      }
+      mIndex++;
+    }
+  }
+private:
+  size_t mIndex = 0;
+};
+
+const int BLOCK_SIZE = 127;
+
+template<typename T>
+void TestRing(int capacity)
+{
+  SPSCQueue<T> buf(capacity);
+  std::unique_ptr<T[]> seq(new T[capacity]);
+  SequenceGenerator<T> gen;
+  SequenceVerifier<T> checker;
+
+  int iterations = 1002;
+
+  while(iterations--) {
+    gen.Get(seq.get(), BLOCK_SIZE);
+    int rv = buf.Enqueue(seq.get(), BLOCK_SIZE);
+    MOZ_RELEASE_ASSERT(rv == BLOCK_SIZE);
+    PodZero(seq.get(), BLOCK_SIZE);
+    rv = buf.Dequeue(seq.get(), BLOCK_SIZE);
+    MOZ_RELEASE_ASSERT(rv == BLOCK_SIZE);
+    checker.Check(seq.get(), BLOCK_SIZE);
+  }
+}
+
+template<typename T>
+void TestRingMultiThread(int capacity)
+{
+  SPSCQueue<T> buf(capacity);
+  SequenceVerifier<T> checker;
+  std::unique_ptr<T[]> outBuffer(new T[capacity]);
+
+  std::thread t([&buf, capacity] {
+    int iterations = 1002;
+    std::unique_ptr<T[]> inBuffer(new T[capacity]);
+    SequenceGenerator<T> gen;
+
+    while(iterations--) {
+      std::this_thread::sleep_for(std::chrono::microseconds(10));
+      gen.Get(inBuffer.get(), BLOCK_SIZE);
+      int rv = buf.Enqueue(inBuffer.get(), BLOCK_SIZE);
+      MOZ_RELEASE_ASSERT(rv <= BLOCK_SIZE);
+      if (rv != BLOCK_SIZE) {
+        gen.Rewind(BLOCK_SIZE - rv);
+      }
+    }
+  });
+
+  int remaining = 1002;
+
+  while(remaining--) {
+    std::this_thread::sleep_for(std::chrono::microseconds(10));
+    int rv = buf.Dequeue(outBuffer.get(), BLOCK_SIZE);
+    MOZ_RELEASE_ASSERT(rv <= BLOCK_SIZE);
+    checker.Check(outBuffer.get(), rv);
+  }
+
+  t.join();
+}
+
+template<typename T>
+void BasicAPITest(T& ring)
+{
+  MOZ_RELEASE_ASSERT(ring.Capacity() == 128);
+
+  MOZ_RELEASE_ASSERT(ring.AvailableRead() == 0);
+  MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 128);
+
+  int rv = ring.EnqueueDefault(63);
+
+  MOZ_RELEASE_ASSERT(rv == 63);
+  MOZ_RELEASE_ASSERT(ring.AvailableRead() == 63);
+  MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 65);
+
+  rv = ring.EnqueueDefault(65);
+
+  MOZ_RELEASE_ASSERT(rv == 65);
+  MOZ_RELEASE_ASSERT(ring.AvailableRead() == 128);
+  MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 0);
+
+  rv = ring.Dequeue(nullptr, 63);
+
+  MOZ_RELEASE_ASSERT(ring.AvailableRead() == 65);
+  MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 63);
+
+  rv = ring.Dequeue(nullptr, 65);
+
+  MOZ_RELEASE_ASSERT(ring.AvailableRead() == 0);
+  MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 128);
+}
+
+const size_t RING_BUFFER_SIZE = 128;
+const size_t ENQUEUE_SIZE = RING_BUFFER_SIZE / 2;
+
+void TestResetAPI() {
+  SPSCQueue<float> ring(RING_BUFFER_SIZE);
+  std::thread t([&ring] {
+    std::unique_ptr<float[]> inBuffer(new float[ENQUEUE_SIZE]);
+    int rv = ring.Enqueue(inBuffer.get(), ENQUEUE_SIZE);
+    MOZ_RELEASE_ASSERT(rv > 0);
+  });
+
+  t.join();
+
+  ring.ResetThreadIds();
+
+  // Enqueue with a different thread. We have reset the thread ID
+  // in the ring buffer, this should work.
+  std::thread t2([&ring] {
+    std::unique_ptr<float[]> inBuffer(new float[ENQUEUE_SIZE]);
+    int rv = ring.Enqueue(inBuffer.get(), ENQUEUE_SIZE);
+    MOZ_RELEASE_ASSERT(rv > 0);
+  });
+
+  t2.join();
+}
+
+void
+TestMove()
+{
+  const size_t ELEMENT_COUNT = 16;
+  struct Thing {
+    Thing()
+      : mStr("")
+    { }
+    explicit
+    Thing(const std::string& aStr)
+      : mStr(aStr)
+    { }
+    Thing(Thing&& aOtherThing)
+    {
+      mStr = std::move(aOtherThing.mStr);
+      // aOtherThing.mStr.clear();
+    }
+    Thing& operator=(Thing&& aOtherThing)
+    {
+      mStr = std::move(aOtherThing.mStr);
+      return *this;
+    }
+    std::string mStr;
+  };
+
+  std::vector<Thing> vec_in;
+  std::vector<Thing> vec_out;
+
+  for (uint32_t i = 0; i < ELEMENT_COUNT; i++) {
+    vec_in.push_back(Thing(std::to_string(i)));
+    vec_out.push_back(Thing());
+  }
+
+  SPSCQueue<Thing> queue(ELEMENT_COUNT);
+
+  int rv = queue.Enqueue(&vec_in[0], ELEMENT_COUNT);
+  MOZ_RELEASE_ASSERT(rv == ELEMENT_COUNT);
+
+  // Check that we've moved the std::string into the queue.
+  for (uint32_t i = 0; i < ELEMENT_COUNT; i++) {
+    MOZ_RELEASE_ASSERT(vec_in[i].mStr.empty());
+  }
+
+  rv = queue.Dequeue(&vec_out[0], ELEMENT_COUNT);
+  MOZ_RELEASE_ASSERT(rv == ELEMENT_COUNT);
+
+  for (uint32_t i = 0; i < ELEMENT_COUNT; i++) {
+    MOZ_RELEASE_ASSERT(std::stoul(vec_out[i].mStr) == i);
+  }
+}
+
+int main()
+{
+  const int minCapacity = 199;
+  const int maxCapacity = 1277;
+  const int capacityIncrement = 27;
+
+  SPSCQueue<float> q1(128);
+  BasicAPITest(q1);
+  SPSCQueue<char> q2(128);
+  BasicAPITest(q2);
+
+  for (uint32_t i = minCapacity; i < maxCapacity; i += capacityIncrement) {
+    TestRing<uint32_t>(i);
+    TestRingMultiThread<uint32_t>(i);
+    TestRing<float>(i);
+    TestRingMultiThread<float>(i);
+  }
+
+  TestResetAPI();
+  TestMove();
+
+  return 0;
+}
--- a/mfbt/tests/moz.build
+++ b/mfbt/tests/moz.build
@@ -46,16 +46,17 @@ CppUnitTests([
     'TestResult',
     'TestRollingMean',
     'TestSaturate',
     'TestScopeExit',
     'TestSegmentedVector',
     'TestSHA1',
     'TestSmallPointerArray',
     'TestSplayTree',
+    'TestSPSCQueue',
     'TestTemplateLib',
     'TestTextUtils',
     'TestThreadSafeWeakPtr',
     'TestTuple',
     'TestTypedEnum',
     'TestTypeTraits',
     'TestUniquePtr',
     'TestVariant',
--- a/testing/cppunittest.ini
+++ b/testing/cppunittest.ini
@@ -37,16 +37,18 @@ skip-if = os == 'android' # Bug 1147630
 [TestRefPtr]
 [TestRollingMean]
 [TestScopeExit]
 [TestSegmentedVector]
 [TestSHA1]
 [TestSmallPointerArray]
 [TestSaturate]
 [TestSplayTree]
+[TestSPSCQueue]
+skip-if = os == 'linux' # Bug 1464084
 [TestSyncRunnable]
 [TestTemplateLib]
 [TestTuple]
 [TestTypeTraits]
 [TestTypedEnum]
 [TestUniquePtr]
 [TestVariant]
 [TestVector]