Bug 1444976 - Implement a lock-free asynchronous logging system to be used from real-time audio and video code. r=froydnj
☠☠ backed out by ab04b1ca218d ☠☠
author Paul Adenot <paul@paul.cx>
Tue, 20 Mar 2018 18:14:47 +0100
changeset 467545 0fa5f7cfee1da60eca676b977deb7ad65dc28082
parent 467544 09030f59fbeae4360be8c46d7e85ff32e9c00201
child 467546 822a57090d848da8e0ec608040a43ff96fbaa72d
push id 9165
push user asasaki@mozilla.com
push date Thu, 26 Apr 2018 21:04:54 +0000
treeherder mozilla-beta@064c3804de2e
reviewers froydnj
bugs 1444976
milestone 61.0a1
Bug 1444976 - Implement a lock-free asynchronous logging system to be used from real-time audio and video code. r=froydnj

This outputs to MOZ_LOG, using an MPSC lock-free queue, so we can log to a particular module from any thread.

MozReview-Commit-ID: INtlki4PEJs
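Below is a minimal usage sketch of the logger this patch adds, as it might be called around a real-time audio callback. The module name "AudioCallbackTracing", the include path, and the surrounding functions are illustrative assumptions, not part of the patch:

    #include "AsyncLogger.h"  // dom/media/AsyncLogger.h

    // Hypothetical process-wide logger; the MOZ_LOG module name is an assumption.
    static mozilla::AsyncLogger sAudioLogger("AudioCallbackTracing");

    void StartAudio()
    {
      // Spawns the consumer thread, but only if the log module is enabled.
      sAudioLogger.Start();
    }

    // Called on the real-time audio thread: Log() formats into a fixed-size
    // message and pushes it onto the lock-free queue, without taking any lock.
    void AudioCallback(size_t aFrames)
    {
      sAudioLogger.Log("rendered %zu frames", aFrames);
    }

    void StopAudio()
    {
      // Joins the consumer thread.
      sAudioLogger.Stop();
    }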
dom/media/AsyncLogger.h
new file mode 100644
--- /dev/null
+++ b/dom/media/AsyncLogger.h
@@ -0,0 +1,269 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/* Implementation of an asynchronous lock-free logging system. */
+
+#ifndef mozilla_dom_AsyncLogger_h
+#define mozilla_dom_AsyncLogger_h
+
+#include <atomic>
+#include <chrono>
+#include <cstdarg>
+#include <cstring>
+#include <memory>
+#include <thread>
+#include "mozilla/Logging.h"
+#include "mozilla/Attributes.h"
+#include "mozilla/MathAlgorithms.h"
+#include "mozilla/Sprintf.h"
+
+namespace mozilla {
+
+namespace detail {
+
+// This class implements a lock-free multiple producer single consumer queue of
+// fixed size log messages, with the following characteristics:
+// - Unbounded (uses an intrusive linked list)
+// - Allocates on Push. Push can be called on any thread.
+// - Deallocates on Pop. Pop MUST always be called on the same thread for the
+//   life-time of the queue.
+//
+// In our scenario, the producer threads are real-time: they can't block. The
+// consumer thread runs every now and then and empties the queue to a log
+// file on disk.
+//
+// Using fixed-size messages with jemalloc is probably not the fastest option,
+// but it allows for a simpler design; we count on the fact that jemalloc will
+// get the memory from a thread-local source most of the time.
+template<size_t MESSAGE_LENGTH>
+class MPSCQueue
+{
+public:
+    struct Message {
+        Message()
+        {
+            mNext.store(nullptr, std::memory_order_relaxed);
+        }
+        Message(const Message& aMessage) = delete;
+        void operator=(const Message& aMessage) = delete;
+
+        char data[MESSAGE_LENGTH];
+        std::atomic<Message*> mNext;
+    };
+    // Creates a new MPSCQueue. Initially, the queue has a single sentinel node,
+    // pointed to by both mHead and mTail.
+    MPSCQueue()
+    // At construction, the initial message points to nullptr (it has no
+    // successor). It is a sentinel node, that does not contain meaningful
+    // data.
+    : mHead(new Message())
+    , mTail(mHead.load(std::memory_order_relaxed))
+    { }
+
+    ~MPSCQueue()
+    {
+        Message dummy;
+        while (this->Pop(dummy.data)) {}
+        Message* front = mHead.load(std::memory_order_relaxed);
+        delete front;
+    }
+
+    void
+    Push(MPSCQueue<MESSAGE_LENGTH>::Message* aMessage)
+    {
+        // The next two non-comment lines are called A and B in this paragraph.
+        // Producer threads i, i-1, etc. are numbered in the order in which they
+        // reached A, thread i being the thread that reached A first.
+        // Atomically, on line A, the new `mHead` is set to be the node that was
+        // just allocated, with strong memory ordering. From now on, any thread
+        // that reaches A will see that the node just allocated is effectively
+        // the head of the list, and will in turn make its own node the new head
+        // of the list.
+        // In a bad case (when thread i executes A and then is not scheduled for
+        // a long time), it is possible that thread i-1 and subsequent threads
+        // create a seemingly disconnected set of nodes, but each of them holds,
+        // on its own stack (in `prev`), the predecessor node whose mNext it must
+        // set, so the linkage is always eventually correct. When the scheduler
+        // resumes thread i and line B is executed, the correct linkage is
+        // restored.
+        // Before line B executes, the node that was previously the last element
+        // of the queue still has an mNext of nullptr, so Pop will not see the
+        // newly added node.
+        // For line A, it's critical to have strong ordering both ways (acquire
+        // and release), since mHead is possibly read and written repeatedly by
+        // multiple threads.
+        // Line B can have weaker guarantees: mNext is only going to be written
+        // by a single thread, and we just need to ensure it's read properly by
+        // a single other one (the consumer).
+        Message* prev = mHead.exchange(aMessage, std::memory_order_acq_rel);
+        prev->mNext.store(aMessage, std::memory_order_release);
+    }
+
+    // Allocates a new node, copies aInput to the new memory location, and
+    // pushes it to the end of the list.
+    void
+    Push(const char aInput[MESSAGE_LENGTH])
+    {
+        // Create a new message, and copy the message passed as argument to the
+        // new memory location. We are not touching the queue right now. The
+        // successor for this new node is set to be nullptr.
+        Message* msg = new Message();
+        strncpy(msg->data, aInput, MESSAGE_LENGTH);
+
+        Push(msg);
+    }
+
+    // Copies the content of the first message of the queue to aOutput, and
+    // frees the message. Returns true if there was a message, in which case
+    // `aOutput` contains a valid value. If the queue was empty, returns false,
+    // in which case `aOutput` is left untouched.
+    bool
+    Pop(char aOutput[MESSAGE_LENGTH])
+    {
+        // Similarly, in this paragraph, the two following lines are called A
+        // and B, and threads are called thread i, i-1, etc. in order of
+        // execution of line A.
+        // On line A, the first element of the queue is loaded. It is simply the
+        // sentinel node.
+        // On line B, we load the node that has the data we want. If the result
+        // of B is null, then only the sentinel node is present in the queue,
+        // and we can safely return false.
+        // mTail can be loaded with relaxed ordering, since it's neither written
+        // nor read by any other thread (this queue is single consumer).
+        // mNext can be written to by one of the producers, so it's necessary to
+        // ensure those writes are seen, hence the stricter ordering.
+        Message* tail = mTail.load(std::memory_order_relaxed);
+        Message* next = tail->mNext.load(std::memory_order_acquire);
+
+        if (next == nullptr) {
+            return false;
+        }
+
+        strncpy(aOutput, next->data, MESSAGE_LENGTH);
+
+        // Shift the queue one node further: `next` becomes the new sentinel,
+        // now pointed to by mTail. It still contains the data we just copied
+        // out, but this data will never be read again.
+        // It's only necessary to ensure the previous load on this thread is not
+        // reordered past this line, so release ordering is sufficient here.
+        mTail.store(next, std::memory_order_release);
+
+        // This thread now holds the only pointer to `tail`, so it can be
+        // safely deleted.
+        delete tail;
+
+        return true;
+    }
+
+private:
+    // An atomic pointer to the most recent message in the queue.
+    std::atomic<Message*> mHead;
+    // An atomic pointer to the sentinel node, whose mNext member points to the
+    // oldest message in the queue.
+    std::atomic<Message*> mTail;
+
+    MPSCQueue(const MPSCQueue&) = delete;
+    void operator=(const MPSCQueue&) = delete;
+public:
+    // The goal here is to make it easy on the allocator. We pack a pointer into
+    // the message struct, and we still want to do power-of-two allocations to
+    // minimize allocator slop. The allocation size is going to be constant, so
+    // the allocation is probably going to hit the thread-local cache in
+    // jemalloc, making it cheap and, more importantly, lock-free enough.
+    static const size_t MESSAGE_PADDING = sizeof(Message::mNext);
+private:
+    static_assert(IsPowerOfTwo(MESSAGE_LENGTH + MESSAGE_PADDING),
+                  "MPSCQueue internal allocations must have a size that is a "
+                  "power of two");
+};
+} // end namespace detail
+
+// This class implements a lock-free asynchronous logger that outputs to
+// MOZ_LOG.
+// Any thread can use this logger without external synchronization and without
+// being blocked. This makes it suitable for use in real-time audio threads.
+// Log formatting is best done externally; this class implements the output
+// mechanism only.
+// This class uses a thread internally, and must be started and stopped
+// manually.
+// If logging is disabled, all the calls are no-ops.
+class AsyncLogger
+{
+public:
+  static const uint32_t MAX_MESSAGE_LENGTH = 512 - detail::MPSCQueue<8>::MESSAGE_PADDING;
+
+  // aLogModuleName is the name of the MOZ_LOG module.
+  explicit AsyncLogger(const char* aLogModuleName)
+  : mThread(nullptr)
+  , mLogModule(aLogModuleName)
+  , mRunning(false)
+  { }
+
+  ~AsyncLogger()
+  {
+    if (Enabled()) {
+      Stop();
+    }
+  }
+
+  void Start()
+  {
+    MOZ_ASSERT(!mRunning, "Double calls to AsyncLogger::Start");
+    if (Enabled()) {
+      mRunning = true;
+      Run();
+    }
+  }
+
+  void Stop()
+  {
+    if (Enabled()) {
+      if (mRunning) {
+        mRunning = false;
+        mThread->join();
+      }
+    } else {
+      MOZ_ASSERT(!mRunning && !mThread);
+    }
+  }
+
+  void Log(const char* format, ...) MOZ_FORMAT_PRINTF(2,3)
+  {
+    if (Enabled()) {
+      auto* msg = new detail::MPSCQueue<MAX_MESSAGE_LENGTH>::Message();
+      va_list args;
+      va_start(args, format);
+      VsprintfLiteral(msg->data, format, args);
+      va_end(args);
+      mMessageQueue.Push(msg);
+    }
+  }
+
+  bool Enabled()
+  {
+    return MOZ_LOG_TEST(mLogModule, mozilla::LogLevel::Verbose);
+  }
+
+private:
+  void Run()
+  {
+    MOZ_ASSERT(Enabled());
+    mThread.reset(new std::thread([this]() {
+      // Consumer loop: drain the queue and log each message, then sleep and
+      // check again, until Stop() clears mRunning.
+      while (mRunning) {
+        char message[MAX_MESSAGE_LENGTH];
+        while (mMessageQueue.Pop(message) && mRunning) {
+          MOZ_LOG(mLogModule, mozilla::LogLevel::Verbose, ("%s", message));
+        }
+        Sleep();
+      }
+    }));
+  }
+
+  void Sleep() { std::this_thread::sleep_for(std::chrono::milliseconds(10)); }
+
+  std::unique_ptr<std::thread> mThread;
+  mozilla::LazyLogModule mLogModule;
+  detail::MPSCQueue<MAX_MESSAGE_LENGTH> mMessageQueue;
+  std::atomic<bool> mRunning;
+};
+
+} // end namespace mozilla
+
+#endif // mozilla_dom_AsyncLogger_h
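To make the Push/Pop memory-ordering discussion in MPSCQueue concrete, here is a small, self-contained sketch of the same exchange/store (lines A and B) and acquire-load pattern, using standard C++ only. Names and the demo scenario are illustrative, not part of the patch:

    #include <atomic>
    #include <cstdio>
    #include <thread>
    #include <vector>

    // Minimal illustration of the MPSC scheme used by MPSCQueue: producers link
    // nodes with an exchange on the head (line A) followed by a release store on
    // the predecessor's next pointer (line B); the single consumer follows next
    // pointers starting from a sentinel tail.
    struct Node {
      int value = 0;
      std::atomic<Node*> next{nullptr};
    };

    std::atomic<Node*> head{new Node()};  // most recent node, starts at the sentinel
    Node* tail = head.load();             // sentinel; only the consumer touches this

    void Push(int aValue) {
      Node* node = new Node();
      node->value = aValue;
      Node* prev = head.exchange(node, std::memory_order_acq_rel);  // line A
      prev->next.store(node, std::memory_order_release);            // line B
    }

    bool Pop(int* aOut) {
      Node* next = tail->next.load(std::memory_order_acquire);
      if (!next) {
        return false;  // only the sentinel is left: the queue is empty
      }
      *aOut = next->value;
      delete tail;  // the old sentinel is no longer reachable
      tail = next;  // `next` becomes the new sentinel
      return true;
    }

    int main() {
      std::vector<std::thread> producers;
      for (int t = 0; t < 4; ++t) {
        producers.emplace_back([t] {
          for (int i = 0; i < 1000; ++i) {
            Push(t * 1000 + i);
          }
        });
      }
      for (auto& p : producers) {
        p.join();
      }
      int value;
      int count = 0;
      while (Pop(&value)) {
        ++count;
      }
      delete tail;  // free the remaining sentinel
      std::printf("popped %d messages\n", count);  // expect 4000
      return 0;
    }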