Bug 1529612 - Part 2: Keep/update bufferedAmount on main only. Some simplifications. r=mjf
authorByron Campen [:bwc] <docfaraday@gmail.com>
Thu, 09 May 2019 15:39:05 +0000
changeset 532090 2375b78d0f7a16a27cdec42b8475e806a0f9d5eb
parent 532089 138bf6e11c05b5fd56b80ab8ed34d0ee87deebb3
child 532091 5ea4e8902e7ebea07211b37e4cb50b3e15cd5fc4
push id11265
push userffxbld-merge
push dateMon, 13 May 2019 10:53:39 +0000
treeherdermozilla-beta@77e0fe8dbdd3 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersmjf
bugs1529612
milestone68.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1529612 - Part 2: Keep/update bufferedAmount on main only. Some simplifications. r=mjf Differential Revision: https://phabricator.services.mozilla.com/D28526
netwerk/sctp/datachannel/DataChannel.cpp
netwerk/sctp/datachannel/DataChannel.h
--- a/netwerk/sctp/datachannel/DataChannel.cpp
+++ b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -1140,17 +1140,17 @@ int DataChannelConnection::SendControlMe
   // Note: Main-thread IO, but doesn't block
 #if (UINT32_MAX > SIZE_MAX)
   if (len > SIZE_MAX) {
     return EMSGSIZE;
   }
 #endif
   OutgoingMsg msg(info, data, (size_t)len);
   bool buffered;
-  int error = SendMsgInternalOrBuffer(mBufferedControl, msg, buffered);
+  int error = SendMsgInternalOrBuffer(mBufferedControl, msg, buffered, nullptr);
 
   // Set pending type (if buffered)
   if (!error && buffered && !mPendingType) {
     mPendingType = PENDING_DCEP;
   }
   return error;
 }
 
@@ -1235,17 +1235,17 @@ bool DataChannelConnection::SendDeferred
   }
 
   // Send pending control messages
   // Note: If ndata is not active, check if DCEP messages are currently
   // outstanding. These need to
   //       be sent first before other streams can be used for sending.
   if (!mBufferedControl.IsEmpty() &&
       (mSendInterleaved || mPendingType == PENDING_DCEP)) {
-    if (SendBufferedMessages(mBufferedControl)) {
+    if (SendBufferedMessages(mBufferedControl, nullptr)) {
       return true;
     }
 
     // Note: There may or may not be pending data messages
     mPendingType = PENDING_DATA;
   }
 
   bool blocked = false;
@@ -1260,44 +1260,26 @@ bool DataChannelConnection::SendDeferred
 
     // Clear if closing/closed
     if (channel->mState == CLOSED || channel->mState == CLOSING) {
       channel->mBufferedData.Clear();
       i = UpdateCurrentStreamIndex();
       continue;
     }
 
-    size_t bufferedAmount = channel->GetBufferedAmountLocked();
-    size_t threshold = channel->mBufferedThreshold;
-    bool wasOverThreshold = bufferedAmount >= threshold;
-
     // Send buffered data messages
     // Warning: This will fail in case ndata is inactive and a previously
     //          deallocated data channel has not been closed properly. If you
     //          ever see that no messages can be sent on any channel, this is
     //          likely the cause (an explicit EOR message partially sent whose
     //          remaining chunks are still being waited for).
-    blocked = SendBufferedMessages(channel->mBufferedData);
-    bufferedAmount = channel->GetBufferedAmountLocked();
-
-    // can never fire with default threshold of 0
-    if (wasOverThreshold && bufferedAmount < threshold) {
-      LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
-           channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
-      Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
-          DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD, this, channel)));
-    }
-
-    if (bufferedAmount == 0) {
-      // buffered-to-not-buffered transition; tell the DOM code in case this
-      // makes it available for GC
-      LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
-           channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
-      Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
-          DataChannelOnMessageAvailable::NO_LONGER_BUFFERED, this, channel)));
+    size_t written = 0;
+    blocked = SendBufferedMessages(channel->mBufferedData, &written);
+    if (written) {
+      channel->DecrementBufferedAmount(written);
     }
 
     // Update current stream index
     // Note: If ndata is not active, the outstanding data messages on this
     //       stream need to be sent first before other streams can be used for
     //       sending.
     if (mSendInterleaved || !blocked) {
       i = UpdateCurrentStreamIndex();
@@ -1309,20 +1291,20 @@ bool DataChannelConnection::SendDeferred
   }
   return blocked;
 }
 
 // Called with mLock locked!
 // buffer MUST have at least one item!
 // returns if we're still blocked (true)
 bool DataChannelConnection::SendBufferedMessages(
-    nsTArray<nsAutoPtr<BufferedOutgoingMsg>>& buffer) {
+    nsTArray<nsAutoPtr<BufferedOutgoingMsg>>& buffer, size_t* aWritten) {
   do {
     // Re-send message
-    int error = SendMsgInternal(*buffer[0]);
+    int error = SendMsgInternal(*buffer[0], aWritten);
     switch (error) {
       case 0:
         buffer.RemoveElementAt(0);
         break;
       case EAGAIN:
 #if (EAGAIN != EWOULDBLOCK)
       case EWOULDBLOCK:
 #endif
@@ -2556,17 +2538,17 @@ request_error_cleanup:
   // we'll be destroying the channel, but it never really got set up
   // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
   // Dispatch it to ourselves
   return nullptr;
 }
 
 // Requires mLock to be locked!
 // Returns a POSIX error code directly instead of setting errno.
-int DataChannelConnection::SendMsgInternal(OutgoingMsg& msg) {
+int DataChannelConnection::SendMsgInternal(OutgoingMsg& msg, size_t* aWritten) {
   auto& info = msg.GetInfo().sendv_sndinfo;
   int error;
 
   // EOR set?
   bool eor_set = info.snd_flags & SCTP_EOR ? true : false;
 
   // Send until buffer is empty
   size_t left = msg.GetLeft();
@@ -2594,16 +2576,20 @@ int DataChannelConnection::SendMsgIntern
     // by carefully crafting small enough message chunks.
     ssize_t written = usrsctp_sendv(
         mSocket, msg.GetData(), length, nullptr, 0, (void*)&msg.GetInfo(),
         (socklen_t)sizeof(struct sctp_sendv_spa), SCTP_SENDV_SPA, 0);
     if (written < 0) {
       error = errno;
       goto out;
     }
+
+    if (aWritten) {
+      *aWritten += written;
+    }
     LOG(("Sent buffer (written=%zu, len=%zu, left=%zu)", (size_t)written,
          length, left - (size_t)written));
 
     // TODO: Remove once resolved
     // (https://github.com/sctplab/usrsctp/issues/132)
     if (written == 0) {
       LOG(("@tuexen: usrsctp_sendv returned 0"));
       error = EAGAIN;
@@ -2637,17 +2623,17 @@ out:
   return error;
 }
 
 // Requires mLock to be locked!
 // Returns a POSIX error code directly instead of setting errno.
 // IMPORTANT: Ensure that the buffer passed is guarded by mLock!
 int DataChannelConnection::SendMsgInternalOrBuffer(
     nsTArray<nsAutoPtr<BufferedOutgoingMsg>>& buffer, OutgoingMsg& msg,
-    bool& buffered) {
+    bool& buffered, size_t* aWritten) {
   NS_WARNING_ASSERTION(msg.GetLength() > 0, "Length is 0?!");
 
   int error = 0;
   bool need_buffering = false;
 
   // Note: Main-thread IO, but doesn't block!
   // XXX FIX!  to deal with heavy overruns of JS trying to pass data in
   // (more than the buffersize) queue data onto another thread to do the
@@ -2661,17 +2647,17 @@ int DataChannelConnection::SendMsgIntern
   // queue - which would sit there.  Also, if we later send more data, it
   // would arrive ahead of the buffered message, but if the buffer ever
   // got to 1/2 full, the message would get sent - but at a semi-random
   // time, after other data it was supposed to be in front of.
 
   // Must lock before empty check for similar reasons!
   mLock.AssertCurrentThreadOwns();
   if (buffer.IsEmpty() && (mSendInterleaved || !mPendingType)) {
-    error = SendMsgInternal(msg);
+    error = SendMsgInternal(msg, aWritten);
     switch (error) {
       case 0:
         break;
       case EAGAIN:
 #if (EAGAIN != EWOULDBLOCK)
       case EWOULDBLOCK:
 #endif
         need_buffering = true;
@@ -2734,17 +2720,22 @@ int DataChannelConnection::SendDataMsgIn
     info.sendv_prinfo.pr_value = channel.mPrValue;
     info.sendv_flags |= SCTP_SEND_PRINFO_VALID;
   }
 
   // Create message instance and send
   OutgoingMsg msg(info, data, len);
   MutexAutoLock lock(mLock);
   bool buffered;
-  int error = SendMsgInternalOrBuffer(channel.mBufferedData, msg, buffered);
+  size_t written = 0;
+  int error =
+      SendMsgInternalOrBuffer(channel.mBufferedData, msg, buffered, &written);
+  if (written) {
+    channel.DecrementBufferedAmount(written);
+  }
 
   // Set pending type and stream index (if buffered)
   if (!error && buffered && !mPendingType) {
     mPendingType = PENDING_DATA;
     mCurrentStream = channel.mStream;
   }
   return error;
 }
@@ -3122,38 +3113,96 @@ void DataChannel::SendErrnoToErrorResult
       aRv.Throw(NS_ERROR_DOM_TYPE_ERR);
       break;
     default:
       aRv.Throw(NS_ERROR_DOM_OPERATION_ERR);
       break;
   }
 }
 
+void DataChannel::IncrementBufferedAmount(uint32_t aSize, ErrorResult& aRv) {
+  ASSERT_WEBRTC(NS_IsMainThread());
+  if (mBufferedAmount > UINT32_MAX - aSize) {
+    aRv.Throw(NS_ERROR_FILE_TOO_BIG);
+    return;
+  }
+
+  mBufferedAmount += aSize;
+}
+
+void DataChannel::DecrementBufferedAmount(uint32_t aSize) {
+  mMainThreadEventTarget->Dispatch(NS_NewRunnableFunction(
+      "DataChannel::DecrementBufferedAmount",
+      [this, self = RefPtr<DataChannel>(this), aSize] {
+        MOZ_ASSERT(aSize <= mBufferedAmount);
+        bool wasLow = mBufferedAmount <= mBufferedThreshold;
+        mBufferedAmount -= aSize;
+        if (!wasLow && mBufferedAmount <= mBufferedThreshold) {
+          LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
+               mLabel.get(), mProtocol.get(), mStream));
+          mListener->OnBufferLow(mContext);
+        }
+        if (mBufferedAmount == 0) {
+          LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
+               mLabel.get(), mProtocol.get(), mStream));
+          mListener->NotBuffered(mContext);
+        }
+      }));
+}
+
 void DataChannel::SendMsg(const nsACString& aMsg, ErrorResult& aRv) {
   if (!EnsureValidStream(aRv)) {
     return;
   }
 
   SendErrnoToErrorResult(mConnection->SendMsg(mStream, aMsg), aRv);
+  if (!aRv.Failed()) {
+    IncrementBufferedAmount(aMsg.Length(), aRv);
+  }
 }
 
 void DataChannel::SendBinaryMsg(const nsACString& aMsg, ErrorResult& aRv) {
   if (!EnsureValidStream(aRv)) {
     return;
   }
 
   SendErrnoToErrorResult(mConnection->SendBinaryMsg(mStream, aMsg), aRv);
+  if (!aRv.Failed()) {
+    IncrementBufferedAmount(aMsg.Length(), aRv);
+  }
 }
 
-void DataChannel::SendBinaryStream(nsIInputStream* aBlob, ErrorResult& aRv) {
+void DataChannel::SendBinaryBlob(dom::Blob& aBlob, ErrorResult& aRv) {
   if (!EnsureValidStream(aRv)) {
     return;
   }
 
-  SendErrnoToErrorResult(mConnection->SendBlob(mStream, aBlob), aRv);
+  uint64_t msgLength = aBlob.GetSize(aRv);
+  if (aRv.Failed()) {
+    return;
+  }
+
+  if (msgLength > UINT32_MAX) {
+    aRv.Throw(NS_ERROR_FILE_TOO_BIG);
+    return;
+  }
+
+  // We convert to an nsIInputStream here, because Blob is not threadsafe, and
+  // we don't convert it earlier because we need to know how large this is so we
+  // can update bufferedAmount.
+  nsCOMPtr<nsIInputStream> msgStream;
+  aBlob.CreateInputStream(getter_AddRefs(msgStream), aRv);
+  if (NS_WARN_IF(aRv.Failed())) {
+    return;
+  }
+
+  SendErrnoToErrorResult(mConnection->SendBlob(mStream, msgStream), aRv);
+  if (!aRv.Failed()) {
+    IncrementBufferedAmount(msgLength, aRv);
+  }
 }
 
 dom::Nullable<uint16_t> DataChannel::GetMaxPacketLifeTime() const {
   if (mPrPolicy == SCTP_PR_SCTP_TTL) {
     return dom::Nullable<uint16_t>(mPrValue);
   }
   return dom::Nullable<uint16_t>();
 }
@@ -3188,32 +3237,16 @@ void DataChannel::AppReady() {
                  "Shouldn't have queued messages if not WAITING_TO_OPEN");
   }
   mQueuedMessages.Clear();
   mQueuedMessages.Compact();
   // We never use it again...  We could even allocate the array in the odd
   // cases we need it.
 }
 
-size_t DataChannel::GetBufferedAmountLocked() const {
-  size_t buffered = 0;
-
-  for (auto& msg : mBufferedData) {
-    buffered += msg->GetLeft();
-  }
-  // XXX Note: per Michael Tuexen, there's no way to currently get the buffered
-  // amount from the SCTP stack for a single stream.  It is on their to-do
-  // list, and once we import a stack with support for that, we'll need to
-  // add it to what we buffer.  Also we'll need to ask for notification of a
-  // per- stream buffer-low event and merge that into the handling of buffer-low
-  // (the equivalent to TCP_NOTSENT_LOWAT on TCP sockets)
-
-  return buffered;
-}
-
 uint32_t DataChannel::GetBufferedAmountLowThreshold() {
   return mBufferedThreshold;
 }
 
 // Never fire immediately, as it's defined to fire on transitions, not state
 void DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold) {
   mBufferedThreshold = aThreshold;
 }
--- a/netwerk/sctp/datachannel/DataChannel.h
+++ b/netwerk/sctp/datachannel/DataChannel.h
@@ -15,17 +15,17 @@
 #include <errno.h>
 #include "nsISupports.h"
 #include "nsCOMPtr.h"
 #include "mozilla/WeakPtr.h"
 #include "nsString.h"
 #include "nsThreadUtils.h"
 #include "nsTArray.h"
 #include "nsDeque.h"
-#include "nsIInputStream.h"
+#include "mozilla/dom/Blob.h"
 #include "mozilla/Mutex.h"
 #include "DataChannelProtocol.h"
 #include "DataChannelListener.h"
 #include "mozilla/net/NeckoTargetHolder.h"
 #ifdef SCTP_DTLS_SUPPORTED
 #  include "mtransport/sigslot.h"
 #  include "mtransport/transportlayer.h"  // For TransportLayer::State
 #endif
@@ -240,20 +240,22 @@ class DataChannelConnection final : publ
   uint32_t UpdateCurrentStreamIndex();
   uint32_t GetCurrentStreamIndex();
   int SendControlMessage(const uint8_t* data, uint32_t len, uint16_t stream);
   int SendOpenAckMessage(uint16_t stream);
   int SendOpenRequestMessage(const nsACString& label,
                              const nsACString& protocol, uint16_t stream,
                              bool unordered, uint16_t prPolicy,
                              uint32_t prValue);
-  bool SendBufferedMessages(nsTArray<nsAutoPtr<BufferedOutgoingMsg>>& buffer);
-  int SendMsgInternal(OutgoingMsg& msg);
+  bool SendBufferedMessages(nsTArray<nsAutoPtr<BufferedOutgoingMsg>>& buffer,
+                            size_t* aWritten);
+  int SendMsgInternal(OutgoingMsg& msg, size_t* aWritten);
   int SendMsgInternalOrBuffer(nsTArray<nsAutoPtr<BufferedOutgoingMsg>>& buffer,
-                              OutgoingMsg& msg, bool& buffered);
+                              OutgoingMsg& msg, bool& buffered,
+                              size_t* aWritten);
   int SendDataMsgInternalOrBuffer(DataChannel& channel, const uint8_t* data,
                                   size_t len, uint32_t ppid);
   int SendDataMsg(DataChannel& channel, const uint8_t* data, size_t len,
                   uint32_t ppidPartial, uint32_t ppidFinal);
   int SendDataMsgCommon(uint16_t stream, const nsACString& aMsg, bool isBinary);
 
   void DeliverQueuedData(uint16_t stream);
 
@@ -373,18 +375,18 @@ class DataChannel {
         mProtocol(protocol),
         mState(state),
         mStream(stream),
         mPrPolicy(policy),
         mPrValue(value),
         mFlags(flags),
         mId(0),
         mIsRecvBinary(false),
-        mBufferedThreshold(0)  // default from spec
-        ,
+        mBufferedThreshold(0),  // default from spec
+        mBufferedAmount(0),
         mMainThreadEventTarget(connection->GetNeckoTarget()) {
     NS_ASSERTION(mConnection, "NULL connection");
   }
 
  private:
   ~DataChannel();
 
  public:
@@ -410,45 +412,35 @@ class DataChannel {
 
   // Send a string
   void SendMsg(const nsACString& aMsg, ErrorResult& aRv);
 
   // Send a binary message (TypedArray)
   void SendBinaryMsg(const nsACString& aMsg, ErrorResult& aRv);
 
   // Send a binary blob
-  void SendBinaryStream(nsIInputStream* aBlob, ErrorResult& aRv);
+  void SendBinaryBlob(dom::Blob& aBlob, ErrorResult& aRv);
 
   uint16_t GetType() { return mPrPolicy; }
 
   dom::Nullable<uint16_t> GetMaxPacketLifeTime() const;
 
   dom::Nullable<uint16_t> GetMaxRetransmits() const;
 
   bool GetOrdered() {
     return !(mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED);
   }
 
+  void IncrementBufferedAmount(uint32_t aSize, ErrorResult& aRv);
+  void DecrementBufferedAmount(uint32_t aSize);
+
   // Amount of data buffered to send
   uint32_t GetBufferedAmount() {
-    if (!mConnection) {
-      return 0;
-    }
-
-    MutexAutoLock lock(mConnection->mLock);
-    size_t buffered = GetBufferedAmountLocked();
-
-#if (SIZE_MAX > UINT32_MAX)
-    if (buffered >
-        UINT32_MAX) {  // paranoia - >4GB buffered is very very unlikely
-      buffered = UINT32_MAX;
-    }
-#endif
-
-    return buffered;
+    MOZ_ASSERT(NS_IsMainThread());
+    return mBufferedAmount;
   }
 
   // Trigger amount for generating BufferedAmountLow events
   uint32_t GetBufferedAmountLowThreshold();
   void SetBufferedAmountLowThreshold(uint32_t aThreshold);
 
   // Find out state
   uint16_t GetReadyState() {
@@ -475,30 +467,32 @@ class DataChannel {
   DataChannelListener* mListener;
   nsCOMPtr<nsISupports> mContext;
 
  private:
   friend class DataChannelOnMessageAvailable;
   friend class DataChannelConnection;
 
   nsresult AddDataToBinaryMsg(const char* data, uint32_t size);
-  size_t GetBufferedAmountLocked() const;
   bool EnsureValidStream(ErrorResult& aRv);
 
   RefPtr<DataChannelConnection> mConnection;
   nsCString mLabel;
   nsCString mProtocol;
   uint16_t mState;
   uint16_t mStream;
   uint16_t mPrPolicy;
   uint32_t mPrValue;
   uint32_t mFlags;
   uint32_t mId;
   bool mIsRecvBinary;
   size_t mBufferedThreshold;
+  // Read/written on main only. Decremented via message-passing, because the
+  // spec requires us to queue a task for this.
+  size_t mBufferedAmount;
   nsCString mRecvBuffer;
   nsTArray<nsAutoPtr<BufferedOutgoingMsg>>
       mBufferedData;  // GUARDED_BY(mConnection->mLock)
   nsTArray<nsCOMPtr<nsIRunnable>> mQueuedMessages;
   nsCOMPtr<nsIEventTarget> mMainThreadEventTarget;
 };
 
 // used to dispatch notifications of incoming data to the main thread
@@ -509,18 +503,16 @@ class DataChannelOnMessageAvailable : pu
   enum {
     ON_CONNECTION,
     ON_DISCONNECTED,
     ON_CHANNEL_CREATED,
     ON_CHANNEL_OPEN,
     ON_CHANNEL_CLOSED,
     ON_DATA_STRING,
     ON_DATA_BINARY,
-    BUFFER_LOW_THRESHOLD,
-    NO_LONGER_BUFFERED,
   }; /* types */
 
   DataChannelOnMessageAvailable(
       int32_t aType, DataChannelConnection* aConnection, DataChannel* aChannel,
       nsCString& aData)  // XXX this causes inefficiency
       : Runnable("DataChannelOnMessageAvailable"),
         mType(aType),
         mChannel(aChannel),
@@ -556,19 +548,17 @@ class DataChannelOnMessageAvailable : pu
     // Note: calling the listeners can indirectly cause the listeners to be
     // made available for GC (by removing event listeners), especially for
     // OnChannelClosed().  We hold a ref to the Channel and the listener
     // while calling this.
     switch (mType) {
       case ON_DATA_STRING:
       case ON_DATA_BINARY:
       case ON_CHANNEL_OPEN:
-      case ON_CHANNEL_CLOSED:
-      case BUFFER_LOW_THRESHOLD:
-      case NO_LONGER_BUFFERED: {
+      case ON_CHANNEL_CLOSED: {
         MutexAutoLock lock(mChannel->mListenerLock);
         if (!mChannel->mListener) {
           DATACHANNEL_LOG((
               "DataChannelOnMessageAvailable (%d) with null Listener!", mType));
           return NS_OK;
         }
 
         switch (mType) {
@@ -580,22 +570,16 @@ class DataChannelOnMessageAvailable : pu
                                                           mData);
             break;
           case ON_CHANNEL_OPEN:
             mChannel->mListener->OnChannelConnected(mChannel->mContext);
             break;
           case ON_CHANNEL_CLOSED:
             mChannel->mListener->OnChannelClosed(mChannel->mContext);
             break;
-          case BUFFER_LOW_THRESHOLD:
-            mChannel->mListener->OnBufferLow(mChannel->mContext);
-            break;
-          case NO_LONGER_BUFFERED:
-            mChannel->mListener->NotBuffered(mChannel->mContext);
-            break;
         }
         break;
       }
       case ON_DISCONNECTED:
         // If we've disconnected, make sure we close all the streams - from
         // mainthread!
         mConnection->CloseAll();
         MOZ_FALLTHROUGH;