Bug 1529612 - Part 4: Work around issue where receiver would get message before sender saw its bufferedAmount drop. r=mjf
authorByron Campen [:bwc] <docfaraday@gmail.com>
Thu, 09 May 2019 16:12:35 +0000
changeset 532092 074678d618bbbd75055a45de59c0d810bd80f8b0
parent 532091 5ea4e8902e7ebea07211b37e4cb50b3e15cd5fc4
child 532093 c75e0b9c3a1cbe4956e3cfd384ab0321a0d37774
push id11265
push userffxbld-merge
push dateMon, 13 May 2019 10:53:39 +0000
treeherdermozilla-beta@77e0fe8dbdd3 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersmjf
bugs1529612
milestone68.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1529612 - Part 4: Work around issue where receiver would get message before sender saw its bufferedAmount drop. r=mjf Differential Revision: https://phabricator.services.mozilla.com/D30389
netwerk/sctp/datachannel/DataChannel.cpp
netwerk/sctp/datachannel/DataChannel.h
--- a/netwerk/sctp/datachannel/DataChannel.cpp
+++ b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -294,17 +294,18 @@ DataChannelConnection::DataChannelConnec
                                              MediaTransportHandler* aHandler)
     : NeckoTargetHolder(aTarget),
       mLock("netwerk::sctp::DataChannelConnection"),
       mSendInterleaved(false),
       mPpidFragmentation(false),
       mMaxMessageSizeSet(false),
       mMaxMessageSize(0),
       mAllocateEven(false),
-      mTransportHandler(aHandler) {
+      mTransportHandler(aHandler),
+      mDeferSend(false) {
   mCurrentStream = 0;
   mState = CLOSED;
   mSocket = nullptr;
   mMasterSocket = nullptr;
   mListener = listener;
   mLocalPort = 0;
   mRemotePort = 0;
   mPendingType = PENDING_NONE;
@@ -839,21 +840,26 @@ void DataChannelConnection::SctpDtlsInpu
       usrsctp_freedumpbuffer(buf);
     }
   }
   // Pass the data to SCTP
   MutexAutoLock lock(mLock);
   usrsctp_conninput(static_cast<void*>(this), packet.data(), packet.len(), 0);
 }
 
-void DataChannelConnection::SendPacket(nsAutoPtr<MediaPacket> packet) {
-  // LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
-  if (!mTransportId.empty() && mTransportHandler) {
-    mTransportHandler->SendPacket(mTransportId, std::move(*packet));
-  }
+void DataChannelConnection::SendPacket(std::unique_ptr<MediaPacket>&& packet) {
+  mSTS->Dispatch(NS_NewRunnableFunction(
+      "DataChannelConnection::SendPacket",
+      [this, self = RefPtr<DataChannelConnection>(this),
+       packet = std::move(packet)]() mutable {
+        // LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
+        if (!mTransportId.empty() && mTransportHandler) {
+          mTransportHandler->SendPacket(mTransportId, std::move(*packet));
+        }
+      }));
 }
 
 /* static */
 int DataChannelConnection::SctpDtlsOutput(void* addr, void* buffer,
                                           size_t length, uint8_t tos,
                                           uint8_t set_df) {
   DataChannelConnection* peer = static_cast<DataChannelConnection*>(addr);
   MOZ_DIAGNOSTIC_ASSERT(!peer->mShutdown);
@@ -868,27 +874,26 @@ int DataChannelConnection::SctpDtlsOutpu
     }
   }
 
   // We're async proxying even if on the STSThread because this is called
   // with internal SCTP locks held in some cases (such as in usrsctp_connect()).
   // SCTP has an option for Apple, on IP connections only, to release at least
   // one of the locks before calling a packet output routine; with changes to
   // the underlying SCTP stack this might remove the need to use an async proxy.
-  nsAutoPtr<MediaPacket> packet(new MediaPacket);
+  std::unique_ptr<MediaPacket> packet(new MediaPacket);
   packet->SetType(MediaPacket::SCTP);
   packet->Copy(static_cast<const uint8_t*>(buffer), length);
 
-  // XXX It might be worthwhile to add an assertion against the thread
-  // somehow getting into the DataChannel/SCTP code again, as
-  // DISPATCH_SYNC is not fully blocking.  This may be tricky, as it
-  // needs to be a per-thread check, not a global.
-  peer->mSTS->Dispatch(WrapRunnable(RefPtr<DataChannelConnection>(peer),
-                                    &DataChannelConnection::SendPacket, packet),
-                       NS_DISPATCH_NORMAL);
+  if (NS_IsMainThread() && peer->mDeferSend) {
+    peer->mDeferredSend.emplace_back(std::move(packet));
+    return 0;
+  }
+
+  peer->SendPacket(std::move(packet));
   return 0;  // cheat!  Packets can always be dropped later anyways
 }
 #endif
 
 #ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
 // listen for incoming associations
 // Blocks! - Don't call this from main thread!
 
@@ -1267,21 +1272,29 @@ bool DataChannelConnection::SendDeferred
 
     // Send buffered data messages
     // Warning: This will fail in case ndata is inactive and a previously
     //          deallocated data channel has not been closed properly. If you
     //          ever see that no messages can be sent on any channel, this is
     //          likely the cause (an explicit EOR message partially sent whose
     //          remaining chunks are still being waited for).
     size_t written = 0;
+    mDeferSend = true;
     blocked = SendBufferedMessages(channel->mBufferedData, &written);
+    mDeferSend = false;
     if (written) {
       channel->DecrementBufferedAmount(written);
     }
 
+    for (auto&& packet : mDeferredSend) {
+      MOZ_ASSERT(written);
+      SendPacket(std::move(packet));
+    }
+    mDeferredSend.clear();
+
     // Update current stream index
     // Note: If ndata is not active, the outstanding data messages on this
     //       stream need to be sent first before other streams can be used for
     //       sending.
     if (mSendInterleaved || !blocked) {
       i = UpdateCurrentStreamIndex();
     }
   } while (!blocked && i != end);
@@ -2572,16 +2585,17 @@ int DataChannelConnection::SendMsgIntern
 
     // Send (or try at least)
     // SCTP will return EMSGSIZE if the message is bigger than the buffer
     // size (or EAGAIN if there isn't space). However, we can avoid EMSGSIZE
     // by carefully crafting small enough message chunks.
     ssize_t written = usrsctp_sendv(
         mSocket, msg.GetData(), length, nullptr, 0, (void*)&msg.GetInfo(),
         (socklen_t)sizeof(struct sctp_sendv_spa), SCTP_SENDV_SPA, 0);
+
     if (written < 0) {
       error = errno;
       goto out;
     }
 
     if (aWritten) {
       *aWritten += written;
     }
@@ -2721,22 +2735,30 @@ int DataChannelConnection::SendDataMsgIn
     info.sendv_flags |= SCTP_SEND_PRINFO_VALID;
   }
 
   // Create message instance and send
   OutgoingMsg msg(info, data, len);
   MutexAutoLock lock(mLock);
   bool buffered;
   size_t written = 0;
+  mDeferSend = true;
   int error =
       SendMsgInternalOrBuffer(channel.mBufferedData, msg, buffered, &written);
+  mDeferSend = false;
   if (written) {
     channel.DecrementBufferedAmount(written);
   }
 
+  for (auto&& packet : mDeferredSend) {
+    MOZ_ASSERT(written);
+    SendPacket(std::move(packet));
+  }
+  mDeferredSend.clear();
+
   // Set pending type and stream index (if buffered)
   if (!error && buffered && !mPendingType) {
     mPendingType = PENDING_DATA;
     mCurrentStream = channel.mStream;
   }
   return error;
 }
 
--- a/netwerk/sctp/datachannel/DataChannel.h
+++ b/netwerk/sctp/datachannel/DataChannel.h
@@ -6,17 +6,19 @@
 
 #ifndef NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_
 #define NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_
 
 #ifdef MOZ_WEBRTC_SIGNALING
 #  define SCTP_DTLS_SUPPORTED 1
 #endif
 
+#include <memory>
 #include <string>
+#include <vector>
 #include <errno.h>
 #include "nsISupports.h"
 #include "nsCOMPtr.h"
 #include "mozilla/WeakPtr.h"
 #include "nsString.h"
 #include "nsThreadUtils.h"
 #include "nsTArray.h"
 #include "nsDeque.h"
@@ -224,17 +226,17 @@ class DataChannelConnection final : publ
   // Use from main thread only as WeakPtr is not threadsafe
   WeakPtr<DataConnectionListener> mListener;
 
  private:
   friend class DataChannelConnectRunnable;
 
 #ifdef SCTP_DTLS_SUPPORTED
   static void DTLSConnectThread(void* data);
-  void SendPacket(nsAutoPtr<MediaPacket> packet);
+  void SendPacket(std::unique_ptr<MediaPacket>&& packet);
   void SctpDtlsInput(const std::string& aTransportId, MediaPacket& packet);
   static int SctpDtlsOutput(void* addr, void* buffer, size_t length,
                             uint8_t tos, uint8_t set_df);
 #endif
   DataChannel* FindChannelByStream(uint16_t stream);
   uint16_t FindFreeStream();
   bool RequestMoreStreams(int32_t aNeeded = 16);
   uint32_t UpdateCurrentStreamIndex();
@@ -334,16 +336,21 @@ class DataChannelConnection final : publ
 #endif
   uint16_t mLocalPort;  // Accessed from connect thread
   uint16_t mRemotePort;
 
   nsCOMPtr<nsIThread> mInternalIOThread;
   uint8_t mPendingType;
   nsCString mRecvBuffer;
 
+  // Workaround to prevent a message from being received on main before the
+  // sender sees the decrease in bufferedAmount.
+  bool mDeferSend;
+  std::vector<std::unique_ptr<MediaPacket>> mDeferredSend;
+
 #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
   bool mShutdown;
 #endif
 };
 
 #define ENSURE_DATACONNECTION \
   do {                        \
     MOZ_ASSERT(mConnection);  \