Bug 1254730 - ChannelEventQueue must be thread-safe, r=michal
authorAndrea Marchesini <amarchesini@mozilla.com>
Mon, 14 Mar 2016 17:10:26 +0100
changeset 288593 db5ef9e43187441dea6aedd46d3d27bc4c9cd66b
parent 288592 6a6e000c7a42f5a2b88b60ee1b7134f5bd95017d
child 288594 96b5a49990152aeb3198bdd08b94557cf7ec65f3
push id18174
push usercbook@mozilla.com
push dateTue, 15 Mar 2016 09:44:58 +0000
treeherderfx-team@dd0baa33759d [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersmichal
bugs1254730
milestone48.0a1
Bug 1254730 - ChannelEventQueue must be thread-safe, r=michal
netwerk/ipc/ChannelEventQueue.cpp
netwerk/ipc/ChannelEventQueue.h
netwerk/protocol/ftp/FTPChannelChild.cpp
netwerk/protocol/ftp/FTPChannelParent.cpp
netwerk/protocol/http/HttpChannelChild.cpp
netwerk/protocol/http/HttpChannelParent.cpp
netwerk/protocol/websocket/WebSocketChannelChild.cpp
netwerk/protocol/wyciwyg/WyciwygChannelChild.cpp
--- a/netwerk/ipc/ChannelEventQueue.cpp
+++ b/netwerk/ipc/ChannelEventQueue.cpp
@@ -7,63 +7,78 @@
 
 #include "nsISupports.h"
 #include "mozilla/net/ChannelEventQueue.h"
 #include "nsThreadUtils.h"
 
 namespace mozilla {
 namespace net {
 
+ChannelEvent*
+ChannelEventQueue::TakeEvent()
+{
+  MutexAutoLock lock(mMutex);
+  MOZ_ASSERT(mFlushing);
+
+  if (mSuspended || mEventQueue.IsEmpty()) {
+    return nullptr;
+  }
+
+  UniquePtr<ChannelEvent> event(Move(mEventQueue[0]));
+  mEventQueue.RemoveElementAt(0);
+
+  return event.release();
+}
+
 void
 ChannelEventQueue::FlushQueue()
 {
   // Events flushed could include destruction of channel (and our own
   // destructor) unless we make sure its refcount doesn't drop to 0 while this
   // method is running.
   nsCOMPtr<nsISupports> kungFuDeathGrip(mOwner);
 
   // Prevent flushed events from flushing the queue recursively
-  mFlushing = true;
-
-  uint32_t i;
-  for (i = 0; i < mEventQueue.Length(); i++) {
-    mEventQueue[i]->Run();
-    if (mSuspended)
-      break;
+  {
+    MutexAutoLock lock(mMutex);
+    mFlushing = true;
   }
 
-  // We will always want to remove at least one finished callback.
-  if (i < mEventQueue.Length())
-    i++;
+  while (true) {
+    UniquePtr<ChannelEvent> event(TakeEvent());
+    if (!event) {
+      break;
+    }
 
-  // It is possible for new callbacks to be enqueued as we are
-  // flushing the queue, so the queue must not be cleared until
-  // all callbacks have run.
-  mEventQueue.RemoveElementsAt(0, i);
+    event->Run();
+  }
 
+  MutexAutoLock lock(mMutex);
   mFlushing = false;
 }
 
 void
 ChannelEventQueue::Resume()
 {
+  MutexAutoLock lock(mMutex);
+
   // Resuming w/o suspend: error in debug mode, ignore in build
   MOZ_ASSERT(mSuspendCount > 0);
   if (mSuspendCount <= 0) {
     return;
   }
 
   if (!--mSuspendCount) {
     RefPtr<nsRunnableMethod<ChannelEventQueue> > event =
       NS_NewRunnableMethod(this, &ChannelEventQueue::CompleteResume);
     if (mTargetThread) {
       mTargetThread->Dispatch(event, NS_DISPATCH_NORMAL);
     } else {
       MOZ_RELEASE_ASSERT(NS_IsMainThread());
-      NS_DispatchToCurrentThread(event);
+      NS_WARN_IF(NS_FAILED(NS_DispatchToCurrentThread(event)));
     }
   }
 }
 
 nsresult
 ChannelEventQueue::RetargetDeliveryTo(nsIEventTarget* aTargetThread)
 {
   MOZ_RELEASE_ASSERT(NS_IsMainThread());
--- a/netwerk/ipc/ChannelEventQueue.h
+++ b/netwerk/ipc/ChannelEventQueue.h
@@ -5,16 +5,17 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef mozilla_net_ChannelEventQueue_h
 #define mozilla_net_ChannelEventQueue_h
 
 #include "nsTArray.h"
 #include "nsAutoPtr.h"
+#include "mozilla/Mutex.h"
 #include "mozilla/UniquePtr.h"
 
 class nsISupports;
 class nsIEventTarget;
 
 namespace mozilla {
 namespace net {
 
@@ -38,38 +39,41 @@ class ChannelEventQueue final
   NS_INLINE_DECL_THREADSAFE_REFCOUNTING(ChannelEventQueue)
 
  public:
   explicit ChannelEventQueue(nsISupports *owner)
     : mSuspendCount(0)
     , mSuspended(false)
     , mForced(false)
     , mFlushing(false)
-    , mOwner(owner) {}
-
-  // Checks to determine if an IPDL-generated channel event can be processed
-  // immediately, or needs to be queued using Enqueue().
-  inline bool ShouldEnqueue();
+    , mOwner(owner)
+    , mMutex("ChannelEventQueue::mMutex")
+  {}
 
   // Puts IPDL-generated channel event into queue, to be run later
   // automatically when EndForcedQueueing and/or Resume is called.
-  inline void Enqueue(ChannelEvent* callback);
+  //
+  // @param aCallback - the ChannelEvent
+  // @param aAssertWhenNotQueued - this optional param will be used in an
+  //   assertion when the event is executed directly.
+  inline void RunOrEnqueue(ChannelEvent* aCallback,
+                           bool aAssertWhenNotQueued = true);
   inline nsresult PrependEvents(nsTArray<UniquePtr<ChannelEvent>>& aEvents);
 
-  // After StartForcedQueueing is called, ShouldEnqueue() will return true and
-  // no events will be run/flushed until EndForcedQueueing is called.
+  // After StartForcedQueueing is called, RunOrEnqueue() will start enqueuing
+  // events that will be run/flushed when EndForcedQueueing is called.
   // - Note: queueing may still be required after EndForcedQueueing() (if the
-  //   queue is suspended, etc):  always call ShouldEnqueue() to determine
-  //   whether queueing is needed.
+  //   queue is suspended, etc):  always call RunOrEnqueue() to avoid race
+  //   conditions.
   inline void StartForcedQueueing();
   inline void EndForcedQueueing();
 
-  // Suspend/resume event queue.  ShouldEnqueue() will return true and no events
-  // will be run/flushed until resume is called.  These should be called when
-  // the channel owning the event queue is suspended/resumed.
+  // Suspend/resume event queue.  RunOrEnqueue() will start enqueuing
+  // events and they will be run/flushed when resume is called.  These should be
+  // called when the channel owning the event queue is suspended/resumed.
   inline void Suspend();
   // Resume flushes the queue asynchronously, i.e. items in queue will be
   // dispatched in a new event on the current thread.
   void Resume();
 
   // Retargets delivery of events to the target thread specified.
   nsresult RetargetDeliveryTo(nsIEventTarget* aTargetThread);
 
@@ -78,109 +82,144 @@ class ChannelEventQueue final
   ~ChannelEventQueue()
   {
   }
 
   inline void MaybeFlushQueue();
   void FlushQueue();
   inline void CompleteResume();
 
+  ChannelEvent* TakeEvent();
+
   nsTArray<UniquePtr<ChannelEvent>> mEventQueue;
 
   uint32_t mSuspendCount;
   bool     mSuspended;
   bool mForced;
   bool mFlushing;
 
   // Keep ptr to avoid refcount cycle: only grab ref during flushing.
   nsISupports *mOwner;
 
+  Mutex mMutex;
+
   // EventTarget for delivery of events to the correct thread.
   nsCOMPtr<nsIEventTarget> mTargetThread;
 
   friend class AutoEventEnqueuer;
 };
 
-inline bool
-ChannelEventQueue::ShouldEnqueue()
+inline void
+ChannelEventQueue::RunOrEnqueue(ChannelEvent* aCallback,
+                                bool aAssertWhenNotQueued)
 {
-  bool answer =  mForced || mSuspended || mFlushing;
+  MOZ_ASSERT(aCallback);
 
-  MOZ_ASSERT(answer == true || mEventQueue.IsEmpty(),
-             "Should always enqueue if ChannelEventQueue not empty");
+  {
+    MutexAutoLock lock(mMutex);
 
-  return answer;
-}
+    bool enqueue =  mForced || mSuspended || mFlushing;
+    MOZ_ASSERT(enqueue == true || mEventQueue.IsEmpty(),
+               "Should always enqueue if ChannelEventQueue not empty");
 
-inline void
-ChannelEventQueue::Enqueue(ChannelEvent* callback)
-{
-  mEventQueue.AppendElement(callback);
+    if (enqueue) {
+      mEventQueue.AppendElement(aCallback);
+      return;
+    }
+  }
+
+  MOZ_RELEASE_ASSERT(aAssertWhenNotQueued);
+  aCallback->Run();
 }
 
 inline void
 ChannelEventQueue::StartForcedQueueing()
 {
+  MutexAutoLock lock(mMutex);
   mForced = true;
 }
 
 inline void
 ChannelEventQueue::EndForcedQueueing()
 {
-  mForced = false;
+  {
+    MutexAutoLock lock(mMutex);
+    mForced = false;
+  }
+
   MaybeFlushQueue();
 }
 
 inline nsresult
 ChannelEventQueue::PrependEvents(nsTArray<UniquePtr<ChannelEvent>>& aEvents)
 {
+  MutexAutoLock lock(mMutex);
+
   UniquePtr<ChannelEvent>* newEvents =
     mEventQueue.InsertElementsAt(0, aEvents.Length());
   if (!newEvents) {
     return NS_ERROR_OUT_OF_MEMORY;
   }
 
   for (uint32_t i = 0; i < aEvents.Length(); i++) {
     newEvents[i] = Move(aEvents[i]);
   }
+
   return NS_OK;
 }
 
 inline void
 ChannelEventQueue::Suspend()
 {
+  MutexAutoLock lock(mMutex);
+
   mSuspended = true;
   mSuspendCount++;
 }
 
 inline void
 ChannelEventQueue::CompleteResume()
 {
-  // channel may have been suspended again since Resume fired event to call this.
-  if (!mSuspendCount) {
-    // we need to remain logically suspended (for purposes of queuing incoming
-    // messages) until this point, else new incoming messages could run before
-    // queued ones.
-    mSuspended = false;
-    MaybeFlushQueue();
+  {
+    MutexAutoLock lock(mMutex);
+
+    // channel may have been suspended again since Resume fired event to call
+    // this.
+    if (!mSuspendCount) {
+      // we need to remain logically suspended (for purposes of queuing incoming
+      // messages) until this point, else new incoming messages could run before
+      // queued ones.
+      mSuspended = false;
+    }
   }
+
+  MaybeFlushQueue();
 }
 
 inline void
 ChannelEventQueue::MaybeFlushQueue()
 {
   // Don't flush if forced queuing on, we're already being flushed, or
   // suspended, or there's nothing to flush
-  if (!mForced && !mFlushing && !mSuspended && !mEventQueue.IsEmpty())
+  bool flushQueue = false;
+
+  {
+    MutexAutoLock lock(mMutex);
+    flushQueue = !mForced && !mFlushing && !mSuspended &&
+                 !mEventQueue.IsEmpty();
+  }
+
+  if (flushQueue) {
     FlushQueue();
+  }
 }
 
-// Ensures that ShouldEnqueue() will be true during its lifetime (letting
-// caller know incoming IPDL msgs should be queued). Flushes the queue when it
-// goes out of scope.
+// Ensures that RunOrEnqueue() will be collecting events during its lifetime
+// (letting caller know incoming IPDL msgs should be queued). Flushes the queue
+// when it goes out of scope.
 class MOZ_STACK_CLASS AutoEventEnqueuer
 {
  public:
   explicit AutoEventEnqueuer(ChannelEventQueue *queue) : mEventQueue(queue) {
     mEventQueue->StartForcedQueueing();
   }
   ~AutoEventEnqueuer() {
     mEventQueue->EndForcedQueueing();
--- a/netwerk/protocol/ftp/FTPChannelChild.cpp
+++ b/netwerk/protocol/ftp/FTPChannelChild.cpp
@@ -279,24 +279,20 @@ FTPChannelChild::RecvOnStartRequest(cons
   // stage, as they are set in the listener's OnStartRequest.
   MOZ_RELEASE_ASSERT(!mFlushedForDiversion,
     "mFlushedForDiversion should be unset before OnStartRequest!");
   MOZ_RELEASE_ASSERT(!mDivertingToParent,
     "mDivertingToParent should be unset before OnStartRequest!");
 
   LOG(("FTPChannelChild::RecvOnStartRequest [this=%p]\n", this));
 
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new FTPStartRequestEvent(this, aChannelStatus,
-                                              aContentLength, aContentType,
-                                              aLastModified, aEntityID, aURI));
-  } else {
-    DoOnStartRequest(aChannelStatus, aContentLength, aContentType,
-                     aLastModified, aEntityID, aURI);
-  }
+  mEventQ->RunOrEnqueue(new FTPStartRequestEvent(this, aChannelStatus,
+                                                 aContentLength, aContentType,
+                                                 aLastModified, aEntityID,
+                                                 aURI));
   return true;
 }
 
 void
 FTPChannelChild::DoOnStartRequest(const nsresult& aChannelStatus,
                                   const int64_t& aContentLength,
                                   const nsCString& aContentType,
                                   const PRTime& aLastModified,
@@ -374,25 +370,20 @@ FTPChannelChild::RecvOnDataAvailable(con
                                      const uint64_t& offset,
                                      const uint32_t& count)
 {
   MOZ_RELEASE_ASSERT(!mFlushedForDiversion,
                      "Should not be receiving any more callbacks from parent!");
 
   LOG(("FTPChannelChild::RecvOnDataAvailable [this=%p]\n", this));
 
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(
-      new FTPDataAvailableEvent(this, channelStatus, data, offset, count));
-  } else {
-    MOZ_RELEASE_ASSERT(!mDivertingToParent,
-                       "ShouldEnqueue when diverting to parent!");
+  mEventQ->RunOrEnqueue(new FTPDataAvailableEvent(this, channelStatus, data,
+                                                  offset, count),
+                        !mDivertingToParent);
 
-    DoOnDataAvailable(channelStatus, data, offset, count);
-  }
   return true;
 }
 
 class MaybeDivertOnDataFTPEvent : public ChannelEvent
 {
  public:
   MaybeDivertOnDataFTPEvent(FTPChannelChild* child,
                             const nsCString& data,
@@ -499,21 +490,17 @@ bool
 FTPChannelChild::RecvOnStopRequest(const nsresult& aChannelStatus)
 {
   MOZ_RELEASE_ASSERT(!mFlushedForDiversion,
     "Should not be receiving any more callbacks from parent!");
 
   LOG(("FTPChannelChild::RecvOnStopRequest [this=%p status=%x]\n",
        this, aChannelStatus));
 
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new FTPStopRequestEvent(this, aChannelStatus));
-  } else {
-    DoOnStopRequest(aChannelStatus);
-  }
+  mEventQ->RunOrEnqueue(new FTPStopRequestEvent(this, aChannelStatus));
   return true;
 }
 
 class MaybeDivertOnStopFTPEvent : public ChannelEvent
 {
  public:
   MaybeDivertOnStopFTPEvent(FTPChannelChild* child,
                             const nsresult& aChannelStatus)
@@ -588,21 +575,17 @@ class FTPFailedAsyncOpenEvent : public C
   nsresult mStatus;
 };
 
 bool
 FTPChannelChild::RecvFailedAsyncOpen(const nsresult& statusCode)
 {
   LOG(("FTPChannelChild::RecvFailedAsyncOpen [this=%p status=%x]\n",
        this, statusCode));
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new FTPFailedAsyncOpenEvent(this, statusCode));
-  } else {
-    DoFailedAsyncOpen(statusCode);
-  }
+  mEventQ->RunOrEnqueue(new FTPFailedAsyncOpenEvent(this, statusCode));
   return true;
 }
 
 void
 FTPChannelChild::DoFailedAsyncOpen(const nsresult& statusCode)
 {
   LOG(("FTPChannelChild::DoFailedAsyncOpen [this=%p status=%x]\n",
        this, statusCode));
@@ -644,21 +627,17 @@ class FTPFlushedForDiversionEvent : publ
 };
 
 bool
 FTPChannelChild::RecvFlushedForDiversion()
 {
   LOG(("FTPChannelChild::RecvFlushedForDiversion [this=%p]\n", this));
   MOZ_ASSERT(mDivertingToParent);
 
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new FTPFlushedForDiversionEvent(this));
-  } else {
-    MOZ_CRASH();
-  }
+  mEventQ->RunOrEnqueue(new FTPFlushedForDiversionEvent(this));
   return true;
 }
 
 void
 FTPChannelChild::FlushedForDiversion()
 {
   LOG(("FTPChannelChild::FlushedForDiversion [this=%p]\n", this));
   MOZ_RELEASE_ASSERT(mDivertingToParent);
@@ -694,21 +673,17 @@ class FTPDeleteSelfEvent : public Channe
   void Run() { mChild->DoDeleteSelf(); }
  private:
   FTPChannelChild* mChild;
 };
 
 bool
 FTPChannelChild::RecvDeleteSelf()
 {
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new FTPDeleteSelfEvent(this));
-  } else {
-    DoDeleteSelf();
-  }
+  mEventQ->RunOrEnqueue(new FTPDeleteSelfEvent(this));
   return true;
 }
 
 void
 FTPChannelChild::DoDeleteSelf()
 {
   if (mIPCOpen)
     Send__delete__(this);
--- a/netwerk/protocol/ftp/FTPChannelParent.cpp
+++ b/netwerk/protocol/ftp/FTPChannelParent.cpp
@@ -285,23 +285,18 @@ FTPChannelParent::RecvDivertOnDataAvaila
     return false;
   }
 
   // Drop OnDataAvailables if the parent was canceled already.
   if (NS_FAILED(mStatus)) {
     return true;
   }
 
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new FTPDivertDataAvailableEvent(this, data, offset,
-                                                     count));
-    return true;
-  }
-
-  DivertOnDataAvailable(data, offset, count);
+  mEventQ->RunOrEnqueue(new FTPDivertDataAvailableEvent(this, data, offset,
+                                                        count));
   return true;
 }
 
 void
 FTPChannelParent::DivertOnDataAvailable(const nsCString& data,
                                         const uint64_t& offset,
                                         const uint32_t& count)
 {
@@ -367,22 +362,17 @@ FTPChannelParent::RecvDivertOnStopReques
 {
   if (NS_WARN_IF(!mDivertingFromChild)) {
     MOZ_ASSERT(mDivertingFromChild,
                "Cannot RecvDivertOnStopRequest if diverting is not set!");
     FailDiversion(NS_ERROR_UNEXPECTED);
     return false;
   }
 
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new FTPDivertStopRequestEvent(this, statusCode));
-    return true;
-  }
-
-  DivertOnStopRequest(statusCode);
+  mEventQ->RunOrEnqueue(new FTPDivertStopRequestEvent(this, statusCode));
   return true;
 }
 
 void
 FTPChannelParent::DivertOnStopRequest(const nsresult& statusCode)
 {
   LOG(("FTPChannelParent::DivertOnStopRequest [this=%p]\n", this));
 
@@ -429,22 +419,17 @@ FTPChannelParent::RecvDivertComplete()
 {
   if (NS_WARN_IF(!mDivertingFromChild)) {
     MOZ_ASSERT(mDivertingFromChild,
                "Cannot RecvDivertComplete if diverting is not set!");
     FailDiversion(NS_ERROR_UNEXPECTED);
     return false;
   }
 
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new FTPDivertCompleteEvent(this));
-    return true;
-  }
-
-  DivertComplete();
+  mEventQ->RunOrEnqueue(new FTPDivertCompleteEvent(this));
   return true;
 }
 
 void
 FTPChannelParent::DivertComplete()
 {
   LOG(("FTPChannelParent::DivertComplete [this=%p]\n", this));
 
--- a/netwerk/protocol/http/HttpChannelChild.cpp
+++ b/netwerk/protocol/http/HttpChannelChild.cpp
@@ -275,21 +275,18 @@ class AssociateApplicationCacheEvent : p
     nsCString clientID;
 };
 
 bool
 HttpChannelChild::RecvAssociateApplicationCache(const nsCString &groupID,
                                                 const nsCString &clientID)
 {
   LOG(("HttpChannelChild::RecvAssociateApplicationCache [this=%p]\n", this));
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new AssociateApplicationCacheEvent(this, groupID, clientID));
-  } else {
-    AssociateApplicationCache(groupID, clientID);
-  }
+  mEventQ->RunOrEnqueue(new AssociateApplicationCacheEvent(this, groupID,
+                                                           clientID));
   return true;
 }
 
 void
 HttpChannelChild::AssociateApplicationCache(const nsCString &groupID,
                                             const nsCString &clientID)
 {
   LOG(("HttpChannelChild::AssociateApplicationCache [this=%p]\n", this));
@@ -379,29 +376,23 @@ HttpChannelChild::RecvOnStartRequest(con
   MOZ_RELEASE_ASSERT(!mFlushedForDiversion,
     "mFlushedForDiversion should be unset before OnStartRequest!");
   MOZ_RELEASE_ASSERT(!mDivertingToParent,
     "mDivertingToParent should be unset before OnStartRequest!");
 
 
   mRedirectCount = redirectCount;
 
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new StartRequestEvent(this, channelStatus, responseHead,
-                                           useResponseHead, requestHeaders,
-                                           isFromCache, cacheEntryAvailable,
-                                           cacheExpirationTime, cachedCharset,
-                                           securityInfoSerialization, selfAddr,
-                                           peerAddr, cacheKey));
-  } else {
-    OnStartRequest(channelStatus, responseHead, useResponseHead, requestHeaders,
-                   isFromCache, cacheEntryAvailable, cacheExpirationTime,
-                   cachedCharset, securityInfoSerialization, selfAddr,
-                   peerAddr, cacheKey);
-  }
+  mEventQ->RunOrEnqueue(new StartRequestEvent(this, channelStatus, responseHead,
+                                              useResponseHead, requestHeaders,
+                                              isFromCache, cacheEntryAvailable,
+                                              cacheExpirationTime,
+                                              cachedCharset,
+                                              securityInfoSerialization,
+                                              selfAddr, peerAddr, cacheKey));
   return true;
 }
 
 void
 HttpChannelChild::OnStartRequest(const nsresult& channelStatus,
                                  const nsHttpResponseHead& responseHead,
                                  const bool& useResponseHead,
                                  const nsHttpHeaderArray& requestHeaders,
@@ -611,28 +602,21 @@ HttpChannelChild::RecvOnTransportAndData
                                          const nsCString& data,
                                          const uint64_t& offset,
                                          const uint32_t& count)
 {
   LOG(("HttpChannelChild::RecvOnTransportAndData [this=%p]\n", this));
   MOZ_RELEASE_ASSERT(!mFlushedForDiversion,
                      "Should not be receiving any more callbacks from parent!");
 
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new TransportAndDataEvent(this, channelStatus,
-                                               transportStatus, progress,
-                                               progressMax, data, offset,
-                                               count));
-  } else {
-    MOZ_RELEASE_ASSERT(!mDivertingToParent,
-                       "ShouldEnqueue when diverting to parent!");
-
-    OnTransportAndData(channelStatus, transportStatus, progress, progressMax,
-                       data, offset, count);
-  }
+  mEventQ->RunOrEnqueue(new TransportAndDataEvent(this, channelStatus,
+                                                  transportStatus, progress,
+                                                  progressMax, data, offset,
+                                                  count),
+                        !mDivertingToParent);
   return true;
 }
 
 class MaybeDivertOnDataHttpEvent : public ChannelEvent
 {
  public:
   MaybeDivertOnDataHttpEvent(HttpChannelChild* child,
                              const nsCString& data,
@@ -821,23 +805,17 @@ class StopRequestEvent : public ChannelE
 bool
 HttpChannelChild::RecvOnStopRequest(const nsresult& channelStatus,
                                     const ResourceTimingStruct& timing)
 {
   LOG(("HttpChannelChild::RecvOnStopRequest [this=%p]\n", this));
   MOZ_RELEASE_ASSERT(!mFlushedForDiversion,
     "Should not be receiving any more callbacks from parent!");
 
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new StopRequestEvent(this, channelStatus, timing));
-  } else {
-    MOZ_ASSERT(!mDivertingToParent, "ShouldEnqueue when diverting to parent!");
-
-    OnStopRequest(channelStatus, timing);
-  }
+  mEventQ->RunOrEnqueue(new StopRequestEvent(this, channelStatus, timing));
   return true;
 }
 
 class MaybeDivertOnStopHttpEvent : public ChannelEvent
 {
  public:
   MaybeDivertOnStopHttpEvent(HttpChannelChild* child,
                              const nsresult& channelStatus)
@@ -987,21 +965,17 @@ class ProgressEvent : public ChannelEven
   HttpChannelChild* mChild;
   int64_t mProgress, mProgressMax;
 };
 
 bool
 HttpChannelChild::RecvOnProgress(const int64_t& progress,
                                  const int64_t& progressMax)
 {
-  if (mEventQ->ShouldEnqueue())  {
-    mEventQ->Enqueue(new ProgressEvent(this, progress, progressMax));
-  } else {
-    OnProgress(progress, progressMax);
-  }
+  mEventQ->RunOrEnqueue(new ProgressEvent(this, progress, progressMax));
   return true;
 }
 
 void
 HttpChannelChild::OnProgress(const int64_t& progress,
                              const int64_t& progressMax)
 {
   LOG(("HttpChannelChild::OnProgress [this=%p progress=%lld/%lld]\n",
@@ -1040,21 +1014,17 @@ class StatusEvent : public ChannelEvent
  private:
   HttpChannelChild* mChild;
   nsresult mStatus;
 };
 
 bool
 HttpChannelChild::RecvOnStatus(const nsresult& status)
 {
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new StatusEvent(this, status));
-  } else {
-    OnStatus(status);
-  }
+  mEventQ->RunOrEnqueue(new StatusEvent(this, status));
   return true;
 }
 
 void
 HttpChannelChild::OnStatus(const nsresult& status)
 {
   LOG(("HttpChannelChild::OnStatus [this=%p status=%x]\n", this, status));
 
@@ -1091,21 +1061,17 @@ class FailedAsyncOpenEvent : public Chan
   HttpChannelChild* mChild;
   nsresult mStatus;
 };
 
 bool
 HttpChannelChild::RecvFailedAsyncOpen(const nsresult& status)
 {
   LOG(("HttpChannelChild::RecvFailedAsyncOpen [this=%p]\n", this));
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new FailedAsyncOpenEvent(this, status));
-  } else {
-    FailedAsyncOpen(status);
-  }
+  mEventQ->RunOrEnqueue(new FailedAsyncOpenEvent(this, status));
   return true;
 }
 
 // We need to have an implementation of this function just so that we can keep
 // all references to mCallOnResume of type HttpChannelChild:  it's not OK in C++
 // to set a member function ptr to a base class function.
 void
 HttpChannelChild::HandleAsyncAbort()
@@ -1144,21 +1110,17 @@ class DeleteSelfEvent : public ChannelEv
  private:
   HttpChannelChild* mChild;
 };
 
 bool
 HttpChannelChild::RecvDeleteSelf()
 {
   LOG(("HttpChannelChild::RecvDeleteSelf [this=%p]\n", this));
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new DeleteSelfEvent(this));
-  } else {
-    DeleteSelf();
-  }
+  mEventQ->RunOrEnqueue(new DeleteSelfEvent(this));
   return true;
 }
 
 void
 HttpChannelChild::DeleteSelf()
 {
   Send__delete__(this);
 }
@@ -1205,24 +1167,19 @@ bool
 HttpChannelChild::RecvRedirect1Begin(const uint32_t& newChannelId,
                                      const URIParams& newUri,
                                      const uint32_t& redirectFlags,
                                      const nsHttpResponseHead& responseHead,
                                      const nsCString& securityInfoSerialization)
 {
   // TODO: handle security info
   LOG(("HttpChannelChild::RecvRedirect1Begin [this=%p]\n", this));
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new Redirect1Event(this, newChannelId, newUri,
-                                       redirectFlags, responseHead,
-                                       securityInfoSerialization));
-  } else {
-    Redirect1Begin(newChannelId, newUri, redirectFlags, responseHead,
-                   securityInfoSerialization);
-  }
+  mEventQ->RunOrEnqueue(new Redirect1Event(this, newChannelId, newUri,
+                                           redirectFlags, responseHead,
+                                           securityInfoSerialization));
   return true;
 }
 
 nsresult
 HttpChannelChild::SetupRedirect(nsIURI* uri,
                                 const nsHttpResponseHead* responseHead,
                                 const uint32_t& redirectFlags,
                                 nsIChannel** outChannel)
@@ -1350,21 +1307,17 @@ class Redirect3Event : public ChannelEve
  private:
   HttpChannelChild* mChild;
 };
 
 bool
 HttpChannelChild::RecvRedirect3Complete()
 {
   LOG(("HttpChannelChild::RecvRedirect3Complete [this=%p]\n", this));
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new Redirect3Event(this));
-  } else {
-    Redirect3Complete();
-  }
+  mEventQ->RunOrEnqueue(new Redirect3Event(this));
   return true;
 }
 
 class HttpFlushedForDiversionEvent : public ChannelEvent
 {
  public:
   explicit HttpFlushedForDiversionEvent(HttpChannelChild* aChild)
   : mChild(aChild)
@@ -1380,19 +1333,18 @@ class HttpFlushedForDiversionEvent : pub
   HttpChannelChild* mChild;
 };
 
 bool
 HttpChannelChild::RecvFlushedForDiversion()
 {
   LOG(("HttpChannelChild::RecvFlushedForDiversion [this=%p]\n", this));
   MOZ_RELEASE_ASSERT(mDivertingToParent);
-  MOZ_RELEASE_ASSERT(mEventQ->ShouldEnqueue());
-
-  mEventQ->Enqueue(new HttpFlushedForDiversionEvent(this));
+
+  mEventQ->RunOrEnqueue(new HttpFlushedForDiversionEvent(this));
 
   return true;
 }
 
 bool
 HttpChannelChild::RecvNotifyTrackingProtectionDisabled()
 {
   nsChannelClassifier::NotifyTrackingProtectionDisabled(this);
--- a/netwerk/protocol/http/HttpChannelParent.cpp
+++ b/netwerk/protocol/http/HttpChannelParent.cpp
@@ -756,22 +756,18 @@ HttpChannelParent::RecvDivertOnDataAvail
     return false;
   }
 
   // Drop OnDataAvailables if the parent was canceled already.
   if (NS_FAILED(mStatus)) {
     return true;
   }
 
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new DivertDataAvailableEvent(this, data, offset, count));
-    return true;
-  }
-
-  DivertOnDataAvailable(data, offset, count);
+  mEventQ->RunOrEnqueue(new DivertDataAvailableEvent(this, data, offset,
+                                                     count));
   return true;
 }
 
 void
 HttpChannelParent::DivertOnDataAvailable(const nsCString& data,
                                          const uint64_t& offset,
                                          const uint32_t& count)
 {
@@ -841,22 +837,17 @@ HttpChannelParent::RecvDivertOnStopReque
   MOZ_ASSERT(mParentListener);
   if (NS_WARN_IF(!mDivertingFromChild)) {
     MOZ_ASSERT(mDivertingFromChild,
                "Cannot RecvDivertOnStopRequest if diverting is not set!");
     FailDiversion(NS_ERROR_UNEXPECTED);
     return false;
   }
 
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new DivertStopRequestEvent(this, statusCode));
-    return true;
-  }
-
-  DivertOnStopRequest(statusCode);
+  mEventQ->RunOrEnqueue(new DivertStopRequestEvent(this, statusCode));
   return true;
 }
 
 void
 HttpChannelParent::DivertOnStopRequest(const nsresult& statusCode)
 {
   LOG(("HttpChannelParent::DivertOnStopRequest [this=%p]\n", this));
 
@@ -904,22 +895,17 @@ HttpChannelParent::RecvDivertComplete()
   MOZ_ASSERT(mParentListener);
   if (NS_WARN_IF(!mDivertingFromChild)) {
     MOZ_ASSERT(mDivertingFromChild,
                "Cannot RecvDivertComplete if diverting is not set!");
     FailDiversion(NS_ERROR_UNEXPECTED);
     return false;
   }
 
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new DivertCompleteEvent(this));
-    return true;
-  }
-
-  DivertComplete();
+  mEventQ->RunOrEnqueue(new DivertCompleteEvent(this));
   return true;
 }
 
 void
 HttpChannelParent::DivertComplete()
 {
   LOG(("HttpChannelParent::DivertComplete [this=%p]\n", this));
 
--- a/netwerk/protocol/websocket/WebSocketChannelChild.cpp
+++ b/netwerk/protocol/websocket/WebSocketChannelChild.cpp
@@ -209,27 +209,21 @@ class StartEvent : public ChannelEvent
 };
 
 bool
 WebSocketChannelChild::RecvOnStart(const nsCString& aProtocol,
                                    const nsCString& aExtensions,
                                    const nsString& aEffectiveURL,
                                    const bool& aEncrypted)
 {
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new EventTargetDispatcher(
-                       new StartEvent(this, aProtocol, aExtensions,
-                                      aEffectiveURL, aEncrypted),
-                       mTargetThread));
-  } else if (mTargetThread) {
-    DispatchToTargetThread(new StartEvent(this, aProtocol, aExtensions,
-                                          aEffectiveURL, aEncrypted));
-  } else {
-    OnStart(aProtocol, aExtensions, aEffectiveURL, aEncrypted);
-  }
+  mEventQ->RunOrEnqueue(
+    new EventTargetDispatcher(new StartEvent(this, aProtocol, aExtensions,
+                                             aEffectiveURL, aEncrypted),
+                              mTargetThread));
+
   return true;
 }
 
 void
 WebSocketChannelChild::OnStart(const nsCString& aProtocol,
                                const nsCString& aExtensions,
                                const nsString& aEffectiveURL,
                                const bool& aEncrypted)
@@ -262,24 +256,20 @@ class StopEvent : public ChannelEvent
  private:
   RefPtr<WebSocketChannelChild> mChild;
   nsresult mStatusCode;
 };
 
 bool
 WebSocketChannelChild::RecvOnStop(const nsresult& aStatusCode)
 {
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new EventTargetDispatcher(
-                       new StopEvent(this, aStatusCode), mTargetThread));
-  } else if (mTargetThread) {
-    DispatchToTargetThread(new StopEvent(this, aStatusCode));
-  } else {
-    OnStop(aStatusCode);
-  }
+  mEventQ->RunOrEnqueue(
+    new EventTargetDispatcher(new StopEvent(this, aStatusCode),
+                              mTargetThread));
+
   return true;
 }
 
 void
 WebSocketChannelChild::OnStop(const nsresult& aStatusCode)
 {
   LOG(("WebSocketChannelChild::RecvOnStop() %p\n", this));
   if (mListenerMT) {
@@ -311,48 +301,40 @@ class MessageEvent : public ChannelEvent
   RefPtr<WebSocketChannelChild> mChild;
   nsCString mMessage;
   bool mBinary;
 };
 
 bool
 WebSocketChannelChild::RecvOnMessageAvailable(const nsCString& aMsg)
 {
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new EventTargetDispatcher(
-                       new MessageEvent(this, aMsg, false), mTargetThread));
-  } else if (mTargetThread) {
-    DispatchToTargetThread(new MessageEvent(this, aMsg, false));
-   } else {
-    OnMessageAvailable(aMsg);
-  }
+  mEventQ->RunOrEnqueue(
+    new EventTargetDispatcher(new MessageEvent(this, aMsg, false),
+                              mTargetThread));
+
   return true;
 }
 
 void
 WebSocketChannelChild::OnMessageAvailable(const nsCString& aMsg)
 {
   LOG(("WebSocketChannelChild::RecvOnMessageAvailable() %p\n", this));
   if (mListenerMT) {
     AutoEventEnqueuer ensureSerialDispatch(mEventQ);
     mListenerMT->mListener->OnMessageAvailable(mListenerMT->mContext, aMsg);
   }
 }
 
 bool
 WebSocketChannelChild::RecvOnBinaryMessageAvailable(const nsCString& aMsg)
 {
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new EventTargetDispatcher(
-                       new MessageEvent(this, aMsg, true), mTargetThread));
-  } else if (mTargetThread) {
-    DispatchToTargetThread(new MessageEvent(this, aMsg, true));
-  } else {
-    OnBinaryMessageAvailable(aMsg);
-  }
+  mEventQ->RunOrEnqueue(
+    new EventTargetDispatcher(new MessageEvent(this, aMsg, true),
+                              mTargetThread));
+
   return true;
 }
 
 void
 WebSocketChannelChild::OnBinaryMessageAvailable(const nsCString& aMsg)
 {
   LOG(("WebSocketChannelChild::RecvOnBinaryMessageAvailable() %p\n", this));
   if (mListenerMT) {
@@ -378,24 +360,20 @@ class AcknowledgeEvent : public ChannelE
  private:
   RefPtr<WebSocketChannelChild> mChild;
   uint32_t mSize;
 };
 
 bool
 WebSocketChannelChild::RecvOnAcknowledge(const uint32_t& aSize)
 {
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new EventTargetDispatcher(
-                       new AcknowledgeEvent(this, aSize), mTargetThread));
-  } else if (mTargetThread) {
-    DispatchToTargetThread(new AcknowledgeEvent(this, aSize));
-  } else {
-    OnAcknowledge(aSize);
-  }
+  mEventQ->RunOrEnqueue(
+    new EventTargetDispatcher(new AcknowledgeEvent(this, aSize),
+                              mTargetThread));
+
   return true;
 }
 
 void
 WebSocketChannelChild::OnAcknowledge(const uint32_t& aSize)
 {
   LOG(("WebSocketChannelChild::RecvOnAcknowledge() %p\n", this));
   if (mListenerMT) {
@@ -424,25 +402,20 @@ class ServerCloseEvent : public ChannelE
   uint16_t mCode;
   nsCString mReason;
 };
 
 bool
 WebSocketChannelChild::RecvOnServerClose(const uint16_t& aCode,
                                          const nsCString& aReason)
 {
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new EventTargetDispatcher(
-                       new ServerCloseEvent(this, aCode, aReason),
-                       mTargetThread));
-  } else if (mTargetThread) {
-    DispatchToTargetThread(new ServerCloseEvent(this, aCode, aReason));
-  } else {
-    OnServerClose(aCode, aReason);
-  }
+  mEventQ->RunOrEnqueue(
+    new EventTargetDispatcher(new ServerCloseEvent(this, aCode, aReason),
+                              mTargetThread));
+
   return true;
 }
 
 void
 WebSocketChannelChild::OnServerClose(const uint16_t& aCode,
                                      const nsCString& aReason)
 {
   LOG(("WebSocketChannelChild::RecvOnServerClose() %p\n", this));
--- a/netwerk/protocol/wyciwyg/WyciwygChannelChild.cpp
+++ b/netwerk/protocol/wyciwyg/WyciwygChannelChild.cpp
@@ -148,23 +148,19 @@ private:
 
 bool
 WyciwygChannelChild::RecvOnStartRequest(const nsresult& statusCode,
                                         const int64_t& contentLength,
                                         const int32_t& source,
                                         const nsCString& charset,
                                         const nsCString& securityInfo)
 {
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new WyciwygStartRequestEvent(this, statusCode,
-                                                 contentLength, source,
-                                                 charset, securityInfo));
-  } else {
-    OnStartRequest(statusCode, contentLength, source, charset, securityInfo);
-  }
+  mEventQ->RunOrEnqueue(new WyciwygStartRequestEvent(this, statusCode,
+                                                     contentLength, source,
+                                                     charset, securityInfo));
   return true;
 }
 
 void
 WyciwygChannelChild::OnStartRequest(const nsresult& statusCode,
                                     const int64_t& contentLength,
                                     const int32_t& source,
                                     const nsCString& charset,
@@ -203,21 +199,17 @@ private:
   nsCString mData;
   uint64_t mOffset;
 };
 
 bool
 WyciwygChannelChild::RecvOnDataAvailable(const nsCString& data,
                                          const uint64_t& offset)
 {
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new WyciwygDataAvailableEvent(this, data, offset));
-  } else {
-    OnDataAvailable(data, offset);
-  }
+  mEventQ->RunOrEnqueue(new WyciwygDataAvailableEvent(this, data, offset));
   return true;
 }
 
 void
 WyciwygChannelChild::OnDataAvailable(const nsCString& data,
                                      const uint64_t& offset)
 {
   LOG(("WyciwygChannelChild::RecvOnDataAvailable [this=%p]\n", this));
@@ -265,21 +257,17 @@ public:
 private:
   WyciwygChannelChild* mChild;
   nsresult mStatusCode;
 };
 
 bool
 WyciwygChannelChild::RecvOnStopRequest(const nsresult& statusCode)
 {
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new WyciwygStopRequestEvent(this, statusCode));
-  } else {
-    OnStopRequest(statusCode);
-  }
+  mEventQ->RunOrEnqueue(new WyciwygStopRequestEvent(this, statusCode));
   return true;
 }
 
 void
 WyciwygChannelChild::OnStopRequest(const nsresult& statusCode)
 {
   LOG(("WyciwygChannelChild::RecvOnStopRequest [this=%p status=%u]\n",
            this, statusCode));
@@ -322,21 +310,17 @@ class WyciwygCancelEvent : public Channe
  private:
   WyciwygChannelChild* mChild;
   nsresult mStatus;
 };
 
 bool
 WyciwygChannelChild::RecvCancelEarly(const nsresult& statusCode)
 {
-  if (mEventQ->ShouldEnqueue()) {
-    mEventQ->Enqueue(new WyciwygCancelEvent(this, statusCode));
-  } else {
-    CancelEarly(statusCode);
-  }
+  mEventQ->RunOrEnqueue(new WyciwygCancelEvent(this, statusCode));
   return true;
 }
 
 void WyciwygChannelChild::CancelEarly(const nsresult& statusCode)
 {
   LOG(("WyciwygChannelChild::CancelEarly [this=%p]\n", this));
   
   if (mCanceled)