Bug 1178091: Implement RTCDataChannel BufferedAmountLowThreshold and bufferedamountlow event r=smaug,drno
authorRandell Jesup <rjesup@jesup.org>
Mon, 28 Sep 2015 19:02:23 -0400
changeset 264859 23cd3f0edf0af2974c1c53a6e8f913733561be53
parent 264858 3b2c73d50f5a251a7b24893da545386fae8a5ff1
child 264860 40e2d33b759e2032d2bbde4aa70427c0e52eedbb
push id29450
push usercbook@mozilla.com
push dateTue, 29 Sep 2015 10:00:39 +0000
treeherdermozilla-central@acdb22976ff8 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerssmaug, drno
bugs1178091
milestone44.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1178091: Implement RTCDataChannel BufferedAmountLowThreshold and bufferedamountlow event r=smaug,drno
dom/base/nsDOMDataChannel.cpp
dom/base/nsDOMDataChannel.h
dom/base/nsGkAtomList.h
dom/webidl/DataChannel.webidl
netwerk/sctp/datachannel/DataChannel.cpp
netwerk/sctp/datachannel/DataChannel.h
netwerk/sctp/datachannel/DataChannelListener.h
--- a/dom/base/nsDOMDataChannel.cpp
+++ b/dom/base/nsDOMDataChannel.cpp
@@ -208,23 +208,35 @@ nsDOMDataChannel::GetReadyState(nsAStrin
 }
 
 uint32_t
 nsDOMDataChannel::BufferedAmount() const
 {
   return mDataChannel->GetBufferedAmount();
 }
 
+uint32_t
+nsDOMDataChannel::BufferedAmountLowThreshold() const
+{
+  return mDataChannel->GetBufferedAmountLowThreshold();
+}
+
 NS_IMETHODIMP
 nsDOMDataChannel::GetBufferedAmount(uint32_t* aBufferedAmount)
 {
   *aBufferedAmount = BufferedAmount();
   return NS_OK;
 }
 
+void
+nsDOMDataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold)
+{
+  mDataChannel->SetBufferedAmountLowThreshold(aThreshold);
+}
+
 NS_IMETHODIMP nsDOMDataChannel::GetBinaryType(nsAString & aBinaryType)
 {
   switch (mBinaryType) {
   case DC_BINARY_TYPE_ARRAYBUFFER:
     aBinaryType.AssignLiteral("arraybuffer");
     break;
   case DC_BINARY_TYPE_BLOB:
     aBinaryType.AssignLiteral("blob");
@@ -463,16 +475,24 @@ nsDOMDataChannel::OnChannelConnected(nsI
 nsresult
 nsDOMDataChannel::OnChannelClosed(nsISupports* aContext)
 {
   LOG(("%p(%p): %s - Dispatching\n",this,(void*)mDataChannel,__FUNCTION__));
 
   return OnSimpleEvent(aContext, NS_LITERAL_STRING("close"));
 }
 
+nsresult
+nsDOMDataChannel::OnBufferLow(nsISupports* aContext)
+{
+  LOG(("%p(%p): %s - Dispatching\n",this,(void*)mDataChannel,__FUNCTION__));
+
+  return OnSimpleEvent(aContext, NS_LITERAL_STRING("bufferedamountlow"));
+}
+
 void
 nsDOMDataChannel::AppReady()
 {
   mDataChannel->AppReady();
 }
 
 /* static */
 nsresult
--- a/dom/base/nsDOMDataChannel.h
+++ b/dom/base/nsDOMDataChannel.h
@@ -49,21 +49,24 @@ public:
     return GetOwner();
   }
 
   // WebIDL
   // Uses XPIDL GetLabel.
   bool Reliable() const;
   mozilla::dom::RTCDataChannelState ReadyState() const;
   uint32_t BufferedAmount() const;
+  uint32_t BufferedAmountLowThreshold() const;
+  void SetBufferedAmountLowThreshold(uint32_t aThreshold);
   IMPL_EVENT_HANDLER(open)
   IMPL_EVENT_HANDLER(error)
   IMPL_EVENT_HANDLER(close)
   // Uses XPIDL Close.
   IMPL_EVENT_HANDLER(message)
+  IMPL_EVENT_HANDLER(bufferedamountlow)
   mozilla::dom::RTCDataChannelType BinaryType() const
   {
     return static_cast<mozilla::dom::RTCDataChannelType>(
       static_cast<int>(mBinaryType));
   }
   void SetBinaryType(mozilla::dom::RTCDataChannelType aType)
   {
     mBinaryType = static_cast<DataChannelBinaryType>(
@@ -92,16 +95,19 @@ public:
   virtual nsresult OnSimpleEvent(nsISupports* aContext, const nsAString& aName);
 
   virtual nsresult
   OnChannelConnected(nsISupports* aContext) override;
 
   virtual nsresult
   OnChannelClosed(nsISupports* aContext) override;
 
+  virtual nsresult
+  OnBufferLow(nsISupports* aContext) override;
+
   virtual void
   AppReady();
 
 protected:
   ~nsDOMDataChannel();
 
 private:
   void Send(nsIInputStream* aMsgStream, const nsACString& aMsgString,
--- a/dom/base/nsGkAtomList.h
+++ b/dom/base/nsGkAtomList.h
@@ -694,16 +694,17 @@ GK_ATOM(onbeforepaste, "onbeforepaste")
 GK_ATOM(onbeforeevicted, "onbeforeevicted")
 GK_ATOM(onbeforeprint, "onbeforeprint")
 GK_ATOM(onbeforescriptexecute, "onbeforescriptexecute")
 GK_ATOM(onbeforeunload, "onbeforeunload")
 GK_ATOM(onblocked, "onblocked")
 GK_ATOM(onblur, "onblur")
 GK_ATOM(onbroadcast, "onbroadcast")
 GK_ATOM(onbusy, "onbusy")
+GK_ATOM(onbufferedamountlow, "onbufferedamountlow")
 GK_ATOM(oncached, "oncached")
 GK_ATOM(oncallschanged, "oncallschanged")
 GK_ATOM(oncancel, "oncancel")
 GK_ATOM(oncardstatechange, "oncardstatechange")
 GK_ATOM(oncfstatechange, "oncfstatechange")
 GK_ATOM(onchange, "onchange")
 GK_ATOM(oncharacteristicchanged, "oncharacteristicchanged")
 GK_ATOM(onchargingchange, "onchargingchange")
--- a/dom/webidl/DataChannel.webidl
+++ b/dom/webidl/DataChannel.webidl
@@ -16,21 +16,23 @@ enum RTCDataChannelType {
 
 // XXX This interface is called RTCDataChannel in the spec.
 interface DataChannel : EventTarget
 {
   readonly attribute DOMString label;
   readonly attribute boolean reliable;
   readonly attribute RTCDataChannelState readyState;
   readonly attribute unsigned long bufferedAmount;
+  attribute unsigned long bufferedAmountLowThreshold;
   attribute EventHandler onopen;
   attribute EventHandler onerror;
   attribute EventHandler onclose;
   void close();
   attribute EventHandler onmessage;
+  attribute EventHandler onbufferedamountlow;
   attribute RTCDataChannelType binaryType;
   [Throws]
   void send(DOMString data);
   [Throws]
   void send(Blob data);
   [Throws]
   void send(ArrayBuffer data);
   [Throws]
--- a/netwerk/sctp/datachannel/DataChannel.cpp
+++ b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -144,17 +144,17 @@ public:
     return NS_OK;
   }
 };
 
 NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
 
 
 BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data,
-                         uint32_t length) : mLength(length)
+                         size_t length) : mLength(length)
 {
   mSpa = new sctp_sendv_spa;
   *mSpa = spa;
   char *tmp = new char[length]; // infallible malloc!
   memcpy(tmp, data, length);
   mData = tmp;
 }
 
@@ -1118,21 +1118,26 @@ DataChannelConnection::SendDeferredMessa
 
     if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
       bool failed_send = false;
       int32_t result;
 
       if (channel->mState == CLOSED || channel->mState == CLOSING) {
         channel->mBufferedData.Clear();
       }
+
+      uint32_t buffered_amount = channel->GetBufferedAmount();
+      uint32_t threshold = channel->GetBufferedAmountLowThreshold();
+      bool was_over_threshold = buffered_amount >= threshold;
+
       while (!channel->mBufferedData.IsEmpty() &&
              !failed_send) {
         struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
         const char *data           = channel->mBufferedData[0]->mData;
-        uint32_t len               = channel->mBufferedData[0]->mLength;
+        size_t len                 = channel->mBufferedData[0]->mLength;
 
         // SCTP will return EMSGSIZE if the message is bigger than the buffer
         // size (or EAGAIN if there isn't space)
         if ((result = usrsctp_sendv(mSocket, data, len,
                                     nullptr, 0,
                                     (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa),
                                     SCTP_SENDV_SPA,
                                     0)) < 0) {
@@ -1142,17 +1147,29 @@ DataChannelConnection::SendDeferredMessa
             LOG(("queue full again when resending %d bytes (%d)", len, result));
           } else {
             LOG(("error %d re-sending string", errno));
             failed_send = true;
           }
         } else {
           LOG(("Resent buffer of %d bytes (%d)", len, result));
           sent = true;
+          // In theory this could underflow if >4GB was buffered and re
+          // truncated in GetBufferedAmount(), but this won't cause any problems.
+          buffered_amount -= channel->mBufferedData[0]->mLength;
           channel->mBufferedData.RemoveElementAt(0);
+          // can never fire with default threshold of 0
+          if (was_over_threshold && buffered_amount < threshold) {
+            LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
+                 channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
+            NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
+                                                DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD,
+                                                this, channel)));
+            was_over_threshold = false;
+          }
         }
       }
       if (channel->mBufferedData.IsEmpty())
         channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA;
       else
         still_blocked = true;
     }
     if (still_blocked)
@@ -1283,17 +1300,17 @@ DataChannelConnection::DeliverQueuedData
 {
   mLock.AssertCurrentThreadOwns();
 
   uint32_t i = 0;
   while (i < mQueuedData.Length()) {
     // Careful! we may modify the array length from within the loop!
     if (mQueuedData[i]->mStream == stream) {
       LOG(("Delivering queued data for stream %u, length %u",
-           stream, mQueuedData[i]->mLength));
+           stream, (unsigned int) mQueuedData[i]->mLength));
       // Deliver the queued data
       HandleDataMessage(mQueuedData[i]->mPpid,
                         mQueuedData[i]->mData, mQueuedData[i]->mLength,
                         mQueuedData[i]->mStream);
       mQueuedData.RemoveElementAt(i);
       continue; // don't bump index since we removed the element
     }
     i++;
@@ -2194,17 +2211,17 @@ request_error_cleanup:
   // we'll be destroying the channel, but it never really got set up
   // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
   // Dispatch it to ourselves
   return nullptr;
 }
 
 int32_t
 DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
-                                       uint32_t length, uint32_t ppid)
+                                       size_t length, uint32_t ppid)
 {
   uint16_t flags;
   struct sctp_sendv_spa spa;
   int32_t result;
 
   NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
   NS_WARN_IF_FALSE(length > 0, "Length is 0?!");
 
@@ -2262,17 +2279,17 @@ DataChannelConnection::SendMsgInternal(D
     LOG(("error %d sending string", errno));
   }
   return result;
 }
 
 // Handles fragmenting binary messages
 int32_t
 DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
-                                  uint32_t len,
+                                  size_t len,
                                   uint32_t ppid_partial, uint32_t ppid_final)
 {
   // Since there's a limit on network buffer size and no limits on message
   // size, and we don't want to use EOR mode (multiple writes for a
   // message, but all other streams are blocked until you finish sending
   // this message), we need to add application-level fragmentation of large
   // messages.  On a reliable channel, these can be simply rebuilt into a
   // large message.  On an unreliable channel, we can't and don't know how
@@ -2287,17 +2304,17 @@ DataChannelConnection::SendBinary(DataCh
   if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
       channel->mPrPolicy == DATA_CHANNEL_RELIABLE &&
       !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
     int32_t sent=0;
     uint32_t origlen = len;
     LOG(("Sending binary message length %u in chunks", len));
     // XXX check flags for out-of-order, or force in-order for large binary messages
     while (len > 0) {
-      uint32_t sendlen = PR_MIN(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
+      size_t sendlen = PR_MIN(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
       uint32_t ppid;
       len -= sendlen;
       ppid = len > 0 ? ppid_partial : ppid_final;
       LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid));
       // Note that these might end up being deferred and queued.
       sent += SendMsgInternal(channel, data, sendlen, ppid);
       data += sendlen;
     }
@@ -2617,23 +2634,46 @@ DataChannel::AppReady()
   mQueuedMessages.Compact();
   // We never use it again...  We could even allocate the array in the odd
   // cases we need it.
 }
 
 uint32_t
 DataChannel::GetBufferedAmount()
 {
-  uint32_t buffered = 0;
-  for (uint32_t i = 0; i < mBufferedData.Length(); ++i) {
-    buffered += mBufferedData[i]->mLength;
+  size_t buffered = 0;
+  for (auto& buffer : mBufferedData) {
+    buffered += buffer->mLength;
+  }
+  // XXX Note: per Michael Tuexen, there's no way to currently get the buffered
+  // amount from the SCTP stack for a single stream.  It is on their to-do
+  // list, and once we import a stack with support for that, we'll need to
+  // add it to what we buffer.  Also we'll need to ask for notification of a per-
+  // stream buffer-low event and merge that into the handling of buffer-low
+  // (the equivalent to TCP_NOTSENT_LOWAT on TCP sockets)
+
+  if (buffered > UINT32_MAX) { // paranoia - >4GB buffered is very very unlikely
+    buffered = UINT32_MAX;
   }
   return buffered;
 }
 
+uint32_t
+DataChannel::GetBufferedAmountLowThreshold()
+{
+  return mBufferedThreshold;
+}
+
+// Never fire immediately, as it's defined to fire on transitions, not state
+void
+DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold)
+{
+  mBufferedThreshold = aThreshold;
+}
+
 // Called with mLock locked!
 void
 DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
 {
   if (!mReady &&
       (mState == CONNECTING || mState == WAITING_TO_OPEN)) {
     mQueuedMessages.AppendElement(aMessage);
   } else {
--- a/netwerk/sctp/datachannel/DataChannel.h
+++ b/netwerk/sctp/datachannel/DataChannel.h
@@ -52,22 +52,22 @@ class DataChannelConnection;
 class DataChannel;
 class DataChannelOnMessageAvailable;
 
 // For queuing outgoing messages
 class BufferedMsg
 {
 public:
   BufferedMsg(struct sctp_sendv_spa &spa,const char *data,
-              uint32_t length);
+              size_t length);
   ~BufferedMsg();
 
   struct sctp_sendv_spa *mSpa;
   const char *mData;
-  uint32_t mLength;
+  size_t mLength;
 };
 
 // for queuing incoming data messages before the Open or
 // external negotiation is indicated to us
 class QueuedDataMessage
 {
 public:
   QueuedDataMessage(uint16_t stream, uint32_t ppid,
@@ -208,19 +208,19 @@ private:
   uint16_t FindFreeStream();
   bool RequestMoreStreams(int32_t aNeeded = 16);
   int32_t SendControlMessage(void *msg, uint32_t len, uint16_t stream);
   int32_t SendOpenRequestMessage(const nsACString& label, const nsACString& protocol,
                                  uint16_t stream,
                                  bool unordered, uint16_t prPolicy, uint32_t prValue);
   int32_t SendOpenAckMessage(uint16_t stream);
   int32_t SendMsgInternal(DataChannel *channel, const char *data,
-                          uint32_t length, uint32_t ppid);
+                          size_t length, uint32_t ppid);
   int32_t SendBinary(DataChannel *channel, const char *data,
-                     uint32_t len, uint32_t ppid_partial, uint32_t ppid_final);
+                     size_t len, uint32_t ppid_partial, uint32_t ppid_final);
   int32_t SendMsgCommon(uint16_t stream, const nsACString &aMsg, bool isBinary);
 
   void DeliverQueuedData(uint16_t stream);
 
   already_AddRefed<DataChannel> OpenFinish(already_AddRefed<DataChannel>&& aChannel);
 
   void StartDefer();
   bool SendDeferredMessages();
@@ -323,16 +323,17 @@ public:
     , mProtocol(protocol)
     , mState(state)
     , mReady(false)
     , mStream(stream)
     , mPrPolicy(policy)
     , mPrValue(value)
     , mFlags(flags)
     , mIsRecvBinary(false)
+    , mBufferedThreshold(0) // default from spec
     {
       NS_ASSERTION(mConnection,"NULL connection");
     }
 
 private:
   ~DataChannel();
 
 public:
@@ -382,16 +383,20 @@ public:
 
   uint16_t GetType() { return mPrPolicy; }
 
   bool GetOrdered() { return !(mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED); }
 
   // Amount of data buffered to send
   uint32_t GetBufferedAmount();
 
+  // Trigger amount for generating BufferedAmountLow events
+  uint32_t GetBufferedAmountLowThreshold();
+  void SetBufferedAmountLowThreshold(uint32_t aThreshold);
+
   // Find out state
   uint16_t GetReadyState()
     {
       if (mConnection) {
         MutexAutoLock lock(mConnection->mLock);
         if (mState == WAITING_TO_OPEN)
           return CONNECTING;
         return mState;
@@ -424,16 +429,17 @@ private:
   uint16_t mState;
   bool     mReady;
   uint16_t mStream;
   uint16_t mPrPolicy;
   uint32_t mPrValue;
   uint32_t mFlags;
   uint32_t mId;
   bool mIsRecvBinary;
+  size_t mBufferedThreshold;
   nsCString mRecvBuffer;
   nsTArray<nsAutoPtr<BufferedMsg> > mBufferedData;
   nsTArray<nsCOMPtr<nsIRunnable> > mQueuedMessages;
 };
 
 // used to dispatch notifications of incoming data to the main thread
 // Patterned on CallOnMessageAvailable in WebSockets
 // Also used to proxy other items to MainThread
@@ -443,16 +449,17 @@ public:
   enum {
     ON_CONNECTION,
     ON_DISCONNECTED,
     ON_CHANNEL_CREATED,
     ON_CHANNEL_OPEN,
     ON_CHANNEL_CLOSED,
     ON_DATA,
     START_DEFER,
+    BUFFER_LOW_THRESHOLD,
   };  /* types */
 
   DataChannelOnMessageAvailable(int32_t     aType,
                                 DataChannelConnection *aConnection,
                                 DataChannel *aChannel,
                                 nsCString   &aData,  // XXX this causes inefficiency
                                 int32_t     aLen)
     : mType(aType),
@@ -484,16 +491,17 @@ public:
 
   NS_IMETHOD Run()
   {
     MOZ_ASSERT(NS_IsMainThread());
     switch (mType) {
       case ON_DATA:
       case ON_CHANNEL_OPEN:
       case ON_CHANNEL_CLOSED:
+      case BUFFER_LOW_THRESHOLD:
         {
           MutexAutoLock lock(mChannel->mListenerLock);
           if (!mChannel->mListener) {
             DATACHANNEL_LOG(("DataChannelOnMessageAvailable (%d) with null Listener!",mType));
             return NS_OK;
           }
 
           switch (mType) {
@@ -505,16 +513,19 @@ public:
               }
               break;
             case ON_CHANNEL_OPEN:
               mChannel->mListener->OnChannelConnected(mChannel->mContext);
               break;
             case ON_CHANNEL_CLOSED:
               mChannel->mListener->OnChannelClosed(mChannel->mContext);
               break;
+            case BUFFER_LOW_THRESHOLD:
+              mChannel->mListener->OnBufferLow(mChannel->mContext);
+              break;
           }
           break;
         }
       case ON_DISCONNECTED:
         // If we've disconnected, make sure we close all the streams - from mainthread!
         mConnection->CloseAll();
         // fall through
       case ON_CHANNEL_CREATED:
--- a/netwerk/sctp/datachannel/DataChannelListener.h
+++ b/netwerk/sctp/datachannel/DataChannelListener.h
@@ -27,13 +27,16 @@ public:
   virtual nsresult OnBinaryMessageAvailable(nsISupports *aContext,
                                             const nsACString& message) = 0;
 
   // Called when the channel is connected
   virtual nsresult OnChannelConnected(nsISupports *aContext) = 0;
 
   // Called when the channel is closed
   virtual nsresult OnChannelClosed(nsISupports *aContext) = 0;
+
+  // Called when the BufferedAmount drops below the BufferedAmountLowThreshold
+  virtual nsresult OnBufferLow(nsISupports *aContext) = 0;
 };
 
 }
 
 #endif